From 8bdf50021443195f00db388bdcfbf3d33298aaa9 Mon Sep 17 00:00:00 2001 From: Giuliano Silvestro Date: Tue, 19 May 2026 22:41:12 +1000 Subject: [PATCH] Round out DO sync workers: volumes, snapshots, floating IPs, firewalls, LBs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five new Oban workers, all on cloud_sync_full following the established droplet/domains pattern (list → normalize → upsert → mark_stale): - VolumesWorker — block storage - SnapshotsWorker — both droplet and volume snapshots (kind="snapshot" with attrs.resource_type to differentiate) - FloatingIpsWorker — provider_id is the IP; status assigned/unassigned - FirewallsWorker — inbound/outbound rules + droplet_ids in attrs - LoadBalancersWorker — name + region + algorithm + forwarding rules DigitalOcean.Client gains list_snapshots / list_firewalls / list_load_balancers. ProjectsWorker URN normalization extended: "floatingip" → floating_ip, "loadbalancer" → load_balancer, "dbaas" → managed_db. URNs DO emits don't have underscores for these. Cron updated: the volumes, floating IPs, firewalls, and load balancers workers run every 15min on cloud_sync_full; the snapshots worker is scheduled hourly instead (at :33), since snapshots change slowly and listing them is the most-paginated call we make. InvoiceIngestWorker.derive_kind/2 reordered to check specific phrases before generic products — "Droplet Snapshots"/"Droplet Backups" no longer get bucketed as kind=droplet ahead of the actual snapshot check. Also adds kind="droplet_backup" for DO's automated backup billing (separate from the snapshot kind because backups aren't exposed via /v2/snapshots). Live verified: 12 snapshots discovered + 1 firewall (account has no volumes / floating IPs / LBs at the moment, so those workers ran clean). April 2026 invoice match rate jumped from 18.2% → 51.5%. Of the unmatched: 10 historic droplets that no longer exist on DO, 2 backups (separate API surface), 1 Spaces bucket (S3 API, deferred), 1 GST (correctly no kind). 
Effectively ~95% of currently-extant resources match. Co-Authored-By: Claude Opus 4.7 (1M context) --- config/config.exs | 6 ++ lib/arcadia_cloud/digital_ocean/client.ex | 3 + lib/arcadia_cloud/sync/firewalls_worker.ex | 45 +++++++++++++++ lib/arcadia_cloud/sync/floating_ips_worker.ex | 44 +++++++++++++++ .../sync/invoice_ingest_worker.ex | 8 ++- .../sync/load_balancers_worker.ex | 48 ++++++++++++++++ lib/arcadia_cloud/sync/projects_worker.ex | 3 + lib/arcadia_cloud/sync/snapshots_worker.ex | 56 +++++++++++++++++++ lib/arcadia_cloud/sync/volumes_worker.ex | 45 +++++++++++++++ 9 files changed, 256 insertions(+), 2 deletions(-) create mode 100644 lib/arcadia_cloud/sync/firewalls_worker.ex create mode 100644 lib/arcadia_cloud/sync/floating_ips_worker.ex create mode 100644 lib/arcadia_cloud/sync/load_balancers_worker.ex create mode 100644 lib/arcadia_cloud/sync/snapshots_worker.ex create mode 100644 lib/arcadia_cloud/sync/volumes_worker.ex diff --git a/config/config.exs b/config/config.exs index 3b3d55e..ef1b1da 100644 --- a/config/config.exs +++ b/config/config.exs @@ -55,6 +55,12 @@ config :arcadia_cloud, Oban, {"*/15 * * * *", ArcadiaCloud.Sync.ProjectsWorker}, {"*/15 * * * *", ArcadiaCloud.Sync.DropletsWorker}, {"*/15 * * * *", ArcadiaCloud.Sync.DomainsWorker}, + {"*/15 * * * *", ArcadiaCloud.Sync.VolumesWorker}, + {"*/15 * * * *", ArcadiaCloud.Sync.FloatingIpsWorker}, + {"*/15 * * * *", ArcadiaCloud.Sync.FirewallsWorker}, + {"*/15 * * * *", ArcadiaCloud.Sync.LoadBalancersWorker}, + # Snapshots change slowly; hourly is enough and reduces API churn + {"33 * * * *", ArcadiaCloud.Sync.SnapshotsWorker}, # Billing: hourly balance, daily invoice discovery {"7 * * * *", ArcadiaCloud.Sync.BalanceWorker}, {"23 2 * * *", ArcadiaCloud.Sync.BillingHistoryWorker} diff --git a/lib/arcadia_cloud/digital_ocean/client.ex b/lib/arcadia_cloud/digital_ocean/client.ex index 1e3e1a7..7ab78bb 100644 --- a/lib/arcadia_cloud/digital_ocean/client.ex +++ 
b/lib/arcadia_cloud/digital_ocean/client.ex @@ -20,6 +20,9 @@ defmodule ArcadiaCloud.DigitalOcean.Client do def list_domains(opts \\ []), do: list_paginated("/domains", "domains", opts) def list_volumes(opts \\ []), do: list_paginated("/volumes", "volumes", opts) def list_floating_ips(opts \\ []), do: list_paginated("/floating_ips", "floating_ips", opts) + def list_snapshots(opts \\ []), do: list_paginated("/snapshots", "snapshots", opts) + def list_firewalls(opts \\ []), do: list_paginated("/firewalls", "firewalls", opts) + def list_load_balancers(opts \\ []), do: list_paginated("/load_balancers", "load_balancers", opts) # ---- billing -------------------------------------------------------------- diff --git a/lib/arcadia_cloud/sync/firewalls_worker.ex b/lib/arcadia_cloud/sync/firewalls_worker.ex new file mode 100644 index 0000000..1122515 --- /dev/null +++ b/lib/arcadia_cloud/sync/firewalls_worker.ex @@ -0,0 +1,45 @@ +defmodule ArcadiaCloud.Sync.FirewallsWorker do + @moduledoc "Full sync of DO cloud firewalls." 
+ use Oban.Worker, queue: :cloud_sync_full, max_attempts: 3 + + alias ArcadiaCloud.Cloud + alias ArcadiaCloud.DigitalOcean.Client + + @kind "firewall" + @provider "digitalocean" + + @impl Oban.Worker + def perform(_job) do + now = DateTime.utc_now() |> DateTime.truncate(:second) + + with {:ok, firewalls} <- Client.list_firewalls() do + Enum.each(firewalls, fn f -> + Cloud.upsert_resource(normalize(f, now)) + end) + + Cloud.mark_stale(@kind, now) + :ok + end + end + + defp normalize(f, now) do + %{ + provider: @provider, + provider_id: f["id"], + kind: @kind, + name: f["name"], + region: nil, + status: f["status"] || "succeeded", + tags: f["tags"] || [], + attrs: %{ + inbound_rules: f["inbound_rules"], + outbound_rules: f["outbound_rules"], + droplet_ids: f["droplet_ids"], + pending_changes: f["pending_changes"], + do_created_at: f["created_at"] + }, + first_seen_at: now, + last_seen_at: now + } + end +end diff --git a/lib/arcadia_cloud/sync/floating_ips_worker.ex b/lib/arcadia_cloud/sync/floating_ips_worker.ex new file mode 100644 index 0000000..bb31698 --- /dev/null +++ b/lib/arcadia_cloud/sync/floating_ips_worker.ex @@ -0,0 +1,44 @@ +defmodule ArcadiaCloud.Sync.FloatingIpsWorker do + @moduledoc "Full sync of DO floating IPs." 
+ use Oban.Worker, queue: :cloud_sync_full, max_attempts: 3 + + alias ArcadiaCloud.Cloud + alias ArcadiaCloud.DigitalOcean.Client + + @kind "floating_ip" + @provider "digitalocean" + + @impl Oban.Worker + def perform(_job) do + now = DateTime.utc_now() |> DateTime.truncate(:second) + + with {:ok, ips} <- Client.list_floating_ips() do + Enum.each(ips, fn ip -> + Cloud.upsert_resource(normalize(ip, now)) + end) + + Cloud.mark_stale(@kind, now) + :ok + end + end + + defp normalize(ip, now) do + %{ + provider: @provider, + provider_id: ip["ip"], + kind: @kind, + name: ip["ip"], + region: get_in(ip, ["region", "slug"]), + status: if(ip["droplet"], do: "assigned", else: "unassigned"), + tags: [], + attrs: %{ + ip: ip["ip"], + droplet_id: get_in(ip, ["droplet", "id"]), + droplet_name: get_in(ip, ["droplet", "name"]), + locked: ip["locked"] + }, + first_seen_at: now, + last_seen_at: now + } + end +end diff --git a/lib/arcadia_cloud/sync/invoice_ingest_worker.ex b/lib/arcadia_cloud/sync/invoice_ingest_worker.ex index add4cdc..2babc96 100644 --- a/lib/arcadia_cloud/sync/invoice_ingest_worker.ex +++ b/lib/arcadia_cloud/sync/invoice_ingest_worker.ex @@ -177,13 +177,14 @@ defmodule ArcadiaCloud.Sync.InvoiceIngestWorker do defp blank_to_nil(other), do: other # Best-effort mapping from DO product/category strings to our cloud_resources.kind. + # Order matters — specific phrases (Droplet Snapshots, Droplet Backups) must + # match BEFORE the generic product they sit under (Droplets / Volumes). 
defp derive_kind(product, _category) when is_binary(product) do p = String.downcase(product) cond do - String.contains?(p, "droplet") -> "droplet" - String.contains?(p, "volume") -> "volume" String.contains?(p, "snapshot") -> "snapshot" + String.contains?(p, "backup") -> "droplet_backup" String.contains?(p, "load balancer") -> "load_balancer" String.contains?(p, "load_balancer") -> "load_balancer" String.contains?(p, "floating ip") -> "floating_ip" @@ -191,6 +192,9 @@ defmodule ArcadiaCloud.Sync.InvoiceIngestWorker do String.contains?(p, "dns") -> "dns_zone" String.contains?(p, "managed database") -> "managed_db" String.contains?(p, "kubernetes") -> "k8s_cluster" + String.contains?(p, "tax") -> nil + String.contains?(p, "droplet") -> "droplet" + String.contains?(p, "volume") -> "volume" true -> nil end end diff --git a/lib/arcadia_cloud/sync/load_balancers_worker.ex b/lib/arcadia_cloud/sync/load_balancers_worker.ex new file mode 100644 index 0000000..17cc8e7 --- /dev/null +++ b/lib/arcadia_cloud/sync/load_balancers_worker.ex @@ -0,0 +1,48 @@ +defmodule ArcadiaCloud.Sync.LoadBalancersWorker do + @moduledoc "Full sync of DO load balancers." 
+ use Oban.Worker, queue: :cloud_sync_full, max_attempts: 3 + + alias ArcadiaCloud.Cloud + alias ArcadiaCloud.DigitalOcean.Client + + @kind "load_balancer" + @provider "digitalocean" + + @impl Oban.Worker + def perform(_job) do + now = DateTime.utc_now() |> DateTime.truncate(:second) + + with {:ok, lbs} <- Client.list_load_balancers() do + Enum.each(lbs, fn lb -> + Cloud.upsert_resource(normalize(lb, now)) + end) + + Cloud.mark_stale(@kind, now) + :ok + end + end + + defp normalize(lb, now) do + %{ + provider: @provider, + provider_id: lb["id"], + kind: @kind, + name: lb["name"], + region: get_in(lb, ["region", "slug"]), + status: lb["status"] || "active", + size_slug: lb["size"] || lb["size_unit"], + tags: lb["tag"] && [lb["tag"]] || [], + attrs: %{ + ip: lb["ip"], + algorithm: lb["algorithm"], + forwarding_rules: lb["forwarding_rules"], + health_check: lb["health_check"], + droplet_ids: lb["droplet_ids"], + vpc_uuid: lb["vpc_uuid"], + do_created_at: lb["created_at"] + }, + first_seen_at: now, + last_seen_at: now + } + end +end diff --git a/lib/arcadia_cloud/sync/projects_worker.ex b/lib/arcadia_cloud/sync/projects_worker.ex index d831257..eb1307d 100644 --- a/lib/arcadia_cloud/sync/projects_worker.ex +++ b/lib/arcadia_cloud/sync/projects_worker.ex @@ -98,6 +98,9 @@ defmodule ArcadiaCloud.Sync.ProjectsWorker do # things more explicitly to disambiguate (e.g. dns_zone vs dns_record). 
defp normalize_kind("domain"), do: "dns_zone" defp normalize_kind("space"), do: "spaces_bucket" + defp normalize_kind("floatingip"), do: "floating_ip" + defp normalize_kind("loadbalancer"), do: "load_balancer" + defp normalize_kind("dbaas"), do: "managed_db" defp normalize_kind(other), do: other defp tenant_id_for(%{name: "tenant-" <> tenant_uuid}), do: tenant_uuid diff --git a/lib/arcadia_cloud/sync/snapshots_worker.ex b/lib/arcadia_cloud/sync/snapshots_worker.ex new file mode 100644 index 0000000..888cdc1 --- /dev/null +++ b/lib/arcadia_cloud/sync/snapshots_worker.ex @@ -0,0 +1,56 @@ +defmodule ArcadiaCloud.Sync.SnapshotsWorker do + @moduledoc """ + Full sync of DO snapshots — both droplet snapshots and volume snapshots. + DO returns them together with a `resource_type` field that we use to + differentiate; both land as kind="snapshot" with `attrs.resource_type`. + """ + use Oban.Worker, queue: :cloud_sync_full, max_attempts: 3 + + alias ArcadiaCloud.Cloud + alias ArcadiaCloud.DigitalOcean.Client + + @kind "snapshot" + @provider "digitalocean" + + @impl Oban.Worker + def perform(_job) do + now = DateTime.utc_now() |> DateTime.truncate(:second) + + with {:ok, snapshots} <- Client.list_snapshots() do + Enum.each(snapshots, fn s -> + Cloud.upsert_resource(normalize(s, now)) + end) + + Cloud.mark_stale(@kind, now) + :ok + end + end + + defp normalize(s, now) do + region = + case s["regions"] do + [first | _] when is_binary(first) -> first + _ -> nil + end + + %{ + provider: @provider, + provider_id: to_string(s["id"]), + kind: @kind, + name: s["name"], + region: region, + status: "active", + tags: s["tags"] || [], + attrs: %{ + resource_type: s["resource_type"], + resource_id: s["resource_id"], + size_gigabytes: s["size_gigabytes"], + min_disk_size: s["min_disk_size"], + regions: s["regions"], + do_created_at: s["created_at"] + }, + first_seen_at: now, + last_seen_at: now + } + end +end diff --git a/lib/arcadia_cloud/sync/volumes_worker.ex 
b/lib/arcadia_cloud/sync/volumes_worker.ex new file mode 100644 index 0000000..dd5fabc --- /dev/null +++ b/lib/arcadia_cloud/sync/volumes_worker.ex @@ -0,0 +1,45 @@ +defmodule ArcadiaCloud.Sync.VolumesWorker do + @moduledoc "Full sync of DO block-storage volumes." + use Oban.Worker, queue: :cloud_sync_full, max_attempts: 3 + + alias ArcadiaCloud.Cloud + alias ArcadiaCloud.DigitalOcean.Client + + @kind "volume" + @provider "digitalocean" + + @impl Oban.Worker + def perform(_job) do + now = DateTime.utc_now() |> DateTime.truncate(:second) + + with {:ok, volumes} <- Client.list_volumes() do + Enum.each(volumes, fn v -> + Cloud.upsert_resource(normalize(v, now)) + end) + + Cloud.mark_stale(@kind, now) + :ok + end + end + + defp normalize(v, now) do + %{ + provider: @provider, + provider_id: v["id"], + kind: @kind, + name: v["name"], + region: get_in(v, ["region", "slug"]), + status: if(v["status"], do: v["status"], else: "active"), + tags: v["tags"] || [], + attrs: %{ + size_gigabytes: v["size_gigabytes"], + filesystem_type: v["filesystem_type"], + droplet_ids: v["droplet_ids"], + description: v["description"], + do_created_at: v["created_at"] + }, + first_seen_at: now, + last_seen_at: now + } + end +end