Files
arcadia-cloud/lib/arcadia_cloud/billing.ex
Giuliano Silvestro 4f2e52af01 Wire skyai-finance push: cloud invoices flow into Sky AI's books
After InvoiceIngestWorker writes cost lines and matches resources, it
pushes a normalized invoice payload to skyai-finance-svc's new endpoint
POST /api/v1/integrations/cloud/import-invoice. Idempotent — finance
dedups on (tenant_id, source, external_id), so re-runs return "updated"
not duplicate rows.

ArcadiaCloud.Integrations.SkyaiFinance is the HTTP client. Auth is a
short-lived JWT minted via Guardian (shared secret per memory) with
identity {tenant_id: "platform-admin", roles: ["admin"]} — the role
satisfies finance's RequireWriteRole plug. Identity + base URL are
configurable; SKYAI_FINANCE_URL env var can override the default
http://localhost:4010 for arcadia-dev / prod.

GST line detection: lines whose description contains "gst" or "tax"
get summed into amount_tax_minor; everything else into amount_net_minor;
sum stays gross. Phase 1 enough — proper tax handling lands when
real per-tenant invoices flow.

cloud_invoices gains pushed_to_finance_at + finance_invoice_id so we
don't re-push uselessly (Billing.mark_invoice_pushed/2 records both).
A missing finance config (no :skyai_finance app env) makes the push a
silent skip rather than a worker failure — environments without finance
configured still get a working ingest.

Live verified end-to-end against both services:
- April 2026 DO invoice (33 lines, $86.92) lands in finance as a row
  with gross=$86.92, tax=$7.90, source=digitalocean, tenant=platform-admin
- DigitalOcean vendor auto-created (category=cloud) under platform-admin
- Re-running the worker returns action: "updated" not "created";
  finance still has exactly 1 row for the invoice

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 22:35:52 +10:00

178 lines
5.1 KiB
Elixir

defmodule ArcadiaCloud.Billing do
@moduledoc """
Context for cloud cost ingestion: balance snapshots, monthly invoices,
per-line-item COGS, and resource matching.
Pipeline:
BalanceWorker → cloud_balance_snapshots (hourly)
BillingHistoryWorker → cloud_invoices (headers) (daily)
InvoiceIngestWorker → cloud_cost_lines + resource match (per invoice)
Phase 1 stops here. Phase 1+ pushes matched cost_lines into skyai-finance
as expense lines (skyai-internal) or AR (tenant).
"""
import Ecto.Query, warn: false
alias ArcadiaCloud.Repo
alias ArcadiaCloud.Billing.{CloudBalanceSnapshot, CloudInvoice, CloudCostLine}
alias ArcadiaCloud.Cloud.CloudResource
# ---- balance --------------------------------------------------------------
def record_balance(attrs) do
%CloudBalanceSnapshot{}
|> CloudBalanceSnapshot.changeset(attrs)
|> Repo.insert()
end
def latest_balance(provider \\ "digitalocean") do
from(s in CloudBalanceSnapshot,
where: s.provider == ^provider,
order_by: [desc: s.generated_at],
limit: 1
)
|> Repo.one()
end
# ---- invoices -------------------------------------------------------------
def upsert_invoice(attrs) do
provider = attrs[:provider] || attrs["provider"]
invoice_id = attrs[:provider_invoice_id] || attrs["provider_invoice_id"]
case Repo.get_by(CloudInvoice, provider: provider, provider_invoice_id: invoice_id) do
nil ->
%CloudInvoice{}
|> CloudInvoice.changeset(attrs)
|> Repo.insert()
existing ->
existing
|> CloudInvoice.changeset(attrs)
|> Repo.update()
end
end
def list_invoices_needing_ingest(provider \\ "digitalocean") do
from(i in CloudInvoice,
where: i.provider == ^provider and is_nil(i.lines_ingested_at),
order_by: [desc: i.invoice_period]
)
|> Repo.all()
end
def mark_invoice_ingested(invoice) do
now = DateTime.utc_now() |> DateTime.truncate(:second)
invoice
|> CloudInvoice.changeset(%{lines_ingested_at: now, csv_fetched_at: now})
|> Repo.update()
end
def mark_invoice_pushed(invoice, finance_invoice_id) do
now = DateTime.utc_now() |> DateTime.truncate(:second)
invoice
|> CloudInvoice.changeset(%{
pushed_to_finance_at: now,
finance_invoice_id: finance_invoice_id
})
|> Repo.update()
end
# ---- cost lines -----------------------------------------------------------
@doc """
Replace all cost lines for an invoice in one transaction. CSV is the
authoritative source for an invoice's content; if we re-fetch, we
replace not merge.
"""
def replace_cost_lines(%CloudInvoice{} = invoice, line_attrs_list) when is_list(line_attrs_list) do
Repo.transaction(fn ->
Repo.delete_all(from(l in CloudCostLine, where: l.invoice_id == ^invoice.id))
Enum.each(line_attrs_list, fn attrs ->
%CloudCostLine{}
|> CloudCostLine.changeset(Map.put(attrs, :invoice_id, invoice.id))
|> Repo.insert!()
end)
end)
end
@doc """
Match unmatched cost lines for an invoice to cloud_resources by
(kind, description=name) — case-insensitive. Updates matched_at + resource_id.
Returns count of newly-matched lines.
"""
def match_cost_lines_to_resources(%CloudInvoice{id: invoice_id}) do
unmatched =
from(l in CloudCostLine,
where: l.invoice_id == ^invoice_id and is_nil(l.resource_id) and not is_nil(l.kind)
)
|> Repo.all()
now = DateTime.utc_now() |> DateTime.truncate(:second)
Enum.reduce(unmatched, 0, fn line, acc ->
case find_resource(line) do
%CloudResource{id: rid} ->
line
|> CloudCostLine.changeset(%{resource_id: rid, matched_at: now})
|> Repo.update!()
acc + 1
nil ->
acc
end
end)
end
defp find_resource(%CloudCostLine{kind: kind, description: desc}) when is_binary(desc) do
name_lower = desc |> extract_name() |> String.downcase()
from(r in CloudResource,
where:
r.kind == ^kind and is_nil(r.deleted_at) and
fragment("LOWER(?) = ?", r.name, ^name_lower),
limit: 1
)
|> Repo.one()
end
defp find_resource(_), do: nil
# DO CSV description is often "name (size_slug)" or "name-1234 (region) NGB Snapshot".
# Strip everything after the first " (" — best-effort name extraction.
defp extract_name(desc) do
case String.split(desc, " (", parts: 2) do
[name | _] -> String.trim(name)
_ -> desc
end
end
def list_cost_lines(opts \\ []) do
base =
from(l in CloudCostLine,
order_by: [desc: l.invoice_period, desc: l.amount_cents]
)
base
|> maybe_filter(:invoice_period, opts[:period])
|> maybe_filter(:kind, opts[:kind])
|> maybe_filter(:resource_id, opts[:resource_id])
|> maybe_limit(opts[:limit])
|> Repo.all()
end
defp maybe_filter(query, _field, nil), do: query
defp maybe_filter(query, field, value) do
from(l in query, where: field(l, ^field) == ^value)
end
defp maybe_limit(query, nil), do: query
defp maybe_limit(query, n) when is_integer(n), do: from(q in query, limit: ^n)
end