Phase 1 continued: ProjectsWorker, DomainsWorker, Oban Cron schedule
ProjectsWorker mirrors DO Projects to cloud_projects table in a two-pass sweep: upsert projects, then walk each project's resource membership (list_project_resources) and update cloud_resources.cloud_project_id + tenant_id. DO URN kinds get normalized via normalize_kind/1 (domain → dns_zone, space → spaces_bucket) so attribution matches local naming. DomainsWorker syncs DNS zones (DO Domains). Same upsert chokepoint, same three-strike stale handling. Zones are global to the account; attribution happens via ProjectsWorker if a domain is in a DO project, else stays NULL pending operator classification. Oban.Plugins.Cron added with 15-minute schedules for ProjectsWorker, DropletsWorker, DomainsWorker — workers run automatically once a token is configured. Phase 0/1 cadence; phase 2 moves droplets to cloud_sync_fast (1-min) for real-time status visibility. DigitalOcean.Client gains list_domains / list_volumes / list_floating_ips. Volumes and floating IPs not yet wired to workers; trivial follow-on. Live smoke test: 5 droplets + 7 DNS zones discovered, all attributed to their existing DO projects via membership lookup (skyai-internal becomes the fallback only for genuinely orphan resources). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -48,6 +48,15 @@ config :arcadia_cloud, Oban,
|
|||||||
metering: 2,
|
metering: 2,
|
||||||
default: 5
|
default: 5
|
||||||
],
|
],
|
||||||
|
plugins: [
|
||||||
|
{Oban.Plugins.Cron,
|
||||||
|
crontab: [
|
||||||
|
# ProjectsWorker first so attribution is fresh before resource syncs
|
||||||
|
{"*/15 * * * *", ArcadiaCloud.Sync.ProjectsWorker},
|
||||||
|
{"*/15 * * * *", ArcadiaCloud.Sync.DropletsWorker},
|
||||||
|
{"*/15 * * * *", ArcadiaCloud.Sync.DomainsWorker}
|
||||||
|
]}
|
||||||
|
],
|
||||||
repo: ArcadiaCloud.Repo
|
repo: ArcadiaCloud.Repo
|
||||||
|
|
||||||
# Import environment specific config. This must remain at the bottom
|
# Import environment specific config. This must remain at the bottom
|
||||||
|
|||||||
@@ -17,6 +17,9 @@ defmodule ArcadiaCloud.DigitalOcean.Client do
|
|||||||
|
|
||||||
def list_droplets(opts \\ []), do: list_paginated("/droplets", "droplets", opts)
|
def list_droplets(opts \\ []), do: list_paginated("/droplets", "droplets", opts)
|
||||||
def list_projects(opts \\ []), do: list_paginated("/projects", "projects", opts)
|
def list_projects(opts \\ []), do: list_paginated("/projects", "projects", opts)
|
||||||
|
def list_domains(opts \\ []), do: list_paginated("/domains", "domains", opts)
|
||||||
|
def list_volumes(opts \\ []), do: list_paginated("/volumes", "volumes", opts)
|
||||||
|
def list_floating_ips(opts \\ []), do: list_paginated("/floating_ips", "floating_ips", opts)
|
||||||
|
|
||||||
def create_project(name, purpose, description \\ "", opts \\ []) do
|
def create_project(name, purpose, description \\ "", opts \\ []) do
|
||||||
body = %{
|
body = %{
|
||||||
|
|||||||
53
lib/arcadia_cloud/sync/domains_worker.ex
Normal file
53
lib/arcadia_cloud/sync/domains_worker.ex
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
defmodule ArcadiaCloud.Sync.DomainsWorker do
|
||||||
|
@moduledoc """
|
||||||
|
Full sync of DigitalOcean Domains (DNS zones).
|
||||||
|
|
||||||
|
Domains are global to the DO account (no region, no project membership
|
||||||
|
via URN in the same way droplets have). They are typically attributed
|
||||||
|
manually by operator action; first sync leaves them with cloud_project_id
|
||||||
|
nil + tenant_id nil. ProjectsWorker's attribution pass picks them up if
|
||||||
|
they appear in project memberships.
|
||||||
|
"""
|
||||||
|
|
||||||
|
use Oban.Worker, queue: :cloud_sync_full, max_attempts: 3
|
||||||
|
|
||||||
|
alias ArcadiaCloud.Cloud
|
||||||
|
alias ArcadiaCloud.DigitalOcean.Client
|
||||||
|
|
||||||
|
@kind "dns_zone"
|
||||||
|
@provider "digitalocean"
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(_job) do
|
||||||
|
sync_started_at = DateTime.utc_now() |> DateTime.truncate(:second)
|
||||||
|
|
||||||
|
with {:ok, domains} <- Client.list_domains() do
|
||||||
|
Enum.each(domains, fn d ->
|
||||||
|
Cloud.upsert_resource(normalize(d, sync_started_at))
|
||||||
|
end)
|
||||||
|
|
||||||
|
Cloud.mark_stale(@kind, sync_started_at)
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp normalize(d, now) do
|
||||||
|
name = d["name"]
|
||||||
|
|
||||||
|
%{
|
||||||
|
provider: @provider,
|
||||||
|
provider_id: name,
|
||||||
|
kind: @kind,
|
||||||
|
name: name,
|
||||||
|
region: nil,
|
||||||
|
status: "active",
|
||||||
|
tags: [],
|
||||||
|
attrs: %{
|
||||||
|
ttl: d["ttl"],
|
||||||
|
zone_file: d["zone_file"]
|
||||||
|
},
|
||||||
|
first_seen_at: now,
|
||||||
|
last_seen_at: now
|
||||||
|
}
|
||||||
|
end
|
||||||
|
end
|
||||||
105
lib/arcadia_cloud/sync/projects_worker.ex
Normal file
105
lib/arcadia_cloud/sync/projects_worker.ex
Normal file
@@ -0,0 +1,105 @@
|
|||||||
|
defmodule ArcadiaCloud.Sync.ProjectsWorker do
|
||||||
|
@moduledoc """
|
||||||
|
Sync of DigitalOcean Projects → cloud_projects table.
|
||||||
|
|
||||||
|
Two-pass:
|
||||||
|
1. Upsert every DO project locally (purpose derives from name pattern).
|
||||||
|
2. For each known project, fetch its resource memberships and update
|
||||||
|
cloud_resources.cloud_project_id + tenant_id accordingly.
|
||||||
|
|
||||||
|
Tenant attribution: a DO project named `tenant-<uuid>` maps to that
|
||||||
|
tenant. `skyai-internal` is the platform tenant (tenant_id = nil).
|
||||||
|
Everything else has tenant_id = nil and is operator-classified later.
|
||||||
|
"""
|
||||||
|
|
||||||
|
use Oban.Worker, queue: :cloud_sync_full, max_attempts: 3
|
||||||
|
|
||||||
|
import Ecto.Query
|
||||||
|
|
||||||
|
alias ArcadiaCloud.Cloud
|
||||||
|
alias ArcadiaCloud.Cloud.CloudResource
|
||||||
|
alias ArcadiaCloud.DigitalOcean.Client
|
||||||
|
alias ArcadiaCloud.Repo
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(_job) do
|
||||||
|
with {:ok, do_projects} <- Client.list_projects() do
|
||||||
|
Enum.each(do_projects, &sync_project/1)
|
||||||
|
attribute_resources(do_projects)
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp sync_project(do_project) do
|
||||||
|
Cloud.ensure_project(%{
|
||||||
|
provider: "digitalocean",
|
||||||
|
provider_id: do_project["id"],
|
||||||
|
name: do_project["name"],
|
||||||
|
purpose: derive_purpose(do_project["name"]),
|
||||||
|
metadata: %{
|
||||||
|
do_purpose: do_project["purpose"],
|
||||||
|
environment: do_project["environment"],
|
||||||
|
description: do_project["description"],
|
||||||
|
is_default: do_project["is_default"]
|
||||||
|
}
|
||||||
|
})
|
||||||
|
end
|
||||||
|
|
||||||
|
defp derive_purpose("skyai-internal"), do: "skyai-infra"
|
||||||
|
defp derive_purpose("tenant-" <> _rest), do: "tenant-workload"
|
||||||
|
defp derive_purpose(_), do: "shared-services"
|
||||||
|
|
||||||
|
# ---- attribution ----------------------------------------------------------
|
||||||
|
|
||||||
|
defp attribute_resources(do_projects) do
|
||||||
|
Enum.each(do_projects, fn do_project ->
|
||||||
|
local = Cloud.get_project_by_provider("digitalocean", do_project["id"])
|
||||||
|
|
||||||
|
if local do
|
||||||
|
case Client.list_project_resources(do_project["id"]) do
|
||||||
|
{:ok, resources} -> attribute_urns(resources, local)
|
||||||
|
_ -> :noop
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp attribute_urns(urns, %{id: project_id} = local) do
|
||||||
|
tenant_id = tenant_id_for(local)
|
||||||
|
|
||||||
|
Enum.each(urns, fn %{"urn" => urn} ->
|
||||||
|
case parse_urn(urn) do
|
||||||
|
{kind, provider_id} -> update_resource_attribution(kind, provider_id, project_id, tenant_id)
|
||||||
|
_ -> :skip
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp update_resource_attribution(kind, provider_id, project_id, tenant_id) do
|
||||||
|
from(r in CloudResource,
|
||||||
|
where:
|
||||||
|
r.provider == "digitalocean" and r.kind == ^kind and r.provider_id == ^provider_id and
|
||||||
|
is_nil(r.deleted_at)
|
||||||
|
)
|
||||||
|
|> Repo.update_all(set: [cloud_project_id: project_id, tenant_id: tenant_id])
|
||||||
|
end
|
||||||
|
|
||||||
|
# "do:droplet:567897199" → {"droplet", "567897199"}
|
||||||
|
defp parse_urn("do:" <> rest) do
|
||||||
|
case String.split(rest, ":", parts: 2) do
|
||||||
|
[kind, id] -> {normalize_kind(kind), id}
|
||||||
|
_ -> nil
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp parse_urn(_), do: nil
|
||||||
|
|
||||||
|
# DO URN uses singular common nouns; our cloud_resources.kind names some
|
||||||
|
# things more explicitly to disambiguate (e.g. dns_zone vs dns_record).
|
||||||
|
defp normalize_kind("domain"), do: "dns_zone"
|
||||||
|
defp normalize_kind("space"), do: "spaces_bucket"
|
||||||
|
defp normalize_kind(other), do: other
|
||||||
|
|
||||||
|
defp tenant_id_for(%{name: "tenant-" <> tenant_uuid}), do: tenant_uuid
|
||||||
|
defp tenant_id_for(_), do: nil
|
||||||
|
end
|
||||||
Reference in New Issue
Block a user