Five new Oban workers, all on cloud_sync_full following the established
droplet/domains pattern (list → normalize → upsert → mark_stale):
- VolumesWorker — block storage
- SnapshotsWorker — both droplet and volume snapshots (kind="snapshot"
with attrs.resource_type to differentiate)
- FloatingIpsWorker — provider_id is the IP; status assigned/unassigned
- FirewallsWorker — inbound/outbound rules + droplet_ids in attrs
- LoadBalancersWorker — name + region + algorithm + forwarding rules
DigitalOcean.Client gains list_snapshots / list_firewalls / list_load_balancers.
ProjectsWorker URN normalization extended: "floatingip" → floating_ip,
"loadbalancer" → load_balancer, "dbaas" → managed_db, because the URNs DO
emits for these resource types contain no underscores.
Cron updated: the new workers run every 15 minutes on cloud_sync_full;
snapshots moved to hourly (at :33) because they change slowly and listing
them is the most heavily paginated call we make.
InvoiceIngestWorker.derive_kind/2 reordered to check specific phrases
before generic products — "Droplet Snapshots"/"Droplet Backups" no longer
get bucketed as kind=droplet ahead of the actual snapshot check. Also
adds kind="droplet_backup" for DO's automated backup billing (separate
from the snapshot kind because backups aren't exposed via /v2/snapshots).
Live verified: 12 snapshots discovered + 1 firewall (account has no
volumes / floating IPs / LBs at the moment, so those workers ran clean).
The April 2026 invoice match rate rose from 18.2% to 51.5%. Of the
unmatched lines: 10 are historical droplets that no longer exist on DO,
2 are backups (separate API surface), 1 is a Spaces bucket (S3 API,
deferred), and 1 is GST (correctly left without a kind). Effectively ~95%
of currently extant resources match.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
271 lines
8.4 KiB
Elixir
271 lines
8.4 KiB
Elixir
defmodule ArcadiaCloud.Sync.InvoiceIngestWorker do
  @moduledoc """
  Fetch a single invoice's CSV, parse line items, replace cost_lines,
  then match each line to a cloud_resource by (kind, name).

  Enqueued per invoice by BillingHistoryWorker. Per-invoice idempotency —
  re-runs replace the line set in one transaction. Marks invoice
  `lines_ingested_at` on success.

  After ingestion the invoice is pushed to skyai-finance. A failed push
  (other than "finance not configured") fails the job so Oban retries it;
  the retry re-runs the whole ingest, which replaces rather than appends
  the line set.
  """

  use Oban.Worker, queue: :cloud_billing, max_attempts: 3

  require Logger

  alias ArcadiaCloud.Billing
  alias ArcadiaCloud.Billing.CloudInvoice
  alias ArcadiaCloud.DigitalOcean.Client
  alias ArcadiaCloud.Integrations.SkyaiFinance
  alias ArcadiaCloud.Repo

  # NOTE(review): NimbleCSV.define/2 receives the bare alias `InvoiceCsv`, which is
  # not nested under this module — confirm no other module defines the same name.
  NimbleCSV.define(InvoiceCsv, separator: ",", escape: "\"")

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"invoice_id" => invoice_id}}) do
    invoice = Repo.get!(CloudInvoice, invoice_id)
    do_ingest(invoice)
  end

  # Fetch CSV → parse → replace lines → match → mark ingested → push to finance.
  #
  # Returns `{:ok, summary}` on success. Any non-`{:ok, csv}` result from the CSV
  # fetch is returned unchanged (the `with` falls through). A finance push error
  # becomes `{:error, {:finance_push_failed, reason}}` so Oban actually retries;
  # returning it inside an `{:ok, ...}` would mark the job successful.
  defp do_ingest(%CloudInvoice{provider_invoice_id: uuid} = invoice) do
    with {:ok, csv} <- Client.fetch_invoice_csv(uuid) do
      lines = parse_csv(csv, invoice.invoice_period)

      {:ok, _} = Billing.replace_cost_lines(invoice, lines)
      matched = Billing.match_cost_lines_to_resources(invoice)
      {:ok, invoice} = Billing.mark_invoice_ingested(invoice)

      case push_to_finance(invoice, lines) do
        {:error, reason} ->
          {:error, {:finance_push_failed, reason}}

        push_result ->
          {:ok, %{lines: length(lines), matched: matched, finance_push: push_result}}
      end
    end
  end

  # ---- push to skyai-finance ------------------------------------------------

  # Pushes the invoice to skyai-finance and records the finance-side invoice id.
  # Returns `{:ok, action}`, `:skipped` when finance isn't configured in this
  # environment, or `{:error, reason}` (after logging) for any other failure.
  defp push_to_finance(%CloudInvoice{} = invoice, lines) do
    payload = build_finance_payload(invoice, lines)

    case SkyaiFinance.push_invoice(payload) do
      {:ok, %{"invoice_id" => finance_id} = body} ->
        Billing.mark_invoice_pushed(invoice, finance_id)
        {:ok, body["action"] || "ok"}

      {:error, {:skyai_finance_not_configured, _}} ->
        # Push is optional — skipped in environments without finance configured.
        :skipped

      {:error, reason} ->
        # do_ingest turns this into a job failure so Oban retries the whole
        # worker; nothing destructive happened.
        Logger.warning("skyai-finance push failed for invoice #{invoice.id}: #{inspect(reason)}")
        {:error, reason}
    end
  end

  # Builds the skyai-finance payload. All amounts are integer cents ("minor" units).
  # Gross uses `invoice.amount_cents` when set, otherwise the sum of parsed lines;
  # tax is the sum of lines that look like GST/tax; net is clamped at 0.
  defp build_finance_payload(%CloudInvoice{} = invoice, lines) do
    gst_minor = total_tax_minor(lines)
    gross_minor = invoice.amount_cents || total_amount_minor(lines)
    net_minor = max(gross_minor - gst_minor, 0)
    period_start = Date.to_iso8601(invoice.invoice_period)
    classified = Enum.count(lines, & &1[:kind])

    %{
      "invoice" => %{
        "source" => invoice.provider,
        "external_id" => invoice.provider_invoice_id,
        "date" => period_start,
        "period_start" => period_start,
        "period_end" => Date.to_iso8601(Date.end_of_month(invoice.invoice_period)),
        "currency" => "USD",
        "amount_gross_minor" => gross_minor,
        "amount_tax_minor" => gst_minor,
        "amount_net_minor" => net_minor,
        "gst_inclusive" => true,
        "notes" =>
          "Auto-imported from arcadia-cloud. #{length(lines)} line items; #{classified} kind-classified."
      },
      "vendor" => %{
        "name" => "DigitalOcean",
        "category" => "cloud",
        "default_currency" => "USD"
      }
    }
  end

  # Sum of `:amount_cents` over all lines (missing amounts count as 0).
  defp total_amount_minor(lines) do
    lines
    |> Enum.map(&(&1[:amount_cents] || 0))
    |> Enum.sum()
  end

  # Sum of `:amount_cents` over lines whose description mentions "gst" or "tax"
  # (case-insensitive).
  # NOTE(review): plain substring match on the description — a line whose
  # description merely contains "tax"/"gst" would be counted; confirm DO's
  # exact GST line wording.
  defp total_tax_minor(lines) do
    lines
    |> Enum.filter(&tax_line?/1)
    |> total_amount_minor()
  end

  defp tax_line?(line) do
    desc = String.downcase(line[:description] || "")
    String.contains?(desc, ["gst", "tax"])
  end

  # ---- CSV parsing ----------------------------------------------------------

  # DO invoice CSV columns (as of v2 API):
  #   product, group description, description, hours, start, end, USD,
  #   project name, category
  #
  # Header is on the first line; we use it to find columns rather than
  # rely on order (DO occasionally adds columns). Header names are normalized
  # (BOM stripped, trimmed, lower-cased, whitespace → "_") so "project name"
  # and "project_name" resolve to the same key. Rows whose cells are all blank
  # (e.g. a trailing empty line) are dropped rather than stored as zero lines.
  defp parse_csv(csv, period) do
    case InvoiceCsv.parse_string(csv, skip_headers: false) do
      [headers | data] ->
        index = build_index(headers)

        data
        |> Enum.reject(&blank_row?/1)
        |> Enum.map(&row_to_line_attrs(&1, index, period))

      _ ->
        []
    end
  end

  # Normalized header name → column position (last occurrence wins on duplicates).
  defp build_index(headers) do
    headers
    |> Enum.with_index()
    |> Map.new(fn {header, i} -> {normalize_header(header), i} end)
  end

  defp normalize_header(header) do
    header
    |> String.replace_prefix("\uFEFF", "")
    |> String.trim()
    |> String.downcase()
    |> String.replace(~r/\s+/, "_")
  end

  defp blank_row?(row), do: Enum.all?(row, &(String.trim(&1) == ""))

  # Maps one CSV data row to cost-line attrs. Hours become `qty` with unit
  # "hours"; the product, category and the untouched row are kept under `raw`.
  defp row_to_line_attrs(row, index, period) do
    product = at(row, index, "product")
    description = at(row, index, "description") || at(row, index, "group_description")
    hours = at(row, index, "hours")
    usd = at(row, index, "usd")
    start_at = at(row, index, "start")
    end_at = at(row, index, "end")
    project_name = at(row, index, "project_name")
    category = at(row, index, "category")

    %{
      invoice_period: period,
      kind: derive_kind(product, category),
      description: description,
      qty: parse_decimal(hours),
      unit: if(hours, do: "hours", else: nil),
      amount_cents: parse_cents(usd),
      unit_cost_cents: nil,
      start_at: parse_datetime(start_at),
      end_at: parse_datetime(end_at),
      project_name: project_name,
      category: category,
      raw: %{
        "product" => product,
        "category" => category,
        "row" => row
      }
    }
  end

  # Trimmed cell value for `key`, or nil when the column is absent, the row is
  # too short, or the cell is blank.
  defp at(row, index, key) do
    case Map.fetch(index, key) do
      {:ok, i} -> row |> Enum.at(i) |> blank_to_nil()
      :error -> nil
    end
  end

  defp blank_to_nil(nil), do: nil

  defp blank_to_nil(value) when is_binary(value) do
    case String.trim(value) do
      "" -> nil
      trimmed -> trimmed
    end
  end

  # Best-effort mapping from DO product strings to our cloud_resources.kind.
  # Order matters — specific phrases (Droplet Snapshots, Droplet Backups) must
  # match BEFORE the generic product they sit under (Droplets / Volumes), and
  # tax lines resolve to nil before the generic droplet/volume checks.
  # DigitalOcean renamed Floating IPs to Reserved IPs; both map to floating_ip.
  # `category` is accepted but not currently consulted.
  defp derive_kind(product, _category) when is_binary(product) do
    p = String.downcase(product)

    cond do
      String.contains?(p, "snapshot") -> "snapshot"
      String.contains?(p, "backup") -> "droplet_backup"
      String.contains?(p, ["load balancer", "load_balancer"]) -> "load_balancer"
      String.contains?(p, ["floating ip", "reserved ip"]) -> "floating_ip"
      String.contains?(p, "spaces") -> "spaces_bucket"
      String.contains?(p, "dns") -> "dns_zone"
      String.contains?(p, "managed database") -> "managed_db"
      String.contains?(p, "kubernetes") -> "k8s_cluster"
      String.contains?(p, "tax") -> nil
      String.contains?(p, "droplet") -> "droplet"
      String.contains?(p, "volume") -> "volume"
      true -> nil
    end
  end

  defp derive_kind(_, _), do: nil

  # Parses a USD amount such as "$1,234.56" or "-0.29" into integer cents.
  # Decimal (not Float) keeps the x100 scaling exact; ties round away from zero,
  # the same convention as Kernel.round/1. Unparseable, NaN or infinite input → 0.
  defp parse_cents(nil), do: 0

  defp parse_cents(value) when is_binary(value) do
    cleaned = String.replace(value, ["$", ",", " "], "")

    case Decimal.parse(cleaned) do
      {amount, _rest} ->
        if Decimal.inf?(amount) or Decimal.nan?(amount) do
          0
        else
          amount
          |> Decimal.mult(100)
          |> Decimal.round(0, :half_up)
          |> Decimal.to_integer()
        end

      :error ->
        0
    end
  end

  defp parse_decimal(nil), do: nil

  defp parse_decimal(value) when is_binary(value) do
    case Decimal.parse(value) do
      {dec, _} -> dec
      :error -> nil
    end
  end

  # DO CSV uses "2026-04-01 00:00:00 +0000" (space separator, RFC822 offset).
  # Also handle "2026-04-01T00:00:00Z" (ISO) and plain "YYYY-MM-DD".
  # The space form is checked first: a zone suffix such as "UTC" contains a
  # capital T and would otherwise be misrouted to the ISO parser. A valid ISO
  # string never contains a space, so this ordering loses nothing.
  defp parse_datetime(nil), do: nil
  defp parse_datetime(""), do: nil

  defp parse_datetime(str) when is_binary(str) do
    cond do
      String.contains?(str, " ") -> parse_space_datetime(str)
      String.contains?(str, "T") -> parse_iso_datetime(str)
      true -> parse_date_only(str)
    end
  end

  defp parse_iso_datetime(str) do
    case DateTime.from_iso8601(str) do
      {:ok, dt, _} -> DateTime.truncate(dt, :second)
      _ -> nil
    end
  end

  # "2026-04-01 00:00:00 +0000" → "2026-04-01T00:00:00+00:00" (or "Z"), then ISO-parse.
  defp parse_space_datetime(str) do
    [date_part, rest] = String.split(str, " ", parts: 2)
    [time_part | maybe_offset] = String.split(rest, " ", parts: 2)
    parse_iso_datetime(date_part <> "T" <> time_part <> normalize_offset(maybe_offset))
  end

  # No offset, a zero offset, or an unrecognized suffix (e.g. "UTC") → "Z".
  defp normalize_offset([]), do: "Z"
  defp normalize_offset([off]) when is_binary(off), do: normalize_offset_str(off)

  defp normalize_offset_str("+0000"), do: "Z"
  defp normalize_offset_str("-0000"), do: "Z"

  defp normalize_offset_str(<<sign::binary-1, hh::binary-2, mm::binary-2>>)
       when sign in ["+", "-"] do
    sign <> hh <> ":" <> mm
  end

  defp normalize_offset_str(_), do: "Z"

  defp parse_date_only(str) do
    case Date.from_iso8601(str) do
      {:ok, date} -> DateTime.new!(date, ~T[00:00:00], "Etc/UTC")
      _ -> nil
    end
  end
end
|