From 445b7b60d4cea0517bc05bf24b3bc88697627e62 Mon Sep 17 00:00:00 2001 From: Giuliano Silvestro Date: Wed, 20 May 2026 11:08:27 +1000 Subject: [PATCH] Phase 2: drift detection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Compares cloud_provisioned.spec (what we asked DO for) against the live cloud_resources row (what DO actually has). Any divergence becomes an operator-resolvable drift record. cloud_drift table: one row per drifted field. status open/accepted/ reverted/stale. Partial unique index keeps at most one OPEN drift per (resource, field); resolved rows are retained as history. ArcadiaCloud.Drift context: - detect_all/0 — sweeps every provisioned resource. Per field in spec, resolves the actual value (top-level schema field first, then attrs), compares with loose equality (stringified scalars; lists as sets so JSON round-trips don't false-positive). Mismatches upsert a cloud_drift row + emit a drift_detected event in the resource event log. - close_stale_drift — an open drift whose field no longer mismatches (fixed elsewhere) closes as "stale" on the next sweep. - accept_drift/2 — the live value becomes the new desired-state: parent cloud_provisioned.spec is updated, spec_version bumped, drift closed "accepted". Revert (mutating live infra back to spec) is intentionally NOT here — it needs a saga and lands with the droplet-resize work. DriftDetectionWorker — Oban cron at :20 past the hour, offset past the :15 resource syncs so it compares against fresh inventory. Provisioning.record_provisioned/3 — populates cloud_provisioned desired- state (upsert on resource_id, bumps spec_version). Future provisioning sagas call this; for now it's how drift gets something to detect. API (platform_admin only): - GET /api/v1/drift — open drift inbox - POST /api/v1/drift/:id/accept — adopt live value as desired-state Smoke verified: recorded a droplet's desired spec with a deliberately wrong size_slug + a correct region; detect_all flagged only size_slug, wrote the drift_detected event; accept updated the spec to the live value and closed the drift; re-detect found zero drift. Co-Authored-By: Claude Opus 4.7 (1M context) --- config/config.exs | 2 + lib/arcadia_cloud/drift.ex | 235 ++++++++++++++++++ lib/arcadia_cloud/provisioning.ex | 35 +++ lib/arcadia_cloud/provisioning/cloud_drift.ex | 34 +++ .../sync/drift_detection_worker.ex | 25 ++ .../controllers/drift_controller.ex | 67 +++++ lib/arcadia_cloud_web/router.ex | 3 + .../20260520130000_create_drift.exs | 35 +++ 8 files changed, 436 insertions(+) create mode 100644 lib/arcadia_cloud/drift.ex create mode 100644 lib/arcadia_cloud/provisioning/cloud_drift.ex create mode 100644 lib/arcadia_cloud/sync/drift_detection_worker.ex create mode 100644 lib/arcadia_cloud_web/controllers/drift_controller.ex create mode 100644 priv/repo/migrations/20260520130000_create_drift.exs diff --git a/config/config.exs b/config/config.exs index 8a2a671..3b9bec8 100644 --- a/config/config.exs +++ b/config/config.exs @@ -63,6 +63,8 @@ config :arcadia_cloud, Oban, {"33 * * * *", ArcadiaCloud.Sync.SnapshotsWorker}, # Backups also slow-moving; hourly per-droplet walk {"41 * * * *", ArcadiaCloud.Sync.BackupsWorker}, + # Drift sweep — offset past the :15 resource syncs so it sees fresh data + {"20 * * * *", ArcadiaCloud.Sync.DriftDetectionWorker}, # Billing: hourly balance, daily invoice discovery {"7 * * * *", ArcadiaCloud.Sync.BalanceWorker}, {"23 2 * * *", ArcadiaCloud.Sync.BillingHistoryWorker} diff --git a/lib/arcadia_cloud/drift.ex b/lib/arcadia_cloud/drift.ex new file mode 100644 index 0000000..38d3faa --- /dev/null +++ b/lib/arcadia_cloud/drift.ex @@ -0,0 +1,235 @@ +defmodule ArcadiaCloud.Drift do + @moduledoc """ + Drift detection — compares `cloud_provisioned.spec` (what we asked DO + for) against the live `cloud_resources` row (what DO actually has). + + A spec is a flat map of field => expected value. Each key is resolved + against the resource: first as a top-level schema field, then as a key + in `attrs`. Any mismatch becomes a `cloud_drift` row (status "open") + plus a `drift_detected` event in the resource event log. + + Resolution: + - accept_drift/2 — desired-state wins-the-other-way: spec is updated + to match the live value; the drift row closes as "accepted". + - revert is intentionally NOT auto-applied here — reverting means + mutating live infra, which must go through a saga. Phase 2 surfaces + revert as a flagged action; the saga wiring lands with the + droplet-resize saga. + + Stale handling: an open drift whose field no longer mismatches (someone + fixed it, or accepted elsewhere) is closed as "stale" on the next sweep. + """ + + import Ecto.Query, warn: false + + alias ArcadiaCloud.Repo + alias ArcadiaCloud.Cloud.{CloudResource, CloudResourceEvent} + alias ArcadiaCloud.Provisioning.{CloudDrift, CloudProvisioned} + + # ---- detection ------------------------------------------------------------ + + @doc """ + Sweep every provisioned resource, (re)compute drift. Returns a summary + map: %{checked, drifted_fields, new_drift, resolved_stale}. + """ + def detect_all do + provisioned = + from(p in CloudProvisioned, preload: [:resource]) + |> Repo.all() + + Enum.reduce(provisioned, %{checked: 0, drifted_fields: 0, new_drift: 0, resolved_stale: 0}, + fn prov, acc -> + result = detect_one(prov) + + %{ + checked: acc.checked + 1, + drifted_fields: acc.drifted_fields + result.drifted_fields, + new_drift: acc.new_drift + result.new_drift, + resolved_stale: acc.resolved_stale + result.resolved_stale + } + end) + end + + @doc """ + Detect drift for one provisioned resource. Returns + %{drifted_fields, new_drift, resolved_stale}. + """ + def detect_one(%CloudProvisioned{resource: %CloudResource{} = resource} = prov) do + mismatches = compute_mismatches(prov.spec, resource) + mismatched_fields = MapSet.new(mismatches, & &1.field) + + new_drift = + Enum.reduce(mismatches, 0, fn m, count -> + case record_drift(prov, resource, m) do + :inserted -> count + 1 + :exists -> count + end + end) + + # Close any open drift rows whose field is no longer mismatching. + resolved_stale = close_stale_drift(resource.id, mismatched_fields) + + %{drifted_fields: length(mismatches), new_drift: new_drift, resolved_stale: resolved_stale} + end + + def detect_one(%CloudProvisioned{} = prov) do + detect_one(Repo.preload(prov, :resource)) + end + + # ---- comparison ----------------------------------------------------------- + + defp compute_mismatches(spec, %CloudResource{} = resource) when is_map(spec) do + spec + |> Enum.flat_map(fn {field, expected} -> + actual = resource_value(resource, field) + + if values_equal?(expected, actual) do + [] + else + [%{field: to_string(field), expected: expected, actual: actual}] + end + end) + end + + defp compute_mismatches(_spec, _resource), do: [] + + # Resolve a spec field against the resource: top-level schema field + # first, then attrs. + @schema_fields ~w(name region status size_slug tags) + defp resource_value(%CloudResource{} = resource, field) do + fstr = to_string(field) + + if fstr in @schema_fields do + Map.get(resource, String.to_existing_atom(fstr)) + else + get_in(resource.attrs || %{}, [fstr]) + end + end + + # Loose equality — JSON round-trips turn atoms/keywords into strings, + # so compare stringified forms for scalars; lists compared as sets. + defp values_equal?(a, b) when is_list(a) and is_list(b) do + MapSet.new(a) == MapSet.new(b) + end + + defp values_equal?(a, b), do: to_string_safe(a) == to_string_safe(b) + + defp to_string_safe(nil), do: "" + defp to_string_safe(v) when is_binary(v), do: v + defp to_string_safe(v), do: inspect(v) + + # ---- drift rows ----------------------------------------------------------- + + defp record_drift(%CloudProvisioned{} = prov, %CloudResource{} = resource, mismatch) do + existing = + Repo.one( + from(d in CloudDrift, + where: + d.resource_id == ^resource.id and d.field == ^mismatch.field and + d.status == "open" + ) + ) + + if existing do + :exists + else + now = DateTime.utc_now() |> DateTime.truncate(:second) + + {:ok, _drift} = + %CloudDrift{} + |> CloudDrift.changeset(%{ + resource_id: resource.id, + provisioned_id: prov.id, + field: mismatch.field, + expected: wrap(mismatch.expected), + actual: wrap(mismatch.actual), + status: "open", + detected_at: now + }) + |> Repo.insert() + + write_drift_event(resource, mismatch) + :inserted + end + end + + defp close_stale_drift(resource_id, still_drifting_fields) do + open = + Repo.all( + from(d in CloudDrift, where: d.resource_id == ^resource_id and d.status == "open") + ) + + now = DateTime.utc_now() |> DateTime.truncate(:second) + + Enum.reduce(open, 0, fn drift, count -> + if MapSet.member?(still_drifting_fields, drift.field) do + count + else + drift + |> CloudDrift.changeset(%{status: "stale", resolved_at: now}) + |> Repo.update!() + + count + 1 + end + end) + end + + defp write_drift_event(%CloudResource{id: id}, mismatch) do + %CloudResourceEvent{} + |> CloudResourceEvent.changeset(%{ + resource_id: id, + event: "drift_detected", + before: %{"field" => mismatch.field, "expected" => wrap(mismatch.expected)}, + after: %{"field" => mismatch.field, "actual" => wrap(mismatch.actual)}, + source: "drift_detection", + occurred_at: DateTime.utc_now() |> DateTime.truncate(:second) + }) + |> Repo.insert!() + end + + # cloud_drift.expected/actual are :map columns; wrap scalars in a map. + defp wrap(v) when is_map(v), do: v + defp wrap(v), do: %{"value" => v} + + defp unwrap(%{"value" => v}), do: v + defp unwrap(other), do: other + + # ---- queries + resolution ------------------------------------------------- + + def list_open_drift do + from(d in CloudDrift, + where: d.status == "open", + order_by: [desc: d.detected_at], + preload: [:resource] + ) + |> Repo.all() + end + + def get_drift(id), do: Repo.get(CloudDrift, id) + + @doc """ + Accept a drift: the live value becomes the new desired-state. Updates + the parent cloud_provisioned.spec, bumps spec_version, closes the drift + as "accepted". + """ + def accept_drift(%CloudDrift{} = drift, opts \\ []) do + actor = Keyword.get(opts, :actor, "operator") + now = DateTime.utc_now() |> DateTime.truncate(:second) + + prov = Repo.get!(CloudProvisioned, drift.provisioned_id) + new_spec = Map.put(prov.spec || %{}, drift.field, unwrap(drift.actual)) + + Repo.transaction(fn -> + prov + |> CloudProvisioned.changeset(%{ + spec: new_spec, + spec_version: (prov.spec_version || 1) + 1 + }) + |> Repo.update!() + + drift + |> CloudDrift.changeset(%{status: "accepted", resolved_at: now, resolved_by: actor}) + |> Repo.update!() + end) + end +end diff --git a/lib/arcadia_cloud/provisioning.ex b/lib/arcadia_cloud/provisioning.ex index 7455990..8c53ca0 100644 --- a/lib/arcadia_cloud/provisioning.ex +++ b/lib/arcadia_cloud/provisioning.ex @@ -56,6 +56,41 @@ defmodule ArcadiaCloud.Provisioning do |> Repo.insert() end + alias ArcadiaCloud.Provisioning.CloudProvisioned + + @doc """ + Records desired-state for a resource we provisioned. `spec` is a flat + map of field => expected value that drift detection later compares + against the live resource. Upserts on resource_id. + """ + def record_provisioned(resource_id, spec, opts \\ []) do + now = DateTime.utc_now() |> DateTime.truncate(:second) + + attrs = %{ + resource_id: resource_id, + spec: spec, + provisioned_at: now, + provisioned_by: opts[:provisioned_by] || "system", + saga_id: opts[:saga_id] + } + + case Repo.get_by(CloudProvisioned, resource_id: resource_id) do + nil -> + %CloudProvisioned{} + |> CloudProvisioned.changeset(attrs) + |> Repo.insert() + + existing -> + existing + |> CloudProvisioned.changeset(Map.put(attrs, :spec_version, existing.spec_version + 1)) + |> Repo.update() + end + end + + def get_provisioned(resource_id) do + Repo.get_by(CloudProvisioned, resource_id: resource_id) + end + @doc """ Starts a snapshot saga for a droplet. `droplet_provider_id` is the DO numeric droplet id (string). Optional `:snapshot_label` and diff --git a/lib/arcadia_cloud/provisioning/cloud_drift.ex b/lib/arcadia_cloud/provisioning/cloud_drift.ex new file mode 100644 index 0000000..5b90cba --- /dev/null +++ b/lib/arcadia_cloud/provisioning/cloud_drift.ex @@ -0,0 +1,34 @@ +defmodule ArcadiaCloud.Provisioning.CloudDrift do + use Ecto.Schema + import Ecto.Changeset + + @primary_key {:id, :binary_id, autogenerate: true} + @foreign_key_type :binary_id + + @statuses ~w(open accepted reverted stale) + + schema "cloud_drift" do + field :field, :string + field :expected, :map + field :actual, :map + field :status, :string, default: "open" + field :detected_at, :utc_datetime + field :resolved_at, :utc_datetime + field :resolved_by, :string + + belongs_to :resource, ArcadiaCloud.Cloud.CloudResource + belongs_to :provisioned, ArcadiaCloud.Provisioning.CloudProvisioned + + timestamps(type: :utc_datetime) + end + + @required ~w(resource_id provisioned_id field detected_at)a + @optional ~w(expected actual status resolved_at resolved_by)a + + def changeset(drift, attrs) do + drift + |> cast(attrs, @required ++ @optional) + |> validate_required(@required) + |> validate_inclusion(:status, @statuses) + end +end diff --git a/lib/arcadia_cloud/sync/drift_detection_worker.ex b/lib/arcadia_cloud/sync/drift_detection_worker.ex new file mode 100644 index 0000000..248d549 --- /dev/null +++ b/lib/arcadia_cloud/sync/drift_detection_worker.ex @@ -0,0 +1,25 @@ +defmodule ArcadiaCloud.Sync.DriftDetectionWorker do + @moduledoc """ + Periodic drift sweep — compares every provisioned resource's desired + spec against its live cloud_resources row. Runs after the inventory + sync workers so it compares against fresh data. + """ + + use Oban.Worker, queue: :cloud_sync_full, max_attempts: 3 + + require Logger + + alias ArcadiaCloud.Drift + + @impl Oban.Worker + def perform(_job) do + summary = Drift.detect_all() + + Logger.info( + "[drift] checked=#{summary.checked} drifted_fields=#{summary.drifted_fields} " <> + "new=#{summary.new_drift} stale_closed=#{summary.resolved_stale}" + ) + + :ok + end +end diff --git a/lib/arcadia_cloud_web/controllers/drift_controller.ex b/lib/arcadia_cloud_web/controllers/drift_controller.ex new file mode 100644 index 0000000..9e3415c --- /dev/null +++ b/lib/arcadia_cloud_web/controllers/drift_controller.ex @@ -0,0 +1,67 @@ +defmodule ArcadiaCloudWeb.DriftController do + @moduledoc """ + Drift inbox — operator surface for resources whose live state has + diverged from the desired-state we provisioned. platform_admin only. + + accept: live value becomes the new desired-state. + Revert (mutating live infra back to spec) is deferred — it requires a + saga and lands with the droplet-resize work. + """ + + use ArcadiaCloudWeb, :controller + + alias ArcadiaCloud.Drift + + def index(conn, _params) do + with :ok <- require_platform_admin(conn) do + drift = Drift.list_open_drift() |> Enum.map(&shape/1) + json(conn, %{drift: drift, count: length(drift)}) + end + end + + def accept(conn, %{"id" => id}) do + with :ok <- require_platform_admin(conn) do + case Drift.get_drift(id) do + nil -> + conn |> put_status(:not_found) |> json(%{error: "not_found"}) + + %{status: "open"} = drift -> + actor = conn.assigns.current_identity.email || "operator" + {:ok, _} = Drift.accept_drift(drift, actor: actor) + json(conn, %{status: "accepted", drift_id: id}) + + %{status: status} -> + conn + |> put_status(:conflict) + |> json(%{error: "already_resolved", current_status: status}) + end + end + end + + defp require_platform_admin(conn) do + identity = conn.assigns.current_identity + + if is_list(identity.roles) and "platform_admin" in identity.roles do + :ok + else + conn + |> put_status(:forbidden) + |> json(%{error: "platform_admin_required"}) + |> halt() + end + end + + defp shape(d) do + %{ + id: d.id, + resource_id: d.resource_id, + resource_name: d.resource && d.resource.name, + resource_kind: d.resource && d.resource.kind, + field: d.field, + expected: d.expected, + actual: d.actual, + status: d.status, + detected_at: d.detected_at + } + end +end diff --git a/lib/arcadia_cloud_web/router.ex b/lib/arcadia_cloud_web/router.ex index e1aab13..c516c6f 100644 --- a/lib/arcadia_cloud_web/router.ex +++ b/lib/arcadia_cloud_web/router.ex @@ -22,5 +22,8 @@ defmodule ArcadiaCloudWeb.Router do get "/billing/balance", BillingController, :balance get "/billing/cost-lines", BillingController, :cost_lines + + get "/drift", DriftController, :index + post "/drift/:id/accept", DriftController, :accept end end diff --git a/priv/repo/migrations/20260520130000_create_drift.exs b/priv/repo/migrations/20260520130000_create_drift.exs new file mode 100644 index 0000000..6e5990e --- /dev/null +++ b/priv/repo/migrations/20260520130000_create_drift.exs @@ -0,0 +1,35 @@ +defmodule ArcadiaCloud.Repo.Migrations.CreateDrift do + use Ecto.Migration + + def change do + # One row per drifted field of a provisioned resource. The operator + # resolves each: "accept" (desired-state := actual) or "revert" + # (schedule a saga to put actual back to desired). + create table(:cloud_drift, primary_key: false) do + add :id, :binary_id, primary_key: true + add :resource_id, references(:cloud_resources, type: :binary_id, on_delete: :delete_all), + null: false + add :provisioned_id, + references(:cloud_provisioned, type: :binary_id, on_delete: :delete_all), + null: false + add :field, :string, null: false + add :expected, :map + add :actual, :map + add :status, :string, null: false, default: "open" + add :detected_at, :utc_datetime, null: false + add :resolved_at, :utc_datetime + add :resolved_by, :string + + timestamps(type: :utc_datetime) + end + + # At most one OPEN drift per (resource, field); resolved ones are history. + create unique_index(:cloud_drift, [:resource_id, :field], + where: "status = 'open'", + name: :cloud_drift_open_unique + ) + + create index(:cloud_drift, [:status]) + create index(:cloud_drift, [:provisioned_id]) + end +end