defmodule ArcadiaCloud.LlmUsage do
  @moduledoc """
  Stores `llm_usage_recorded` events pushed by arcadia-llm-gateway and rolls
  them into tenant invoices.

  The gateway is the LLM-pricing authority — it sends the already-priced
  `customer_charge_cents` per request. arcadia-cloud does not re-price; it sums
  received events per deployment into invoice lines.

  Note on `plan_inclusive` LLM pricing: the per-request charge the gateway
  sends is the *marginal* (overage-rate) price. For `flat_percent` /
  `fixed_per_token` policies the sum equals the bill exactly. If a tenant is
  put on a `plan_inclusive` LLM policy, a period true-up against the gateway's
  authoritative `period_charge` would be needed — none are in use today.
  """

  import Ecto.Query, warn: false

  alias ArcadiaCloud.Repo
  alias ArcadiaCloud.Billing.LlmUsageEvent

  @doc """
  Idempotently records an event.

  A repeated `gateway_request_id` (a gateway retry) is a no-op.

  Returns `{:ok, :recorded | :duplicate}`, or `{:error, changeset}` when the
  event fails validation for any reason other than being a duplicate.
  """
  @spec record_event(map()) :: {:ok, :recorded | :duplicate} | {:error, Ecto.Changeset.t()}
  def record_event(attrs) do
    # Callers may hand over atom-keyed or string-keyed attrs.
    gateway_request_id = attrs[:gateway_request_id] || attrs["gateway_request_id"]

    if gateway_request_id && Repo.exists?(seen(gateway_request_id)) do
      {:ok, :duplicate}
    else
      insert_event(attrs)
    end
  end

  # Inserts the event. A uniqueness error on `gateway_request_id` means a
  # concurrent insert of the same event won the race between the `exists?`
  # check and this insert, so it is reported as a duplicate.
  defp insert_event(attrs) do
    insert_result =
      %LlmUsageEvent{}
      |> LlmUsageEvent.changeset(attrs)
      |> Repo.insert()

    case insert_result do
      {:ok, _event} ->
        {:ok, :recorded}

      {:error, %{errors: errors} = changeset} ->
        if duplicate_request?(errors), do: {:ok, :duplicate}, else: {:error, changeset}
    end
  end

  # True only when `gateway_request_id` failed a *uniqueness* check. Other
  # errors on that field (blank, wrong type, ...) are genuine validation
  # failures and must not be acknowledged as duplicates — doing so would
  # silently drop a billable event.
  defp duplicate_request?(errors) do
    Enum.any?(errors, fn
      {:gateway_request_id, {_message, meta}} ->
        meta[:constraint] == :unique or meta[:validation] == :unsafe_unique

      _other ->
        false
    end)
  end

  # Query matching every stored event with the given gateway request id.
  defp seen(gateway_request_id) do
    from(e in LlmUsageEvent, where: e.gateway_request_id == ^gateway_request_id)
  end

  @doc """
  Invoice lines for a tenant's LLM usage in `[period_start, period_end]`, one
  per deployment.

  Both dates are inclusive and interpreted as UTC days. Deployments whose
  rounded charge is zero cents are omitted. Returns `[]` when there was no
  usage.
  """
  @spec period_lines(term(), Date.t(), Date.t()) :: [map()]
  def period_lines(tenant_id, %Date{} = period_start, %Date{} = period_end) do
    range_start = DateTime.new!(period_start, ~T[00:00:00], "Etc/UTC")
    # Exclusive upper bound: midnight after `period_end`, so the whole last
    # day is covered.
    range_end = DateTime.new!(Date.add(period_end, 1), ~T[00:00:00], "Etc/UTC")

    query =
      from(e in LlmUsageEvent,
        where:
          e.tenant_id == ^tenant_id and e.occurred_at >= ^range_start and
            e.occurred_at < ^range_end,
        group_by: e.deployment_id,
        select: %{
          deployment_id: e.deployment_id,
          requests: count(e.id),
          input_tokens: coalesce(sum(e.input_tokens), 0),
          output_tokens: coalesce(sum(e.output_tokens), 0),
          # Sum the exact decimal charge, round to cents once — summing the
          # gateway's pre-rounded per-request cents would drift on high
          # request counts.
          charge: coalesce(sum(e.customer_charge), 0)
        }
      )

    query
    |> Repo.all()
    |> Enum.map(&to_line/1)
    |> Enum.reject(&(&1.amount_cents == 0))
  end

  # Turns one aggregated per-deployment row into an invoice-line map. The
  # summed charge is multiplied by 100 and rounded half-up to whole cents.
  defp to_line(row) do
    amount_cents =
      row.charge
      |> to_decimal()
      |> Decimal.mult(Decimal.new(100))
      |> Decimal.round(0, :half_up)
      |> Decimal.to_integer()

    %{
      deployment_id: row.deployment_id,
      kind: "llm_usage",
      resource_kind: "llm_tokens",
      description:
        "LLM tokens — #{row.input_tokens} in / #{row.output_tokens} out (#{row.requests} requests)",
      qty: row.input_tokens + row.output_tokens,
      unit: "token",
      unit_price_cents: nil,
      amount_cents: amount_cents,
      meta: %{
        "input_tokens" => row.input_tokens,
        "output_tokens" => row.output_tokens,
        "requests" => row.requests
      }
    }
  end

  # Normalises the aggregated charge to a Decimal. Anything that is neither a
  # Decimal nor an integer is treated as zero.
  defp to_decimal(%Decimal{} = d), do: d
  defp to_decimal(n) when is_integer(n), do: Decimal.new(n)
  defp to_decimal(_), do: Decimal.new(0)
end