Phase 1 first chunk: inventory schema + DO droplet sync
Models:
- cloud_projects: arcadia-cloud's mirror of DO Projects, indexed by (provider, provider_id); tenant_id + purpose classify each project.
- cloud_resources: single unified resource table; kind-specific bits live in attrs JSONB; first_seen_at / last_seen_at / stale_strike_count drive three-strike deletion.
- cloud_resource_events: append-only audit (discovered, updated, deleted, drift_detected, tagged, restored).

The ArcadiaCloud.Cloud context owns the single upsert chokepoint that:
- inserts new rows with a `discovered` event
- updates existing rows only when meaningful fields change
- restores tombstoned rows that are seen again
- bumps last_seen_at and resets the strike count

mark_stale/3 implements the three-strike rule.

ArcadiaCloud.DigitalOcean.Client is a Req wrapper with auto-pagination. Per-purpose token resolution goes through .Tokens (phase 1: env DO_API_TOKEN; phase 2: vault). Per the project_arcadia_cloud memory, the long-term shape is one PAT per queue purpose for rate-limit isolation.

ArcadiaCloud.Sync.Bootstrap ensures the skyai-internal DO Project exists on first sync and is idempotent thereafter.

ArcadiaCloud.Sync.DropletsWorker runs the full droplet sync on the cloud_sync_full Oban queue.

InventoryController is wired to real data: platform_admin sees everything, tenants see only their own scope.

Live smoke test against real DO: 5 droplets synced; skyai-internal project auto-created; events written; endpoint returns scoped results.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
184
lib/arcadia_cloud/cloud.ex
Normal file
184
lib/arcadia_cloud/cloud.ex
Normal file
@@ -0,0 +1,184 @@
|
||||
defmodule ArcadiaCloud.Cloud do
|
||||
@moduledoc """
|
||||
Context for cloud resource inventory.
|
||||
|
||||
Single upsert chokepoint (`upsert_resource/2`) handles:
|
||||
- insert new (emit `discovered` event)
|
||||
- update existing (diff meaningful fields, emit `updated` event only when changed)
|
||||
- bump last_seen_at + reset stale_strike_count
|
||||
- restore previously-deleted resources
|
||||
|
||||
Stale handling is in `mark_stale/3` — three-strike deletion across syncs.
|
||||
"""
|
||||
|
||||
import Ecto.Query, warn: false
|
||||
|
||||
alias ArcadiaCloud.Repo
|
||||
alias ArcadiaCloud.Cloud.{CloudProject, CloudResource, CloudResourceEvent}
|
||||
|
||||
# Resource fields compared by `meaningful_change?/2` and captured by `snapshot/1`.
@meaningful_fields [
  :status,
  :name,
  :region,
  :size_slug,
  :tags,
  :attrs,
  :cloud_project_id,
  :tenant_id,
  :deployment_id
]
|
||||
|
||||
# ---- projects -------------------------------------------------------------
|
||||
|
||||
@doc """
Fetches the `CloudProject` matching the given `provider` and `provider_id`.

Returns the project struct, or `nil` when no row matches.
"""
def get_project_by_provider(provider, provider_id) do
  lookup = [provider: provider, provider_id: provider_id]
  Repo.get_by(CloudProject, lookup)
end
|
||||
|
||||
@doc """
Inserts or updates the project identified by the provider / provider_id pair in `attrs`.

The two identifying fields may be supplied under atom or string keys. When no
matching project exists a new one is inserted; otherwise the existing row is
updated with `attrs`. Returns the result of `Repo.insert/1` or `Repo.update/1`.
"""
def ensure_project(attrs) when is_map(attrs) do
  provider = attrs[:provider] || attrs["provider"]
  provider_id = attrs[:provider_id] || attrs["provider_id"]

  found = get_project_by_provider(provider, provider_id)

  if is_nil(found) do
    changeset = CloudProject.changeset(%CloudProject{}, attrs)
    Repo.insert(changeset)
  else
    changeset = CloudProject.changeset(found, attrs)
    Repo.update(changeset)
  end
end
|
||||
|
||||
@doc "Returns every `CloudProject` row."
def list_projects do
  Repo.all(CloudProject)
