Phase 1 cost ingestion: balance + invoices + CSV parse + resource match
Three new schemas: - cloud_balance_snapshots — hourly MTD balance/usage poll for live-accrual. - cloud_invoices — header per provider invoice, with ingest status flags. - cloud_cost_lines — per-line-item COGS, FK to cloud_resources where matched. Three new Oban workers (queue: cloud_billing): - BalanceWorker (hourly) records a snapshot. - BillingHistoryWorker (daily) discovers invoices via /v2/customers/my/ billing_history, upserts headers, enqueues an InvoiceIngestWorker for each not-yet-ingested invoice. - InvoiceIngestWorker (per-invoice) fetches /invoices/:uuid/csv, parses with NimbleCSV (header-keyed so column order shifts don't break us), replaces the invoice's line set, then matches lines to cloud_resources by (kind, name) — case-insensitive, name extracted from "name (size)" description format. DigitalOcean.Client gains get_balance / list_billing_history / get_invoice_summary / fetch_invoice_csv. The CSV endpoint returns text/csv so we bypass Req's body decoder. Cron additions: BalanceWorker hourly at :07, BillingHistoryWorker daily at 02:23. API: - GET /api/v1/billing/balance — latest snapshot, platform_admin only. - GET /api/v1/billing/cost-lines?period=YYYY-MM-DD&kind&limit — per-line COGS, platform_admin only. Live smoke against real DO billing API surfaced and fixed three CSV-format gotchas: column headers use underscores not spaces (group_description, project_name), USD column has $ prefix, dates use "YYYY-MM-DD HH:MM:SS +0000" format (space separator + RFC822 offset). Verified: 137 historical invoices discovered going back to 2014; April 2026 invoice (33 lines, $86.92 total) ingested with 6/33 lines matched to current cloud_resources. Unmatched lines are correctly historic droplets, Spaces buckets (not yet synced), and GST. NimbleCSV ~> 1.2 added as a dep. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
166
lib/arcadia_cloud/billing.ex
Normal file
166
lib/arcadia_cloud/billing.ex
Normal file
@@ -0,0 +1,166 @@
|
||||
defmodule ArcadiaCloud.Billing do
|
||||
@moduledoc """
|
||||
Context for cloud cost ingestion: balance snapshots, monthly invoices,
|
||||
per-line-item COGS, and resource matching.
|
||||
|
||||
Pipeline:
|
||||
BalanceWorker → cloud_balance_snapshots (hourly)
|
||||
BillingHistoryWorker → cloud_invoices (headers) (daily)
|
||||
InvoiceIngestWorker → cloud_cost_lines + resource match (per invoice)
|
||||
|
||||
Phase 1 stops here. Phase 1+ pushes matched cost_lines into skyai-finance
|
||||
as expense lines (skyai-internal) or AR (tenant).
|
||||
"""
|
||||
|
||||
import Ecto.Query, warn: false
|
||||
|
||||
alias ArcadiaCloud.Repo
|
||||
alias ArcadiaCloud.Billing.{CloudBalanceSnapshot, CloudInvoice, CloudCostLine}
|
||||
alias ArcadiaCloud.Cloud.CloudResource
|
||||
|
||||
# ---- balance --------------------------------------------------------------
|
||||
|
||||
def record_balance(attrs) do
|
||||
%CloudBalanceSnapshot{}
|
||||
|> CloudBalanceSnapshot.changeset(attrs)
|
||||
|> Repo.insert()
|
||||
end
|
||||
|
||||
def latest_balance(provider \\ "digitalocean") do
|
||||
from(s in CloudBalanceSnapshot,
|
||||
where: s.provider == ^provider,
|
||||
order_by: [desc: s.generated_at],
|
||||
limit: 1
|
||||
)
|
||||
|> Repo.one()
|
||||
end
|
||||
|
||||
# ---- invoices -------------------------------------------------------------
|
||||
|
||||
def upsert_invoice(attrs) do
|
||||
provider = attrs[:provider] || attrs["provider"]
|
||||
invoice_id = attrs[:provider_invoice_id] || attrs["provider_invoice_id"]
|
||||
|
||||
case Repo.get_by(CloudInvoice, provider: provider, provider_invoice_id: invoice_id) do
|
||||
nil ->
|
||||
%CloudInvoice{}
|
||||
|> CloudInvoice.changeset(attrs)
|
||||
|> Repo.insert()
|
||||
|
||||
existing ->
|
||||
existing
|
||||
|> CloudInvoice.changeset(attrs)
|
||||
|> Repo.update()
|
||||
end
|
||||
end
|
||||
|
||||
def list_invoices_needing_ingest(provider \\ "digitalocean") do
|
||||
from(i in CloudInvoice,
|
||||
where: i.provider == ^provider and is_nil(i.lines_ingested_at),
|
||||
order_by: [desc: i.invoice_period]
|
||||
)
|
||||
|> Repo.all()
|
||||
end
|
||||
|
||||
def mark_invoice_ingested(invoice) do
|
||||
now = DateTime.utc_now() |> DateTime.truncate(:second)
|
||||
|
||||
invoice
|
||||
|> CloudInvoice.changeset(%{lines_ingested_at: now, csv_fetched_at: now})
|
||||
|> Repo.update()
|
||||
end
|
||||
|
||||
# ---- cost lines -----------------------------------------------------------
|
||||
|
||||
@doc """
|
||||
Replace all cost lines for an invoice in one transaction. CSV is the
|
||||
authoritative source for an invoice's content; if we re-fetch, we
|
||||
replace not merge.
|
||||
"""
|
||||
def replace_cost_lines(%CloudInvoice{} = invoice, line_attrs_list) when is_list(line_attrs_list) do
|
||||
Repo.transaction(fn ->
|
||||
Repo.delete_all(from(l in CloudCostLine, where: l.invoice_id == ^invoice.id))
|
||||
|
||||
Enum.each(line_attrs_list, fn attrs ->
|
||||
%CloudCostLine{}
|
||||
|> CloudCostLine.changeset(Map.put(attrs, :invoice_id, invoice.id))
|
||||
|> Repo.insert!()
|
||||
end)
|
||||
end)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Match unmatched cost lines for an invoice to cloud_resources by
|
||||
(kind, description=name) — case-insensitive. Updates matched_at + resource_id.
|
||||
Returns count of newly-matched lines.
|
||||
"""
|
||||
def match_cost_lines_to_resources(%CloudInvoice{id: invoice_id}) do
|
||||
unmatched =
|
||||
from(l in CloudCostLine,
|
||||
where: l.invoice_id == ^invoice_id and is_nil(l.resource_id) and not is_nil(l.kind)
|
||||
)
|
||||
|> Repo.all()
|
||||
|
||||
now = DateTime.utc_now() |> DateTime.truncate(:second)
|
||||
|
||||
Enum.reduce(unmatched, 0, fn line, acc ->
|
||||
case find_resource(line) do
|
||||
%CloudResource{id: rid} ->
|
||||
line
|
||||
|> CloudCostLine.changeset(%{resource_id: rid, matched_at: now})
|
||||
|> Repo.update!()
|
||||
|
||||
acc + 1
|
||||
|
||||
nil ->
|
||||
acc
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp find_resource(%CloudCostLine{kind: kind, description: desc}) when is_binary(desc) do
|
||||
name_lower = desc |> extract_name() |> String.downcase()
|
||||
|
||||
from(r in CloudResource,
|
||||
where:
|
||||
r.kind == ^kind and is_nil(r.deleted_at) and
|
||||
fragment("LOWER(?) = ?", r.name, ^name_lower),
|
||||
limit: 1
|
||||
)
|
||||
|> Repo.one()
|
||||
end
|
||||
|
||||
defp find_resource(_), do: nil
|
||||
|
||||
# DO CSV description is often "name (size_slug)" or "name-1234 (region) NGB Snapshot".
|
||||
# Strip everything after the first " (" — best-effort name extraction.
|
||||
defp extract_name(desc) do
|
||||
case String.split(desc, " (", parts: 2) do
|
||||
[name | _] -> String.trim(name)
|
||||
_ -> desc
|
||||
end
|
||||
end
|
||||
|
||||
def list_cost_lines(opts \\ []) do
|
||||
base =
|
||||
from(l in CloudCostLine,
|
||||
order_by: [desc: l.invoice_period, desc: l.amount_cents]
|
||||
)
|
||||
|
||||
base
|
||||
|> maybe_filter(:invoice_period, opts[:period])
|
||||
|> maybe_filter(:kind, opts[:kind])
|
||||
|> maybe_filter(:resource_id, opts[:resource_id])
|
||||
|> maybe_limit(opts[:limit])
|
||||
|> Repo.all()
|
||||
end
|
||||
|
||||
defp maybe_filter(query, _field, nil), do: query
|
||||
|
||||
defp maybe_filter(query, field, value) do
|
||||
from(l in query, where: field(l, ^field) == ^value)
|
||||
end
|
||||
|
||||
defp maybe_limit(query, nil), do: query
|
||||
defp maybe_limit(query, n) when is_integer(n), do: from(q in query, limit: ^n)
|
||||
end
|
||||
Reference in New Issue
Block a user