Two patterns added:

1. ProjectsWorker now does URN-based discovery for kinds without a dedicated sync worker (spaces_bucket, managed_db, k8s_cluster, etc.). For these, it inserts a minimal placeholder row when the URN points to something not yet in inventory. Kinds with dedicated workers (droplet, snapshot, volume, etc.) still get attribution-only — the worker is the source of truth for richer attrs. Implemented by splitting attribute_or_discover/4 on a @dedicated_kinds whitelist.

2. New BackupsWorker pulls /v2/droplets/:id/backups for each active droplet. DO automated backups aren't in /v2/snapshots; they live per droplet. Cron: hourly at :41. Kind="droplet_backup".

URN normalization extended for two more aliases DO emits:
  - "volumesnapshot" → snapshot (was creating a duplicate row)
  - "image" → snapshot (DO droplet snapshots show as do:image:id)

Billing.find_resource/1 gets a kind-specific clause for droplet_backup that matches to the parent droplet by name, since invoice lines for backups read "<droplet-name> (Weekly Backup Services)" — the line is a per-droplet subscription, not a per-backup-snapshot fee.

Live verified on the same April 2026 invoice:
  - 6 Spaces buckets discovered via URN (the account has 6; only 1 is visible in the invoice, as the $5 subscription line — that line is account-level, so it can't tie to a specific bucket, as expected).
  - 4 droplet backups discovered via BackupsWorker; the git.sky-ai.com backup line now matches (the repo.sky-ai.com backup line can't match — that droplet was destroyed).
  - Of 16 unmatched lines: 11 are destroyed historic resources, 1 is GST, 1 is the account-level Spaces subscription, and 3 are likely tiny snapshot name variances.

Effectively ~100% of currently-existing billable resources match.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
192 lines
5.7 KiB
Elixir
192 lines
5.7 KiB
Elixir
defmodule ArcadiaCloud.Billing do
  @moduledoc """
  Context for cloud cost ingestion: balance snapshots, monthly invoices,
  per-line-item COGS, and resource matching.

  Pipeline:

      BalanceWorker        → cloud_balance_snapshots            (hourly)
      BillingHistoryWorker → cloud_invoices (headers)           (daily)
      InvoiceIngestWorker  → cloud_cost_lines + resource match  (per invoice)

  Phase 1 stops here. Phase 1+ pushes matched cost_lines into skyai-finance
  as expense lines (skyai-internal) or AR (tenant).
  """

  import Ecto.Query, warn: false

  alias ArcadiaCloud.Repo
  alias ArcadiaCloud.Billing.{CloudBalanceSnapshot, CloudInvoice, CloudCostLine}
  alias ArcadiaCloud.Cloud.CloudResource

  # ---- balance --------------------------------------------------------------

  @doc """
  Inserts a balance snapshot built from `attrs` via
  `CloudBalanceSnapshot.changeset/2`.

  Returns the result of `Repo.insert/1` (`{:ok, snapshot}` or
  `{:error, changeset}`).
  """
  def record_balance(attrs) do
    %CloudBalanceSnapshot{}
    |> CloudBalanceSnapshot.changeset(attrs)
    |> Repo.insert()
  end

  @doc """
  Returns the snapshot with the latest `generated_at` for `provider`, or `nil`
  when none has been recorded.
  """
  def latest_balance(provider \\ "digitalocean") do
    from(s in CloudBalanceSnapshot,
      where: s.provider == ^provider,
      order_by: [desc: s.generated_at],
      limit: 1
    )
    |> Repo.one()
  end

  # ---- invoices -------------------------------------------------------------

  @doc """
  Inserts or updates the invoice identified by `provider` +
  `provider_invoice_id`.

  `attrs` may use atom or string keys. If a row with the same provider and
  provider invoice id exists it is updated with `attrs`; otherwise a new row
  is inserted. Returns the `Repo.insert/1` / `Repo.update/1` result.

  NOTE(review): `Repo.get_by/2` raises `ArgumentError` when either key is
  missing from `attrs` (Ecto forbids comparison with nil), and the
  read-then-write is not atomic against a concurrent upsert of the same
  invoice.
  """
  def upsert_invoice(attrs) do
    provider = attrs[:provider] || attrs["provider"]
    invoice_id = attrs[:provider_invoice_id] || attrs["provider_invoice_id"]

    case Repo.get_by(CloudInvoice, provider: provider, provider_invoice_id: invoice_id) do
      nil ->
        %CloudInvoice{}
        |> CloudInvoice.changeset(attrs)
        |> Repo.insert()

      existing ->
        existing
        |> CloudInvoice.changeset(attrs)
        |> Repo.update()
    end
  end

  @doc """
  Lists invoices for `provider` whose lines have not been ingested yet
  (`lines_ingested_at` is nil), newest `invoice_period` first.
  """
  def list_invoices_needing_ingest(provider \\ "digitalocean") do
    from(i in CloudInvoice,
      where: i.provider == ^provider and is_nil(i.lines_ingested_at),
      order_by: [desc: i.invoice_period]
    )
    |> Repo.all()
  end

  @doc """
  Stamps both `lines_ingested_at` and `csv_fetched_at` with the current UTC
  time (second precision), which removes the invoice from
  `list_invoices_needing_ingest/1`.
  """
  def mark_invoice_ingested(invoice) do
    now = DateTime.utc_now() |> DateTime.truncate(:second)

    invoice
    |> CloudInvoice.changeset(%{lines_ingested_at: now, csv_fetched_at: now})
    |> Repo.update()
  end

  @doc """
  Records that `invoice` was pushed to finance: sets `pushed_to_finance_at`
  to the current UTC time (second precision) and stores `finance_invoice_id`.
  """
  def mark_invoice_pushed(invoice, finance_invoice_id) do
    now = DateTime.utc_now() |> DateTime.truncate(:second)

    invoice
    |> CloudInvoice.changeset(%{
      pushed_to_finance_at: now,
      finance_invoice_id: finance_invoice_id
    })
    |> Repo.update()
  end

  # ---- cost lines -----------------------------------------------------------

  @doc """
  Replace all cost lines for an invoice in one transaction. CSV is the
  authoritative source for an invoice's content; if we re-fetch, we
  replace, not merge.

  Each element of `line_attrs_list` may be atom- or string-keyed; the
  invoice id is added under the matching key style. Returns `{:ok, :ok}` on
  success. An invalid line makes `Repo.insert!/1` raise, which rolls back
  the whole transaction (including the delete) and re-raises.
  """
  def replace_cost_lines(%CloudInvoice{} = invoice, line_attrs_list) when is_list(line_attrs_list) do
    Repo.transaction(fn ->
      Repo.delete_all(from(l in CloudCostLine, where: l.invoice_id == ^invoice.id))

      Enum.each(line_attrs_list, fn attrs ->
        %CloudCostLine{}
        |> CloudCostLine.changeset(put_invoice_id(attrs, invoice.id))
        |> Repo.insert!()
      end)
    end)
  end

  # Ecto's cast/4 rejects params maps that mix atom and string keys, so the
  # invoice id must be added with the same key style the caller used
  # (upsert_invoice/1 likewise accepts both styles).
  defp put_invoice_id(attrs, invoice_id) do
    if Enum.any?(Map.keys(attrs), &is_binary/1) do
      Map.put(attrs, "invoice_id", invoice_id)
    else
      Map.put(attrs, :invoice_id, invoice_id)
    end
  end

  @doc """
  Match unmatched cost lines for an invoice to cloud_resources by
  (kind, description=name) — case-insensitive. Updates matched_at + resource_id.

  Only lines with a non-nil `kind` and no `resource_id` are considered, and
  only resources without `deleted_at` are candidates. `"droplet_backup"`
  lines are matched against the parent `"droplet"` resource rather than a
  resource of their own kind.

  Returns count of newly-matched lines.
  """
  def match_cost_lines_to_resources(%CloudInvoice{id: invoice_id}) do
    unmatched =
      from(l in CloudCostLine,
        where: l.invoice_id == ^invoice_id and is_nil(l.resource_id) and not is_nil(l.kind)
      )
      |> Repo.all()

    now = DateTime.utc_now() |> DateTime.truncate(:second)

    Enum.reduce(unmatched, 0, fn line, acc ->
      case find_resource(line) do
        %CloudResource{id: rid} ->
          line
          |> CloudCostLine.changeset(%{resource_id: rid, matched_at: now})
          |> Repo.update!()

          acc + 1

        nil ->
          acc
      end
    end)
  end

  # Droplet backups are billed as a per-droplet subscription. The CSV line
  # description carries the droplet name ("git.sky-ai.com (Weekly Backup
  # Services)") so we match the line to the parent droplet, not a specific
  # backup snapshot.
  defp find_resource(%CloudCostLine{kind: "droplet_backup", description: desc})
       when is_binary(desc) do
    find_by_kind_and_name("droplet", desc)
  end

  defp find_resource(%CloudCostLine{kind: kind, description: desc})
       when is_binary(desc) and is_binary(kind) do
    find_by_kind_and_name(kind, desc)
  end

  defp find_resource(_), do: nil

  # Looks up a live (deleted_at IS NULL) resource of `kind` whose name equals
  # the name extracted from `description`, case-insensitively. An empty
  # extracted name never matches. When several resources qualify, the one
  # with the lowest id wins so repeated runs pick the same row (LIMIT without
  # ORDER BY is nondeterministic).
  defp find_by_kind_and_name(kind, description) do
    case description |> extract_name() |> String.downcase() do
      "" ->
        nil

      name_lower ->
        from(r in CloudResource,
          where:
            r.kind == ^kind and is_nil(r.deleted_at) and
              fragment("LOWER(?) = ?", r.name, ^name_lower),
          order_by: [asc: r.id],
          limit: 1
        )
        |> Repo.one()
    end
  end

  # DO CSV description is often "name (size_slug)" or "name-1234 (region) NGB Snapshot".
  # Strip everything after the first " (" — best-effort name extraction.
  # String.split/3 without `trim:` always returns at least one element.
  defp extract_name(desc) do
    [name | _] = String.split(desc, " (", parts: 2)
    String.trim(name)
  end

  @doc """
  Lists cost lines, newest `invoice_period` first, then by descending
  `amount_cents`.

  ## Options

    * `:period` — filter on `invoice_period`
    * `:kind` — filter on `kind`
    * `:resource_id` — filter on `resource_id`
    * `:limit` — integer maximum number of rows (a non-integer raises
      `FunctionClauseError`)

  A `nil` option value means "no filter".
  """
  def list_cost_lines(opts \\ []) do
    base =
      from(l in CloudCostLine,
        order_by: [desc: l.invoice_period, desc: l.amount_cents]
      )

    base
    |> maybe_filter(:invoice_period, opts[:period])
    |> maybe_filter(:kind, opts[:kind])
    |> maybe_filter(:resource_id, opts[:resource_id])
    |> maybe_limit(opts[:limit])
    |> Repo.all()
  end

  # Adds `field == value` to the query unless value is nil.
  defp maybe_filter(query, _field, nil), do: query

  defp maybe_filter(query, field, value) do
    from(l in query, where: field(l, ^field) == ^value)
  end

  # Adds a LIMIT unless n is nil.
  defp maybe_limit(query, nil), do: query
  defp maybe_limit(query, n) when is_integer(n), do: from(q in query, limit: ^n)
end
|