end
|
||||
|
||||
@doc """
Looks up the project whose purpose is `"skyai-infra"` and whose name is
`"skyai-internal"`. Returns `nil` when no such row exists.
"""
def skyai_internal_project do
  clauses = [purpose: "skyai-infra", name: "skyai-internal"]
  Repo.get_by(CloudProject, clauses)
end
|
||||
|
||||
# ---- resources ------------------------------------------------------------
|
||||
|
||||
@doc """
Inserts or refreshes the resource identified by the provider / provider_id pair in `attrs`.

Every call stamps `last_seen_at` with the current time (truncated to seconds)
and resets `stale_strike_count` to 0. Inside a single transaction it then:

  * inserts the resource and writes a `discovered` event when no row exists;
  * otherwise updates the row (never overwriting `first_seen_at`) and
      * clears `deleted_at` and writes a `restored` event if the row was tombstoned,
      * writes an `updated` event if any meaningful field changed,
      * writes no event when nothing meaningful changed.

## Options

  * `:source` - value recorded on emitted events (default `"sync"`).

`attrs` may use atom keys or string keys; the bookkeeping fields are added
with the same key flavour so the map is never mixed.

Returns the `Repo.transaction/1` result, i.e. `{:ok, resource}` on success.
An invalid changeset fails the assertive `{:ok, _}` match and raises.
"""
def upsert_resource(attrs, opts \\ []) when is_map(attrs) do
  source = Keyword.get(opts, :source, "sync")
  provider = attrs[:provider] || attrs["provider"]
  provider_id = attrs[:provider_id] || attrs["provider_id"]
  now = DateTime.utc_now() |> DateTime.truncate(:second)
  attrs = put_seen_fields(attrs, now)

  Repo.transaction(fn ->
    case Repo.get_by(CloudResource, provider: provider, provider_id: provider_id) do
      nil -> insert_discovered(attrs, source)
      existing -> refresh_existing(existing, attrs, source)
    end
  end)
end

# Adds the sync bookkeeping fields using the same key type as the caller's map.
# Mixing atom and string keys is rejected by `Ecto.Changeset.cast/4`
# (presumably what `CloudResource.changeset/2` uses - confirm), so string-keyed
# input must receive string-keyed additions.
defp put_seen_fields(attrs, now) do
  string_keys? = attrs |> Map.keys() |> Enum.any?(&is_binary/1)
  key = fn name -> if string_keys?, do: Atom.to_string(name), else: name end

  attrs
  |> Map.put_new(key.(:first_seen_at), now)
  |> Map.put(key.(:last_seen_at), now)
  |> Map.put(key.(:stale_strike_count), 0)
end

# Inserts a never-seen resource and records its `discovered` event.
defp insert_discovered(attrs, source) do
  {:ok, resource} =
    %CloudResource{}
    |> CloudResource.changeset(attrs)
    |> Repo.insert()

  write_event(resource, "discovered", nil, snapshot(resource), source)
  resource
end

# Updates an already-known resource in one write, restoring it when it was
# tombstoned, and records a `restored` / `updated` event where applicable.
defp refresh_existing(existing, attrs, source) do
  before_snap = snapshot(existing)
  restoring? = not is_nil(existing.deleted_at)

  # first_seen_at belongs to the original discovery; drop it under either key type.
  changeset =
    CloudResource.changeset(existing, Map.drop(attrs, [:first_seen_at, "first_seen_at"]))

  changeset =
    if restoring? do
      # Was tombstoned; now seen again. Clear deleted_at in the same update.
      Ecto.Changeset.put_change(changeset, :deleted_at, nil)
    else
      changeset
    end

  {:ok, updated} = Repo.update(changeset)
  after_snap = snapshot(updated)

  cond do
    restoring? ->
      write_event(updated, "restored", before_snap, after_snap, source)

    meaningful_change?(before_snap, after_snap) ->
      write_event(updated, "updated", before_snap, after_snap, source)

    true ->
      :ok
  end

  updated
