Files
component-system/lib/components_elixir/inventory.ex
2025-09-20 11:52:43 +02:00

503 lines
13 KiB
Elixir

defmodule ComponentsElixir.Inventory do
@moduledoc """
The Inventory context: managing components and categories.
"""
import Ecto.Query, warn: false
alias ComponentsElixir.Repo
alias ComponentsElixir.Inventory.{Category, Component, StorageLocation}
## Storage Locations
@doc """
Returns the list of storage locations with optimized parent preloading.
Preloads up to 5 levels of parent associations to minimize database queries in full_path calculations.
"""
def list_storage_locations do
# Get all locations with preloaded parents in a single query
locations =
StorageLocation
|> order_by([sl], asc: sl.name)
|> preload(parent: [parent: [parent: [parent: :parent]]])
|> Repo.all()
# Ensure AprilTag SVGs exist for all locations
spawn(fn ->
ComponentsElixir.AprilTag.generate_all_apriltag_svgs()
end)
locations
end
@doc """
Returns the list of root storage locations (no parent).
"""
def list_root_storage_locations do
StorageLocation
|> where([sl], is_nil(sl.parent_id))
|> order_by([sl], asc: sl.name)
|> Repo.all()
end
@doc """
Gets a single storage location with preloaded associations.
"""
def get_storage_location!(id) do
StorageLocation
|> preload(:parent)
|> Repo.get!(id)
end
@doc """
Gets a storage location by AprilTag ID.
"""
def get_storage_location_by_apriltag_id(apriltag_id) do
StorageLocation
|> where([sl], sl.apriltag_id == ^apriltag_id)
|> preload(:parent)
|> Repo.one()
end
@doc """
Creates a storage location.
"""
def create_storage_location(attrs \\ %{}) do
# Convert string keys to atoms to maintain consistency
attrs = normalize_string_keys(attrs)
result =
%StorageLocation{}
|> StorageLocation.changeset(attrs)
|> Repo.insert()
case result do
{:ok, location} ->
{:ok, location}
error ->
error
end
end
@doc """
Updates a storage location.
"""
def update_storage_location(%StorageLocation{} = storage_location, attrs) do
# Convert string keys to atoms to maintain consistency
attrs = normalize_string_keys(attrs)
result =
storage_location
|> StorageLocation.changeset(attrs)
|> Repo.update()
case result do
{:ok, updated_location} ->
{:ok, updated_location}
error ->
error
end
end
@doc """
Deletes a storage location.
"""
def delete_storage_location(%StorageLocation{} = storage_location) do
Repo.delete(storage_location)
end
@doc """
Returns an `%Ecto.Changeset{}` for tracking storage location changes.
"""
def change_storage_location(%StorageLocation{} = storage_location, attrs \\ %{}) do
StorageLocation.changeset(storage_location, attrs)
end
@doc """
Parses an AprilTag ID and returns storage location information.
"""
def parse_apriltag_id(apriltag_id) when is_integer(apriltag_id) do
case get_storage_location_by_apriltag_id(apriltag_id) do
nil ->
{:error, :not_found}
location ->
{:ok,
%{
type: :storage_location,
location: location,
apriltag_id: apriltag_id
}}
end
end
@doc """
Computes the path for a storage location (for display purposes).
"""
def compute_storage_location_path(nil), do: nil
def compute_storage_location_path(%StorageLocation{} = location) do
StorageLocation.full_path(location)
end
# Convert string keys to atoms for consistency
defp normalize_string_keys(attrs) when is_map(attrs) do
Enum.reduce(attrs, %{}, fn
{key, value}, acc when is_binary(key) ->
atom_key = String.to_atom(key)
Map.put(acc, atom_key, value)
{key, value}, acc ->
Map.put(acc, key, value)
end)
end
## Categories
@doc """
Returns the list of categories with optimized parent preloading.
Preloads up to 5 levels of parent associations to minimize database queries in full_path calculations.
"""
def list_categories do
Category
|> preload(parent: [parent: [parent: [parent: :parent]]])
|> Repo.all()
end
@doc """
Gets a single category.
"""
def get_category!(id), do: Repo.get!(Category, id)
@doc """
Creates a category.
"""
def create_category(attrs \\ %{}) do
%Category{}
|> Category.changeset(attrs)
|> Repo.insert()
end
@doc """
Updates a category.
"""
def update_category(%Category{} = category, attrs) do
category
|> Category.changeset(attrs)
|> Repo.update()
end
@doc """
Deletes a category.
"""
def delete_category(%Category{} = category) do
Repo.delete(category)
end
@doc """
Returns an `%Ecto.Changeset{}` for tracking category changes.
"""
def change_category(%Category{} = category, attrs \\ %{}) do
Category.changeset(category, attrs)
end
@doc """
Gets all category IDs that are descendants of the given category ID, including the category itself.
This is used for filtering components by category and all its subcategories.
Returns an empty list if the category doesn't exist.
Note: This implementation loads all categories into memory for traversal, which is efficient
for typical category tree sizes (hundreds of categories). For very large category trees,
a recursive CTE query could be used instead.
"""
def get_category_and_descendant_ids(category_id) when is_integer(category_id) do
categories = list_categories()
# Verify the category exists before getting descendants
case Enum.find(categories, &(&1.id == category_id)) do
nil ->
[]
_category ->
ComponentsElixir.Inventory.Hierarchical.descendant_ids(
categories,
category_id,
& &1.parent_id
)
end
end
def get_category_and_descendant_ids(_), do: []
@doc """
Gets all storage location IDs that are descendants of the given storage location ID, including the location itself.
This is used for filtering components by storage location and all its sub-locations.
Returns an empty list if the storage location doesn't exist.
Note: This implementation loads all storage locations into memory for traversal, which is efficient
for typical storage location tree sizes (hundreds of locations). For very large storage location trees,
a recursive CTE query could be used instead.
"""
def get_storage_location_and_descendant_ids(storage_location_id)
when is_integer(storage_location_id) do
storage_locations = list_storage_locations()
# Verify the storage location exists before getting descendants
case Enum.find(storage_locations, &(&1.id == storage_location_id)) do
nil ->
[]
_storage_location ->
ComponentsElixir.Inventory.Hierarchical.descendant_ids(
storage_locations,
storage_location_id,
& &1.parent_id
)
end
end
def get_storage_location_and_descendant_ids(_), do: []
## Components
@doc """
Returns the list of components.
"""
def list_components(opts \\ []) do
Component
|> apply_component_filters(opts)
|> apply_component_sorting(opts)
|> preload([:category, :storage_location])
|> Repo.all()
end
defp apply_component_filters(query, opts) do
Enum.reduce(opts, query, fn
{:category_id, category_id}, query when not is_nil(category_id) ->
# Get the category and all its descendant category IDs
category_ids = get_category_and_descendant_ids(category_id)
where(query, [c], c.category_id in ^category_ids)
{:storage_location_id, storage_location_id}, query when not is_nil(storage_location_id) ->
# Get the storage location and all its descendant storage location IDs
storage_location_ids = get_storage_location_and_descendant_ids(storage_location_id)
where(query, [c], c.storage_location_id in ^storage_location_ids)
{:search, search_term}, query when is_binary(search_term) and search_term != "" ->
search_pattern = "%#{search_term}%"
where(
query,
[c],
ilike(c.name, ^search_pattern) or
ilike(c.description, ^search_pattern) or
ilike(c.keywords, ^search_pattern) or
ilike(c.position, ^search_pattern)
)
_, query ->
query
end)
end
defp apply_component_sorting(query, opts) do
sort_criteria = Keyword.get(opts, :sort_criteria, "name_asc")
sort_order = get_sort_order(sort_criteria)
order_by(query, [c], ^sort_order)
end
# Map of sort criteria to their corresponding sort orders
@sort_orders %{
"name_asc" => [asc: :name, asc: :id],
"name_desc" => [desc: :name, asc: :id],
"inserted_at_asc" => [asc: :inserted_at, asc: :id],
"inserted_at_desc" => [desc: :inserted_at, asc: :id],
"updated_at_asc" => [asc: :updated_at, asc: :id],
"updated_at_desc" => [desc: :updated_at, asc: :id],
"count_asc" => [asc: :count, asc: :id],
"count_desc" => [desc: :count, asc: :id]
}
defp get_sort_order(criteria) do
Map.get(@sort_orders, criteria, asc: :name, asc: :id)
end
@doc """
Gets a single component.
"""
def get_component!(id) do
Component
|> preload([:category, :storage_location])
|> Repo.get!(id)
end
@doc """
Creates a component.
"""
def create_component(attrs \\ %{}) do
%Component{}
|> Component.changeset(attrs)
|> Repo.insert()
end
@doc """
Creates a component and downloads datasheet from URL if provided.
"""
def create_component_with_datasheet(attrs \\ %{}) do
# If a datasheet_url is provided, download it
updated_attrs =
case Map.get(attrs, "datasheet_url") do
url when is_binary(url) and url != "" ->
case ComponentsElixir.DatasheetDownloader.download_pdf_from_url(url) do
{:ok, filename} ->
Map.put(attrs, "datasheet_filename", filename)
{:error, _reason} ->
# Continue without datasheet file if download fails
attrs
end
_ ->
attrs
end
%Component{}
|> Component.changeset(updated_attrs)
|> Repo.insert()
end
@doc """
Updates a component.
"""
def update_component(%Component{} = component, attrs) do
component
|> Component.changeset(attrs)
|> Repo.update()
end
@doc """
Updates a component and downloads datasheet from URL if provided.
"""
def update_component_with_datasheet(%Component{} = component, attrs) do
# If a datasheet_url is provided and changed, download it
updated_attrs =
case Map.get(attrs, "datasheet_url") do
url when is_binary(url) and url != "" and url != component.datasheet_url ->
case ComponentsElixir.DatasheetDownloader.download_pdf_from_url(url) do
{:ok, filename} ->
# Delete old datasheet file if it exists
if component.datasheet_filename do
ComponentsElixir.DatasheetDownloader.delete_datasheet_file(
component.datasheet_filename
)
end
Map.put(attrs, "datasheet_filename", filename)
{:error, _reason} ->
# Keep existing filename if download fails
attrs
end
_ ->
attrs
end
component
|> Component.changeset(updated_attrs)
|> Repo.update()
end
@doc """
Deletes a component.
"""
def delete_component(%Component{} = component) do
Repo.delete(component)
end
@doc """
Returns an `%Ecto.Changeset{}` for tracking component changes.
"""
def change_component(%Component{} = component, attrs \\ %{}) do
Component.changeset(component, attrs)
end
@doc """
Returns inventory statistics.
"""
def get_inventory_stats do
total_components = Repo.aggregate(Component, :count, :id)
total_stock =
Component
|> Repo.aggregate(:sum, :count)
categories_with_components =
Component
|> distinct([c], c.category_id)
|> Repo.aggregate(:count, :category_id)
%{
total_components: total_components,
total_stock: total_stock,
categories_with_components: categories_with_components
}
end
@doc """
Returns component statistics (alias for get_inventory_stats for compatibility).
"""
def component_stats do
get_inventory_stats()
end
@doc """
Counts components in a specific category.
"""
def count_components_in_category(category_id) do
Component
|> where([c], c.category_id == ^category_id)
|> Repo.aggregate(:count, :id)
end
@doc """
Counts components in a specific storage location.
"""
def count_components_in_storage_location(storage_location_id) do
Component
|> where([c], c.storage_location_id == ^storage_location_id)
|> Repo.aggregate(:count, :id)
end
@doc """
Increment component stock count.
"""
def increment_component_count(%Component{} = component) do
component
|> Component.changeset(%{count: component.count + 1})
|> Repo.update()
end
@doc """
Decrement component stock count.
"""
def decrement_component_count(%Component{} = component) do
new_count = max(0, component.count - 1)
component
|> Component.changeset(%{count: new_count})
|> Repo.update()
end
@doc """
Paginate components with filters.
"""
def paginate_components(opts \\ []) do
# For now, just return all components - pagination can be added later
components = list_components(opts)
%{components: components, has_more: false}
end
end