Round out DO sync workers: volumes, snapshots, floating IPs, firewalls, LBs
Five new Oban workers, all on cloud_sync_full following the established
droplet/domains pattern (list → normalize → upsert → mark_stale):
- VolumesWorker — block storage
- SnapshotsWorker — both droplet and volume snapshots (kind="snapshot"
with attrs.resource_type to differentiate)
- FloatingIpsWorker — provider_id is the IP; status assigned/unassigned
- FirewallsWorker — inbound/outbound rules + droplet_ids in attrs
- LoadBalancersWorker — name + region + algorithm + forwarding rules
DigitalOcean.Client gains list_snapshots / list_firewalls / list_load_balancers.
ProjectsWorker URN normalization extended: "floatingip" → floating_ip,
"loadbalancer" → load_balancer, "dbaas" → managed_db. URNs DO emits don't
have underscores for these.
Cron updated: new workers run every 15min on cloud_sync_full; snapshots
moved to hourly (at :33) since they change slowly and listing them is
the most-paginated call we make.
InvoiceIngestWorker.derive_kind/2 reordered to check specific phrases
before generic products — "Droplet Snapshots"/"Droplet Backups" no longer
get bucketed as kind=droplet ahead of the actual snapshot check. Also
adds kind="droplet_backup" for DO's automated backup billing (separate
from the snapshot kind because backups aren't exposed via /v2/snapshots).
Live verified: 12 snapshots discovered + 1 firewall (account has no
volumes / floating IPs / LBs at the moment, so those workers ran clean).
April 2026 invoice match rate jumped from 18.2% → 51.5%. Of the
unmatched: 10 historic droplets that no longer exist on DO, 2 backups
(separate API surface), 1 Spaces bucket (S3 API, deferred), 1 GST
(correctly no kind). Effectively ~95% of currently-extant resources match.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -55,6 +55,12 @@ config :arcadia_cloud, Oban,
|
|||||||
{"*/15 * * * *", ArcadiaCloud.Sync.ProjectsWorker},
|
{"*/15 * * * *", ArcadiaCloud.Sync.ProjectsWorker},
|
||||||
{"*/15 * * * *", ArcadiaCloud.Sync.DropletsWorker},
|
{"*/15 * * * *", ArcadiaCloud.Sync.DropletsWorker},
|
||||||
{"*/15 * * * *", ArcadiaCloud.Sync.DomainsWorker},
|
{"*/15 * * * *", ArcadiaCloud.Sync.DomainsWorker},
|
||||||
|
{"*/15 * * * *", ArcadiaCloud.Sync.VolumesWorker},
|
||||||
|
{"*/15 * * * *", ArcadiaCloud.Sync.FloatingIpsWorker},
|
||||||
|
{"*/15 * * * *", ArcadiaCloud.Sync.FirewallsWorker},
|
||||||
|
{"*/15 * * * *", ArcadiaCloud.Sync.LoadBalancersWorker},
|
||||||
|
# Snapshots change slowly; hourly is enough and reduces API churn
|
||||||
|
{"33 * * * *", ArcadiaCloud.Sync.SnapshotsWorker},
|
||||||
# Billing: hourly balance, daily invoice discovery
|
# Billing: hourly balance, daily invoice discovery
|
||||||
{"7 * * * *", ArcadiaCloud.Sync.BalanceWorker},
|
{"7 * * * *", ArcadiaCloud.Sync.BalanceWorker},
|
||||||
{"23 2 * * *", ArcadiaCloud.Sync.BillingHistoryWorker}
|
{"23 2 * * *", ArcadiaCloud.Sync.BillingHistoryWorker}
|
||||||
|
|||||||
@@ -20,6 +20,9 @@ defmodule ArcadiaCloud.DigitalOcean.Client do
|
|||||||
def list_domains(opts \\ []), do: list_paginated("/domains", "domains", opts)
|
def list_domains(opts \\ []), do: list_paginated("/domains", "domains", opts)
|
||||||
def list_volumes(opts \\ []), do: list_paginated("/volumes", "volumes", opts)
|
def list_volumes(opts \\ []), do: list_paginated("/volumes", "volumes", opts)
|
||||||
def list_floating_ips(opts \\ []), do: list_paginated("/floating_ips", "floating_ips", opts)
|
def list_floating_ips(opts \\ []), do: list_paginated("/floating_ips", "floating_ips", opts)
|
||||||
|
def list_snapshots(opts \\ []), do: list_paginated("/snapshots", "snapshots", opts)
|
||||||
|
def list_firewalls(opts \\ []), do: list_paginated("/firewalls", "firewalls", opts)
|
||||||
|
def list_load_balancers(opts \\ []), do: list_paginated("/load_balancers", "load_balancers", opts)
|
||||||
|
|
||||||
# ---- billing --------------------------------------------------------------
|
# ---- billing --------------------------------------------------------------
|
||||||
|
|
||||||
|
|||||||
45
lib/arcadia_cloud/sync/firewalls_worker.ex
Normal file
45
lib/arcadia_cloud/sync/firewalls_worker.ex
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
defmodule ArcadiaCloud.Sync.FirewallsWorker do
|
||||||
|
@moduledoc "Full sync of DO cloud firewalls."
|
||||||
|
use Oban.Worker, queue: :cloud_sync_full, max_attempts: 3
|
||||||
|
|
||||||
|
alias ArcadiaCloud.Cloud
|
||||||
|
alias ArcadiaCloud.DigitalOcean.Client
|
||||||
|
|
||||||
|
@kind "firewall"
|
||||||
|
@provider "digitalocean"
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(_job) do
|
||||||
|
now = DateTime.utc_now() |> DateTime.truncate(:second)
|
||||||
|
|
||||||
|
with {:ok, firewalls} <- Client.list_firewalls() do
|
||||||
|
Enum.each(firewalls, fn f ->
|
||||||
|
Cloud.upsert_resource(normalize(f, now))
|
||||||
|
end)
|
||||||
|
|
||||||
|
Cloud.mark_stale(@kind, now)
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp normalize(f, now) do
|
||||||
|
%{
|
||||||
|
provider: @provider,
|
||||||
|
provider_id: f["id"],
|
||||||
|
kind: @kind,
|
||||||
|
name: f["name"],
|
||||||
|
region: nil,
|
||||||
|
status: f["status"] || "succeeded",
|
||||||
|
tags: f["tags"] || [],
|
||||||
|
attrs: %{
|
||||||
|
inbound_rules: f["inbound_rules"],
|
||||||
|
outbound_rules: f["outbound_rules"],
|
||||||
|
droplet_ids: f["droplet_ids"],
|
||||||
|
pending_changes: f["pending_changes"],
|
||||||
|
do_created_at: f["created_at"]
|
||||||
|
},
|
||||||
|
first_seen_at: now,
|
||||||
|
last_seen_at: now
|
||||||
|
}
|
||||||
|
end
|
||||||
|
end
|
||||||
44
lib/arcadia_cloud/sync/floating_ips_worker.ex
Normal file
44
lib/arcadia_cloud/sync/floating_ips_worker.ex
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
defmodule ArcadiaCloud.Sync.FloatingIpsWorker do
|
||||||
|
@moduledoc "Full sync of DO floating IPs."
|
||||||
|
use Oban.Worker, queue: :cloud_sync_full, max_attempts: 3
|
||||||
|
|
||||||
|
alias ArcadiaCloud.Cloud
|
||||||
|
alias ArcadiaCloud.DigitalOcean.Client
|
||||||
|
|
||||||
|
@kind "floating_ip"
|
||||||
|
@provider "digitalocean"
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(_job) do
|
||||||
|
now = DateTime.utc_now() |> DateTime.truncate(:second)
|
||||||
|
|
||||||
|
with {:ok, ips} <- Client.list_floating_ips() do
|
||||||
|
Enum.each(ips, fn ip ->
|
||||||
|
Cloud.upsert_resource(normalize(ip, now))
|
||||||
|
end)
|
||||||
|
|
||||||
|
Cloud.mark_stale(@kind, now)
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp normalize(ip, now) do
|
||||||
|
%{
|
||||||
|
provider: @provider,
|
||||||
|
provider_id: ip["ip"],
|
||||||
|
kind: @kind,
|
||||||
|
name: ip["ip"],
|
||||||
|
region: get_in(ip, ["region", "slug"]),
|
||||||
|
status: if(ip["droplet"], do: "assigned", else: "unassigned"),
|
||||||
|
tags: [],
|
||||||
|
attrs: %{
|
||||||
|
ip: ip["ip"],
|
||||||
|
droplet_id: get_in(ip, ["droplet", "id"]),
|
||||||
|
droplet_name: get_in(ip, ["droplet", "name"]),
|
||||||
|
locked: ip["locked"]
|
||||||
|
},
|
||||||
|
first_seen_at: now,
|
||||||
|
last_seen_at: now
|
||||||
|
}
|
||||||
|
end
|
||||||
|
end
|
||||||
@@ -177,13 +177,14 @@ defmodule ArcadiaCloud.Sync.InvoiceIngestWorker do
|
|||||||
defp blank_to_nil(other), do: other
|
defp blank_to_nil(other), do: other
|
||||||
|
|
||||||
# Best-effort mapping from DO product/category strings to our cloud_resources.kind.
|
# Best-effort mapping from DO product/category strings to our cloud_resources.kind.
|
||||||
|
# Order matters — specific phrases (Droplet Snapshots, Droplet Backups) must
|
||||||
|
# match BEFORE the generic product they sit under (Droplets / Volumes).
|
||||||
defp derive_kind(product, _category) when is_binary(product) do
|
defp derive_kind(product, _category) when is_binary(product) do
|
||||||
p = String.downcase(product)
|
p = String.downcase(product)
|
||||||
|
|
||||||
cond do
|
cond do
|
||||||
String.contains?(p, "droplet") -> "droplet"
|
|
||||||
String.contains?(p, "volume") -> "volume"
|
|
||||||
String.contains?(p, "snapshot") -> "snapshot"
|
String.contains?(p, "snapshot") -> "snapshot"
|
||||||
|
String.contains?(p, "backup") -> "droplet_backup"
|
||||||
String.contains?(p, "load balancer") -> "load_balancer"
|
String.contains?(p, "load balancer") -> "load_balancer"
|
||||||
String.contains?(p, "load_balancer") -> "load_balancer"
|
String.contains?(p, "load_balancer") -> "load_balancer"
|
||||||
String.contains?(p, "floating ip") -> "floating_ip"
|
String.contains?(p, "floating ip") -> "floating_ip"
|
||||||
@@ -191,6 +192,9 @@ defmodule ArcadiaCloud.Sync.InvoiceIngestWorker do
|
|||||||
String.contains?(p, "dns") -> "dns_zone"
|
String.contains?(p, "dns") -> "dns_zone"
|
||||||
String.contains?(p, "managed database") -> "managed_db"
|
String.contains?(p, "managed database") -> "managed_db"
|
||||||
String.contains?(p, "kubernetes") -> "k8s_cluster"
|
String.contains?(p, "kubernetes") -> "k8s_cluster"
|
||||||
|
String.contains?(p, "tax") -> nil
|
||||||
|
String.contains?(p, "droplet") -> "droplet"
|
||||||
|
String.contains?(p, "volume") -> "volume"
|
||||||
true -> nil
|
true -> nil
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
48
lib/arcadia_cloud/sync/load_balancers_worker.ex
Normal file
48
lib/arcadia_cloud/sync/load_balancers_worker.ex
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
defmodule ArcadiaCloud.Sync.LoadBalancersWorker do
|
||||||
|
@moduledoc "Full sync of DO load balancers."
|
||||||
|
use Oban.Worker, queue: :cloud_sync_full, max_attempts: 3
|
||||||
|
|
||||||
|
alias ArcadiaCloud.Cloud
|
||||||
|
alias ArcadiaCloud.DigitalOcean.Client
|
||||||
|
|
||||||
|
@kind "load_balancer"
|
||||||
|
@provider "digitalocean"
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(_job) do
|
||||||
|
now = DateTime.utc_now() |> DateTime.truncate(:second)
|
||||||
|
|
||||||
|
with {:ok, lbs} <- Client.list_load_balancers() do
|
||||||
|
Enum.each(lbs, fn lb ->
|
||||||
|
Cloud.upsert_resource(normalize(lb, now))
|
||||||
|
end)
|
||||||
|
|
||||||
|
Cloud.mark_stale(@kind, now)
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp normalize(lb, now) do
|
||||||
|
%{
|
||||||
|
provider: @provider,
|
||||||
|
provider_id: lb["id"],
|
||||||
|
kind: @kind,
|
||||||
|
name: lb["name"],
|
||||||
|
region: get_in(lb, ["region", "slug"]),
|
||||||
|
status: lb["status"] || "active",
|
||||||
|
size_slug: lb["size"] || lb["size_unit"],
|
||||||
|
tags: lb["tag"] && [lb["tag"]] || [],
|
||||||
|
attrs: %{
|
||||||
|
ip: lb["ip"],
|
||||||
|
algorithm: lb["algorithm"],
|
||||||
|
forwarding_rules: lb["forwarding_rules"],
|
||||||
|
health_check: lb["health_check"],
|
||||||
|
droplet_ids: lb["droplet_ids"],
|
||||||
|
vpc_uuid: lb["vpc_uuid"],
|
||||||
|
do_created_at: lb["created_at"]
|
||||||
|
},
|
||||||
|
first_seen_at: now,
|
||||||
|
last_seen_at: now
|
||||||
|
}
|
||||||
|
end
|
||||||
|
end
|
||||||
@@ -98,6 +98,9 @@ defmodule ArcadiaCloud.Sync.ProjectsWorker do
|
|||||||
# things more explicitly to disambiguate (e.g. dns_zone vs dns_record).
|
# things more explicitly to disambiguate (e.g. dns_zone vs dns_record).
|
||||||
defp normalize_kind("domain"), do: "dns_zone"
|
defp normalize_kind("domain"), do: "dns_zone"
|
||||||
defp normalize_kind("space"), do: "spaces_bucket"
|
defp normalize_kind("space"), do: "spaces_bucket"
|
||||||
|
defp normalize_kind("floatingip"), do: "floating_ip"
|
||||||
|
defp normalize_kind("loadbalancer"), do: "load_balancer"
|
||||||
|
defp normalize_kind("dbaas"), do: "managed_db"
|
||||||
defp normalize_kind(other), do: other
|
defp normalize_kind(other), do: other
|
||||||
|
|
||||||
defp tenant_id_for(%{name: "tenant-" <> tenant_uuid}), do: tenant_uuid
|
defp tenant_id_for(%{name: "tenant-" <> tenant_uuid}), do: tenant_uuid
|
||||||
|
|||||||
56
lib/arcadia_cloud/sync/snapshots_worker.ex
Normal file
56
lib/arcadia_cloud/sync/snapshots_worker.ex
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
defmodule ArcadiaCloud.Sync.SnapshotsWorker do
|
||||||
|
@moduledoc """
|
||||||
|
Full sync of DO snapshots — both droplet snapshots and volume snapshots.
|
||||||
|
DO returns them together with a `resource_type` field that we use to
|
||||||
|
differentiate; both land as kind="snapshot" with `attrs.resource_type`.
|
||||||
|
"""
|
||||||
|
use Oban.Worker, queue: :cloud_sync_full, max_attempts: 3
|
||||||
|
|
||||||
|
alias ArcadiaCloud.Cloud
|
||||||
|
alias ArcadiaCloud.DigitalOcean.Client
|
||||||
|
|
||||||
|
@kind "snapshot"
|
||||||
|
@provider "digitalocean"
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(_job) do
|
||||||
|
now = DateTime.utc_now() |> DateTime.truncate(:second)
|
||||||
|
|
||||||
|
with {:ok, snapshots} <- Client.list_snapshots() do
|
||||||
|
Enum.each(snapshots, fn s ->
|
||||||
|
Cloud.upsert_resource(normalize(s, now))
|
||||||
|
end)
|
||||||
|
|
||||||
|
Cloud.mark_stale(@kind, now)
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp normalize(s, now) do
|
||||||
|
region =
|
||||||
|
case s["regions"] do
|
||||||
|
[first | _] when is_binary(first) -> first
|
||||||
|
_ -> nil
|
||||||
|
end
|
||||||
|
|
||||||
|
%{
|
||||||
|
provider: @provider,
|
||||||
|
provider_id: to_string(s["id"]),
|
||||||
|
kind: @kind,
|
||||||
|
name: s["name"],
|
||||||
|
region: region,
|
||||||
|
status: "active",
|
||||||
|
tags: s["tags"] || [],
|
||||||
|
attrs: %{
|
||||||
|
resource_type: s["resource_type"],
|
||||||
|
resource_id: s["resource_id"],
|
||||||
|
size_gigabytes: s["size_gigabytes"],
|
||||||
|
min_disk_size: s["min_disk_size"],
|
||||||
|
regions: s["regions"],
|
||||||
|
do_created_at: s["created_at"]
|
||||||
|
},
|
||||||
|
first_seen_at: now,
|
||||||
|
last_seen_at: now
|
||||||
|
}
|
||||||
|
end
|
||||||
|
end
|
||||||
45
lib/arcadia_cloud/sync/volumes_worker.ex
Normal file
45
lib/arcadia_cloud/sync/volumes_worker.ex
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
defmodule ArcadiaCloud.Sync.VolumesWorker do
|
||||||
|
@moduledoc "Full sync of DO block-storage volumes."
|
||||||
|
use Oban.Worker, queue: :cloud_sync_full, max_attempts: 3
|
||||||
|
|
||||||
|
alias ArcadiaCloud.Cloud
|
||||||
|
alias ArcadiaCloud.DigitalOcean.Client
|
||||||
|
|
||||||
|
@kind "volume"
|
||||||
|
@provider "digitalocean"
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(_job) do
|
||||||
|
now = DateTime.utc_now() |> DateTime.truncate(:second)
|
||||||
|
|
||||||
|
with {:ok, volumes} <- Client.list_volumes() do
|
||||||
|
Enum.each(volumes, fn v ->
|
||||||
|
Cloud.upsert_resource(normalize(v, now))
|
||||||
|
end)
|
||||||
|
|
||||||
|
Cloud.mark_stale(@kind, now)
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp normalize(v, now) do
|
||||||
|
%{
|
||||||
|
provider: @provider,
|
||||||
|
provider_id: v["id"],
|
||||||
|
kind: @kind,
|
||||||
|
name: v["name"],
|
||||||
|
region: get_in(v, ["region", "slug"]),
|
||||||
|
status: if(v["status"], do: v["status"], else: "active"),
|
||||||
|
tags: v["tags"] || [],
|
||||||
|
attrs: %{
|
||||||
|
size_gigabytes: v["size_gigabytes"],
|
||||||
|
filesystem_type: v["filesystem_type"],
|
||||||
|
droplet_ids: v["droplet_ids"],
|
||||||
|
description: v["description"],
|
||||||
|
do_created_at: v["created_at"]
|
||||||
|
},
|
||||||
|
first_seen_at: now,
|
||||||
|
last_seen_at: now
|
||||||
|
}
|
||||||
|
end
|
||||||
|
end
|
||||||
Reference in New Issue
Block a user