end
|
||||
|
||||
@doc """
Three-strike deletion: every non-deleted resource of `kind` whose `last_seen_at`
is older than `sync_started_at` gets its `stale_strike_count` incremented. Any
non-deleted resource of `kind` whose count has reached the threshold is then
tombstoned (`deleted_at` set) and a `deleted` event with source `"sync_stale"`
is written for it.

The strike bump, tombstoning and event writes run in one transaction, so a
failure part-way through leaves no resource deleted without its audit event.

## Options

  * `:threshold` - strikes required before deletion (default `3`).

Returns `%{struck: n, deleted: m}`, where `n` is the number of rows whose strike
count was bumped and `m` the number of rows tombstoned by this call.
"""
def mark_stale(kind, sync_started_at, opts \\ []) do
  threshold = Keyword.get(opts, :threshold, 3)

  # last_seen_at is written with second precision; compare at the same
  # precision so a resource refreshed in the same second the sync started is
  # not counted as stale because of sub-second digits on the cutoff.
  cutoff = truncate_to_second(sync_started_at)

  {:ok, summary} =
    Repo.transaction(fn ->
      stale_query =
        from r in CloudResource,
          where: r.kind == ^kind,
          where: r.last_seen_at < ^cutoff,
          where: is_nil(r.deleted_at)

      # bump strikes
      {struck, _} = Repo.update_all(stale_query, inc: [stale_strike_count: 1])

      # mark deleted those at/past threshold
      now = DateTime.utc_now() |> DateTime.truncate(:second)

      delete_query =
        from r in CloudResource,
          where: r.kind == ^kind,
          where: r.stale_strike_count >= ^threshold,
          where: is_nil(r.deleted_at)

      deleted = Repo.all(delete_query)

      Enum.each(deleted, fn r ->
        r
        |> Ecto.Changeset.change(deleted_at: now)
        |> Repo.update!()

        # The event keeps the pre-deletion snapshot as `before`; there is no `after`.
        write_event(r, "deleted", snapshot(r), nil, "sync_stale")
      end)

      %{struck: struck, deleted: length(deleted)}
    end)

  summary
end

# Drops sub-second precision from a timestamp; other values pass through untouched.
defp truncate_to_second(%DateTime{} = dt), do: DateTime.truncate(dt, :second)
defp truncate_to_second(%NaiveDateTime{} = dt), do: NaiveDateTime.truncate(dt, :second)
defp truncate_to_second(other), do: other
|
||||
|
||||
@doc """
Lists non-deleted resources, most recently updated first.

`opts` may carry `:kind`, `:tenant_id` and `:deployment_id`; each one that is
present and non-nil narrows the result to rows with that exact value.
"""
def list_resources(opts \\ []) do
  visible =
    from r in CloudResource,
      where: is_nil(r.deleted_at),
      order_by: [desc: r.updated_at]

  [:kind, :tenant_id, :deployment_id]
  |> Enum.reduce(visible, fn field, query -> filter_by(query, field, opts[field]) end)
  |> Repo.all()
end
|
||||
|
||||
# Adds an equality filter on `field` unless `value` is nil, in which case the
# query is returned unchanged.
defp filter_by(query, field, value) do
  if is_nil(value) do
    query
  else
    where(query, [r], field(r, ^field) == ^value)
  end
end
|
||||
|
||||
# ---- internals ------------------------------------------------------------
|
||||
|
||||
# Extracts the meaningful fields plus the identifying fields of a resource as a
# plain map, for use as an event's before/after payload.
defp snapshot(%CloudResource{} = resource) do
  keys = [:provider_id, :name, :id | @meaningful_fields]
  Map.take(resource, keys)
end
|
||||
|
||||
# True when the two snapshots differ in at least one of @meaningful_fields.
defp meaningful_change?(before_snap, after_snap) do
  relevant_before = Map.take(before_snap, @meaningful_fields)
  relevant_after = Map.take(after_snap, @meaningful_fields)

  relevant_before != relevant_after
end
|
||||
|
||||
# Appends one audit event for the given resource. Raises (via `Repo.insert!/1`)
# when the event cannot be inserted.
defp write_event(%CloudResource{id: resource_id}, event, before, after_, source) do
  occurred_at = DateTime.truncate(DateTime.utc_now(), :second)

  params = %{
    resource_id: resource_id,
    event: event,
    before: before,
    after: after_,
    source: source,
    occurred_at: occurred_at
  }

  changeset = CloudResourceEvent.changeset(%CloudResourceEvent{}, params)
  Repo.insert!(changeset)
end
|
||||
end
|
||||
Reference in New Issue
Block a user