From e1f0aedcf7df3af586675a7011ad365b145bd0ee Mon Sep 17 00:00:00 2001 From: Giuliano Silvestro Date: Thu, 21 May 2026 08:29:36 +1000 Subject: [PATCH] Receive llm_usage_recorded events from arcadia-llm-gateway MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit POST /api/v1/integrations/llm-usage stores priced LLM usage events (idempotent on gateway_request_id) in llm_usage_events. The gateway is the LLM-pricing authority — arcadia-cloud trusts the charge it sends rather than re-pricing. The monthly invoice rollup now appends an llm_usage line per deployment alongside the infra quote lines; the exact decimal charges are summed and rounded to cents once. Closes the gateway→cloud billing loop. Co-Authored-By: Claude Opus 4.7 (1M context) --- lib/arcadia_cloud/billing/llm_usage_event.ex | 43 +++++++ lib/arcadia_cloud/invoicing.ex | 7 +- lib/arcadia_cloud/llm_usage.ex | 110 ++++++++++++++++++ .../controllers/integrations_controller.ex | 69 +++++++++++ lib/arcadia_cloud_web/router.ex | 2 + ...20260521000000_create_llm_usage_events.exs | 39 +++++++ 6 files changed, 269 insertions(+), 1 deletion(-) create mode 100644 lib/arcadia_cloud/billing/llm_usage_event.ex create mode 100644 lib/arcadia_cloud/llm_usage.ex create mode 100644 lib/arcadia_cloud_web/controllers/integrations_controller.ex create mode 100644 priv/repo/migrations/20260521000000_create_llm_usage_events.exs diff --git a/lib/arcadia_cloud/billing/llm_usage_event.ex b/lib/arcadia_cloud/billing/llm_usage_event.ex new file mode 100644 index 0000000..771fd24 --- /dev/null +++ b/lib/arcadia_cloud/billing/llm_usage_event.ex @@ -0,0 +1,43 @@ +defmodule ArcadiaCloud.Billing.LlmUsageEvent do + use Ecto.Schema + import Ecto.Changeset + + @primary_key {:id, :binary_id, autogenerate: true} + @foreign_key_type :binary_id + + schema "llm_usage_events" do + field :gateway_request_id, :string + field :tenant_id, :string + field :deployment_id, :string + + field :provider, :string + field :model, :string + field :request_kind, :string + + field :input_tokens, :integer, default: 0 + field :output_tokens, :integer, default: 0 + field :cached_input_tokens, :integer, default: 0 + field :total_tokens, :integer, default: 0 + + field :upstream_cost, :decimal + field :customer_charge, :decimal + field :customer_charge_cents, :integer, default: 0 + field :markup_mode, :string + + field :occurred_at, :utc_datetime_usec + + timestamps(type: :utc_datetime, updated_at: false) + end + + @required ~w(gateway_request_id tenant_id occurred_at)a + @optional ~w(deployment_id provider model request_kind input_tokens + output_tokens cached_input_tokens total_tokens upstream_cost + customer_charge customer_charge_cents markup_mode)a + + def changeset(event, attrs) do + event + |> cast(attrs, @required ++ @optional) + |> validate_required(@required) + |> unique_constraint(:gateway_request_id) + end +end diff --git a/lib/arcadia_cloud/invoicing.ex b/lib/arcadia_cloud/invoicing.ex index aa460e2..5e84080 100644 --- a/lib/arcadia_cloud/invoicing.ex +++ b/lib/arcadia_cloud/invoicing.ex @@ -77,7 +77,12 @@ defmodule ArcadiaCloud.Invoicing do }) |> Repo.insert() - lines = Enum.flat_map(subscriptions, &subscription_lines(&1, period_start, period_end)) + # Infra lines from the quote engine + LLM-token lines from metered + # gateway usage (arcadia-llm-gateway pushes priced usage events). + lines = + Enum.flat_map(subscriptions, &subscription_lines(&1, period_start, period_end)) ++ + ArcadiaCloud.LlmUsage.period_lines(tenant_id, period_start, period_end) + Enum.each(lines, &insert_line(invoice.id, &1)) subtotal = Enum.reduce(lines, 0, &(&1.amount_cents + &2)) diff --git a/lib/arcadia_cloud/llm_usage.ex b/lib/arcadia_cloud/llm_usage.ex new file mode 100644 index 0000000..e0f8bfe --- /dev/null +++ b/lib/arcadia_cloud/llm_usage.ex @@ -0,0 +1,110 @@ +defmodule ArcadiaCloud.LlmUsage do + @moduledoc """ + Stores `llm_usage_recorded` events pushed by arcadia-llm-gateway and + rolls them into tenant invoices. + + The gateway is the LLM-pricing authority — it sends the already-priced + `customer_charge_cents` per request. arcadia-cloud does not re-price; + it sums received events per deployment into invoice lines. + + Note on `plan_inclusive` LLM pricing: the per-request charge the + gateway sends is the *marginal* (overage-rate) price. For + `flat_percent` / `fixed_per_token` policies the sum equals the bill + exactly. If a tenant is put on a `plan_inclusive` LLM policy, a + period true-up against the gateway's authoritative `period_charge` + would be needed — none are in use today. + """ + + import Ecto.Query, warn: false + + alias ArcadiaCloud.Repo + alias ArcadiaCloud.Billing.LlmUsageEvent + + @doc """ + Idempotently records an event. A repeated `gateway_request_id` (a + gateway retry) is a no-op. Returns `{:ok, :recorded | :duplicate}`. + """ + def record_event(attrs) do + gateway_request_id = attrs[:gateway_request_id] || attrs["gateway_request_id"] + + if gateway_request_id && Repo.exists?(seen(gateway_request_id)) do + {:ok, :duplicate} + else + insert_event(attrs) + end + end + + defp insert_event(attrs) do + case %LlmUsageEvent{} |> LlmUsageEvent.changeset(attrs) |> Repo.insert() do + {:ok, _event} -> + {:ok, :recorded} + + {:error, %{errors: errors} = changeset} -> + # A concurrent insert of the same event tripped the unique index. + if Keyword.has_key?(errors, :gateway_request_id), + do: {:ok, :duplicate}, + else: {:error, changeset} + end + end + + defp seen(gateway_request_id) do + from(e in LlmUsageEvent, where: e.gateway_request_id == ^gateway_request_id) + end + + @doc """ + Invoice lines for a tenant's LLM usage in `[period_start, period_end]`, + one per deployment. Returns `[]` when there was no usage. + """ + def period_lines(tenant_id, %Date{} = period_start, %Date{} = period_end) do + from = DateTime.new!(period_start, ~T[00:00:00], "Etc/UTC") + to = DateTime.new!(Date.add(period_end, 1), ~T[00:00:00], "Etc/UTC") + + from(e in LlmUsageEvent, + where: e.tenant_id == ^tenant_id and e.occurred_at >= ^from and e.occurred_at < ^to, + group_by: e.deployment_id, + select: %{ + deployment_id: e.deployment_id, + requests: count(e.id), + input_tokens: coalesce(sum(e.input_tokens), 0), + output_tokens: coalesce(sum(e.output_tokens), 0), + # Sum the exact decimal charge, round to cents once — summing the + # gateway's pre-rounded per-request cents would drift on high + # request counts. + charge: coalesce(sum(e.customer_charge), 0) + } + ) + |> Repo.all() + |> Enum.map(&to_line/1) + |> Enum.reject(&(&1.amount_cents == 0)) + end + + defp to_line(row) do + amount_cents = + row.charge + |> to_decimal() + |> Decimal.mult(Decimal.new(100)) + |> Decimal.round(0, :half_up) + |> Decimal.to_integer() + + %{ + deployment_id: row.deployment_id, + kind: "llm_usage", + resource_kind: "llm_tokens", + description: + "LLM tokens — #{row.input_tokens} in / #{row.output_tokens} out (#{row.requests} requests)", + qty: row.input_tokens + row.output_tokens, + unit: "token", + unit_price_cents: nil, + amount_cents: amount_cents, + meta: %{ + "input_tokens" => row.input_tokens, + "output_tokens" => row.output_tokens, + "requests" => row.requests + } + } + end + + defp to_decimal(%Decimal{} = d), do: d + defp to_decimal(n) when is_integer(n), do: Decimal.new(n) + defp to_decimal(_), do: Decimal.new(0) +end diff --git a/lib/arcadia_cloud_web/controllers/integrations_controller.ex b/lib/arcadia_cloud_web/controllers/integrations_controller.ex new file mode 100644 index 0000000..d28b7e5 --- /dev/null +++ b/lib/arcadia_cloud_web/controllers/integrations_controller.ex @@ -0,0 +1,69 @@ +defmodule ArcadiaCloudWeb.IntegrationsController do + @moduledoc """ + Inbound integration hooks from sibling Sky AI services. + + `llm_usage` receives `llm_usage_recorded` events from + arcadia-llm-gateway — one per billable LLM request — and stores them + for the monthly invoice rollup. platform-admin only (the gateway + authenticates with a service JWT carrying that role). + """ + + use ArcadiaCloudWeb, :controller + + alias ArcadiaCloud.LlmUsage + + def llm_usage(conn, params) do + with :ok <- require_platform_admin(conn) do + case LlmUsage.record_event(translate(params)) do + {:ok, :recorded} -> + conn |> put_status(:created) |> json(%{status: "recorded"}) + + {:ok, :duplicate} -> + json(conn, %{status: "duplicate"}) + + {:error, changeset} -> + conn |> put_status(:unprocessable_entity) |> json(%{error: errors(changeset)}) + end + end + end + + # Maps the gateway's wire payload onto the LlmUsageEvent schema. + defp translate(params) do + %{ + gateway_request_id: params["gateway_request_id"], + tenant_id: params["tenant_id"], + deployment_id: params["deployment_id"], + provider: params["provider"], + model: params["model"], + request_kind: params["request_kind"], + input_tokens: params["input_tokens"], + output_tokens: params["output_tokens"], + cached_input_tokens: params["cached_input_tokens"], + total_tokens: params["total_tokens"], + upstream_cost: params["upstream_cost_usd"], + customer_charge: params["customer_charge_usd"], + customer_charge_cents: params["customer_charge_cents"], + markup_mode: params["markup_mode"], + occurred_at: params["occurred_at"] + } + end + + defp require_platform_admin(conn) do + identity = conn.assigns.current_identity + + if is_list(identity.roles) and "platform-admin" in identity.roles do + :ok + else + conn + |> put_status(:forbidden) + |> json(%{error: "platform_admin_required"}) + |> halt() + end + end + + defp errors(changeset) do + Ecto.Changeset.traverse_errors(changeset, fn {msg, opts} -> + Enum.reduce(opts, msg, fn {k, v}, acc -> String.replace(acc, "%{#{k}}", to_string(v)) end) + end) + end +end diff --git a/lib/arcadia_cloud_web/router.ex b/lib/arcadia_cloud_web/router.ex index 2a67f46..d53a5c8 100644 --- a/lib/arcadia_cloud_web/router.ex +++ b/lib/arcadia_cloud_web/router.ex @@ -42,5 +42,7 @@ defmodule ArcadiaCloudWeb.Router do get "/dashboard/margin", DashboardController, :margin get "/dashboard/accrual", DashboardController, :accrual + + post "/integrations/llm-usage", IntegrationsController, :llm_usage end end diff --git a/priv/repo/migrations/20260521000000_create_llm_usage_events.exs b/priv/repo/migrations/20260521000000_create_llm_usage_events.exs new file mode 100644 index 0000000..6946df9 --- /dev/null +++ b/priv/repo/migrations/20260521000000_create_llm_usage_events.exs @@ -0,0 +1,39 @@ +defmodule ArcadiaCloud.Repo.Migrations.CreateLlmUsageEvents do + use Ecto.Migration + + def change do + # `llm_usage_recorded` events received from arcadia-llm-gateway. The + # gateway is the LLM-pricing authority — it sends the already-priced + # customer charge; arcadia-cloud stores it and rolls it into the + # tenant's monthly invoice. Idempotent on gateway_request_id so a + # gateway retry never double-bills. + create table(:llm_usage_events, primary_key: false) do + add :id, :binary_id, primary_key: true + add :gateway_request_id, :string, null: false + add :tenant_id, :string, null: false + add :deployment_id, :string + + add :provider, :string + add :model, :string + add :request_kind, :string + + add :input_tokens, :integer, null: false, default: 0 + add :output_tokens, :integer, null: false, default: 0 + add :cached_input_tokens, :integer, null: false, default: 0 + add :total_tokens, :integer, null: false, default: 0 + + add :upstream_cost, :decimal + add :customer_charge, :decimal + add :customer_charge_cents, :integer, null: false, default: 0 + add :markup_mode, :string + + add :occurred_at, :utc_datetime_usec, null: false + + timestamps(type: :utc_datetime, updated_at: false) + end + + create unique_index(:llm_usage_events, [:gateway_request_id]) + create index(:llm_usage_events, [:tenant_id, :occurred_at]) + create index(:llm_usage_events, [:deployment_id]) + end +end