Phase 2: drift detection
Compares cloud_provisioned.spec (what we asked DO for) against the live cloud_resources row (what DO actually has). Any divergence becomes an operator-resolvable drift record. cloud_drift table: one row per drifted field. status open/accepted/ reverted/stale. Partial unique index keeps at most one OPEN drift per (resource, field); resolved rows are retained as history. ArcadiaCloud.Drift context: - detect_all/0 — sweeps every provisioned resource. Per field in spec, resolves the actual value (top-level schema field first, then attrs), compares with loose equality (stringified scalars; lists as sets so JSON round-trips don't false-positive). Mismatches upsert a cloud_drift row + emit a drift_detected event in the resource event log. - close_stale_drift — an open drift whose field no longer mismatches (fixed elsewhere) closes as "stale" on the next sweep. - accept_drift/2 — the live value becomes the new desired-state: parent cloud_provisioned.spec is updated, spec_version bumped, drift closed "accepted". Revert (mutating live infra back to spec) is intentionally NOT here — it needs a saga and lands with the droplet-resize work. DriftDetectionWorker — Oban cron at :20 past the hour, offset past the :15 resource syncs so it compares against fresh inventory. Provisioning.record_provisioned/3 — populates cloud_provisioned desired- state (upsert on resource_id, bumps spec_version). Future provisioning sagas call this; for now it's how drift gets something to detect. API (platform_admin only): - GET /api/v1/drift — open drift inbox - POST /api/v1/drift/:id/accept — adopt live value as desired-state Smoke verified: recorded a droplet's desired spec with a deliberately wrong size_slug + a correct region; detect_all flagged only size_slug, wrote the drift_detected event; accept updated the spec to the live value and closed the drift; re-detect found zero drift. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -63,6 +63,8 @@ config :arcadia_cloud, Oban,
|
||||
{"33 * * * *", ArcadiaCloud.Sync.SnapshotsWorker},
|
||||
# Backups also slow-moving; hourly per-droplet walk
|
||||
{"41 * * * *", ArcadiaCloud.Sync.BackupsWorker},
|
||||
# Drift sweep — offset past the :15 resource syncs so it sees fresh data
|
||||
{"20 * * * *", ArcadiaCloud.Sync.DriftDetectionWorker},
|
||||
# Billing: hourly balance, daily invoice discovery
|
||||
{"7 * * * *", ArcadiaCloud.Sync.BalanceWorker},
|
||||
{"23 2 * * *", ArcadiaCloud.Sync.BillingHistoryWorker}
|
||||
|
||||
235
lib/arcadia_cloud/drift.ex
Normal file
235
lib/arcadia_cloud/drift.ex
Normal file
@@ -0,0 +1,235 @@
|
||||
defmodule ArcadiaCloud.Drift do
|
||||
@moduledoc """
|
||||
Drift detection — compares `cloud_provisioned.spec` (what we asked DO
|
||||
for) against the live `cloud_resources` row (what DO actually has).
|
||||
|
||||
A spec is a flat map of field => expected value. Each key is resolved
|
||||
against the resource: first as a top-level schema field, then as a key
|
||||
in `attrs`. Any mismatch becomes a `cloud_drift` row (status "open")
|
||||
plus a `drift_detected` event in the resource event log.
|
||||
|
||||
Resolution:
|
||||
- accept_drift/2 — desired-state wins-the-other-way: spec is updated
|
||||
to match the live value; the drift row closes as "accepted".
|
||||
- revert is intentionally NOT auto-applied here — reverting means
|
||||
mutating live infra, which must go through a saga. Phase 2 surfaces
|
||||
revert as a flagged action; the saga wiring lands with the
|
||||
droplet-resize saga.
|
||||
|
||||
Stale handling: an open drift whose field no longer mismatches (someone
|
||||
fixed it, or accepted elsewhere) is closed as "stale" on the next sweep.
|
||||
"""
|
||||
|
||||
import Ecto.Query, warn: false
|
||||
|
||||
alias ArcadiaCloud.Repo
|
||||
alias ArcadiaCloud.Cloud.{CloudResource, CloudResourceEvent}
|
||||
alias ArcadiaCloud.Provisioning.{CloudDrift, CloudProvisioned}
|
||||
|
||||
# ---- detection ------------------------------------------------------------
|
||||
|
||||
@doc """
|
||||
Sweep every provisioned resource, (re)compute drift. Returns a summary
|
||||
map: %{checked, drifted_fields, new_drift, resolved_stale}.
|
||||
"""
|
||||
def detect_all do
|
||||
provisioned =
|
||||
from(p in CloudProvisioned, preload: [:resource])
|
||||
|> Repo.all()
|
||||
|
||||
Enum.reduce(provisioned, %{checked: 0, drifted_fields: 0, new_drift: 0, resolved_stale: 0},
|
||||
fn prov, acc ->
|
||||
result = detect_one(prov)
|
||||
|
||||
%{
|
||||
checked: acc.checked + 1,
|
||||
drifted_fields: acc.drifted_fields + result.drifted_fields,
|
||||
new_drift: acc.new_drift + result.new_drift,
|
||||
resolved_stale: acc.resolved_stale + result.resolved_stale
|
||||
}
|
||||
end)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Detect drift for one provisioned resource. Returns
|
||||
%{drifted_fields, new_drift, resolved_stale}.
|
||||
"""
|
||||
def detect_one(%CloudProvisioned{resource: %CloudResource{} = resource} = prov) do
|
||||
mismatches = compute_mismatches(prov.spec, resource)
|
||||
mismatched_fields = MapSet.new(mismatches, & &1.field)
|
||||
|
||||
new_drift =
|
||||
Enum.reduce(mismatches, 0, fn m, count ->
|
||||
case record_drift(prov, resource, m) do
|
||||
:inserted -> count + 1
|
||||
:exists -> count
|
||||
end
|
||||
end)
|
||||
|
||||
# Close any open drift rows whose field is no longer mismatching.
|
||||
resolved_stale = close_stale_drift(resource.id, mismatched_fields)
|
||||
|
||||
%{drifted_fields: length(mismatches), new_drift: new_drift, resolved_stale: resolved_stale}
|
||||
end
|
||||
|
||||
def detect_one(%CloudProvisioned{} = prov) do
|
||||
detect_one(Repo.preload(prov, :resource))
|
||||
end
|
||||
|
||||
# ---- comparison -----------------------------------------------------------
|
||||
|
||||
defp compute_mismatches(spec, %CloudResource{} = resource) when is_map(spec) do
|
||||
spec
|
||||
|> Enum.flat_map(fn {field, expected} ->
|
||||
actual = resource_value(resource, field)
|
||||
|
||||
if values_equal?(expected, actual) do
|
||||
[]
|
||||
else
|
||||
[%{field: to_string(field), expected: expected, actual: actual}]
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp compute_mismatches(_spec, _resource), do: []
|
||||
|
||||
# Resolve a spec field against the resource: top-level schema field
|
||||
# first, then attrs.
|
||||
@schema_fields ~w(name region status size_slug tags)
|
||||
defp resource_value(%CloudResource{} = resource, field) do
|
||||
fstr = to_string(field)
|
||||
|
||||
if fstr in @schema_fields do
|
||||
Map.get(resource, String.to_existing_atom(fstr))
|
||||
else
|
||||
get_in(resource.attrs || %{}, [fstr])
|
||||
end
|
||||
end
|
||||
|
||||
# Loose equality — JSON round-trips turn atoms/keywords into strings,
|
||||
# so compare stringified forms for scalars; lists compared as sets.
|
||||
defp values_equal?(a, b) when is_list(a) and is_list(b) do
|
||||
MapSet.new(a) == MapSet.new(b)
|
||||
end
|
||||
|
||||
defp values_equal?(a, b), do: to_string_safe(a) == to_string_safe(b)
|
||||
|
||||
defp to_string_safe(nil), do: ""
|
||||
defp to_string_safe(v) when is_binary(v), do: v
|
||||
defp to_string_safe(v), do: inspect(v)
|
||||
|
||||
# ---- drift rows -----------------------------------------------------------
|
||||
|
||||
defp record_drift(%CloudProvisioned{} = prov, %CloudResource{} = resource, mismatch) do
|
||||
existing =
|
||||
Repo.one(
|
||||
from(d in CloudDrift,
|
||||
where:
|
||||
d.resource_id == ^resource.id and d.field == ^mismatch.field and
|
||||
d.status == "open"
|
||||
)
|
||||
)
|
||||
|
||||
if existing do
|
||||
:exists
|
||||
else
|
||||
now = DateTime.utc_now() |> DateTime.truncate(:second)
|
||||
|
||||
{:ok, _drift} =
|
||||
%CloudDrift{}
|
||||
|> CloudDrift.changeset(%{
|
||||
resource_id: resource.id,
|
||||
provisioned_id: prov.id,
|
||||
field: mismatch.field,
|
||||
expected: wrap(mismatch.expected),
|
||||
actual: wrap(mismatch.actual),
|
||||
status: "open",
|
||||
detected_at: now
|
||||
})
|
||||
|> Repo.insert()
|
||||
|
||||
write_drift_event(resource, mismatch)
|
||||
:inserted
|
||||
end
|
||||
end
|
||||
|
||||
defp close_stale_drift(resource_id, still_drifting_fields) do
|
||||
open =
|
||||
Repo.all(
|
||||
from(d in CloudDrift, where: d.resource_id == ^resource_id and d.status == "open")
|
||||
)
|
||||
|
||||
now = DateTime.utc_now() |> DateTime.truncate(:second)
|
||||
|
||||
Enum.reduce(open, 0, fn drift, count ->
|
||||
if MapSet.member?(still_drifting_fields, drift.field) do
|
||||
count
|
||||
else
|
||||
drift
|
||||
|> CloudDrift.changeset(%{status: "stale", resolved_at: now})
|
||||
|> Repo.update!()
|
||||
|
||||
count + 1
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp write_drift_event(%CloudResource{id: id}, mismatch) do
|
||||
%CloudResourceEvent{}
|
||||
|> CloudResourceEvent.changeset(%{
|
||||
resource_id: id,
|
||||
event: "drift_detected",
|
||||
before: %{"field" => mismatch.field, "expected" => wrap(mismatch.expected)},
|
||||
after: %{"field" => mismatch.field, "actual" => wrap(mismatch.actual)},
|
||||
source: "drift_detection",
|
||||
occurred_at: DateTime.utc_now() |> DateTime.truncate(:second)
|
||||
})
|
||||
|> Repo.insert!()
|
||||
end
|
||||
|
||||
# cloud_drift.expected/actual are :map columns; wrap scalars in a map.
|
||||
defp wrap(v) when is_map(v), do: v
|
||||
defp wrap(v), do: %{"value" => v}
|
||||
|
||||
defp unwrap(%{"value" => v}), do: v
|
||||
defp unwrap(other), do: other
|
||||
|
||||
# ---- queries + resolution -------------------------------------------------
|
||||
|
||||
def list_open_drift do
|
||||
from(d in CloudDrift,
|
||||
where: d.status == "open",
|
||||
order_by: [desc: d.detected_at],
|
||||
preload: [:resource]
|
||||
)
|
||||
|> Repo.all()
|
||||
end
|
||||
|
||||
def get_drift(id), do: Repo.get(CloudDrift, id)
|
||||
|
||||
@doc """
|
||||
Accept a drift: the live value becomes the new desired-state. Updates
|
||||
the parent cloud_provisioned.spec, bumps spec_version, closes the drift
|
||||
as "accepted".
|
||||
"""
|
||||
def accept_drift(%CloudDrift{} = drift, opts \\ []) do
|
||||
actor = Keyword.get(opts, :actor, "operator")
|
||||
now = DateTime.utc_now() |> DateTime.truncate(:second)
|
||||
|
||||
prov = Repo.get!(CloudProvisioned, drift.provisioned_id)
|
||||
new_spec = Map.put(prov.spec || %{}, drift.field, unwrap(drift.actual))
|
||||
|
||||
Repo.transaction(fn ->
|
||||
prov
|
||||
|> CloudProvisioned.changeset(%{
|
||||
spec: new_spec,
|
||||
spec_version: (prov.spec_version || 1) + 1
|
||||
})
|
||||
|> Repo.update!()
|
||||
|
||||
drift
|
||||
|> CloudDrift.changeset(%{status: "accepted", resolved_at: now, resolved_by: actor})
|
||||
|> Repo.update!()
|
||||
end)
|
||||
end
|
||||
end
|
||||
@@ -56,6 +56,41 @@ defmodule ArcadiaCloud.Provisioning do
|
||||
|> Repo.insert()
|
||||
end
|
||||
|
||||
alias ArcadiaCloud.Provisioning.CloudProvisioned
|
||||
|
||||
@doc """
|
||||
Records desired-state for a resource we provisioned. `spec` is a flat
|
||||
map of field => expected value that drift detection later compares
|
||||
against the live resource. Upserts on resource_id.
|
||||
"""
|
||||
def record_provisioned(resource_id, spec, opts \\ []) do
|
||||
now = DateTime.utc_now() |> DateTime.truncate(:second)
|
||||
|
||||
attrs = %{
|
||||
resource_id: resource_id,
|
||||
spec: spec,
|
||||
provisioned_at: now,
|
||||
provisioned_by: opts[:provisioned_by] || "system",
|
||||
saga_id: opts[:saga_id]
|
||||
}
|
||||
|
||||
case Repo.get_by(CloudProvisioned, resource_id: resource_id) do
|
||||
nil ->
|
||||
%CloudProvisioned{}
|
||||
|> CloudProvisioned.changeset(attrs)
|
||||
|> Repo.insert()
|
||||
|
||||
existing ->
|
||||
existing
|
||||
|> CloudProvisioned.changeset(Map.put(attrs, :spec_version, existing.spec_version + 1))
|
||||
|> Repo.update()
|
||||
end
|
||||
end
|
||||
|
||||
def get_provisioned(resource_id) do
|
||||
Repo.get_by(CloudProvisioned, resource_id: resource_id)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Starts a snapshot saga for a droplet. `droplet_provider_id` is the DO
|
||||
numeric droplet id (string). Optional `:snapshot_label` and
|
||||
|
||||
34
lib/arcadia_cloud/provisioning/cloud_drift.ex
Normal file
34
lib/arcadia_cloud/provisioning/cloud_drift.ex
Normal file
@@ -0,0 +1,34 @@
|
||||
defmodule ArcadiaCloud.Provisioning.CloudDrift do
|
||||
use Ecto.Schema
|
||||
import Ecto.Changeset
|
||||
|
||||
@primary_key {:id, :binary_id, autogenerate: true}
|
||||
@foreign_key_type :binary_id
|
||||
|
||||
@statuses ~w(open accepted reverted stale)
|
||||
|
||||
schema "cloud_drift" do
|
||||
field :field, :string
|
||||
field :expected, :map
|
||||
field :actual, :map
|
||||
field :status, :string, default: "open"
|
||||
field :detected_at, :utc_datetime
|
||||
field :resolved_at, :utc_datetime
|
||||
field :resolved_by, :string
|
||||
|
||||
belongs_to :resource, ArcadiaCloud.Cloud.CloudResource
|
||||
belongs_to :provisioned, ArcadiaCloud.Provisioning.CloudProvisioned
|
||||
|
||||
timestamps(type: :utc_datetime)
|
||||
end
|
||||
|
||||
@required ~w(resource_id provisioned_id field detected_at)a
|
||||
@optional ~w(expected actual status resolved_at resolved_by)a
|
||||
|
||||
def changeset(drift, attrs) do
|
||||
drift
|
||||
|> cast(attrs, @required ++ @optional)
|
||||
|> validate_required(@required)
|
||||
|> validate_inclusion(:status, @statuses)
|
||||
end
|
||||
end
|
||||
25
lib/arcadia_cloud/sync/drift_detection_worker.ex
Normal file
25
lib/arcadia_cloud/sync/drift_detection_worker.ex
Normal file
@@ -0,0 +1,25 @@
|
||||
defmodule ArcadiaCloud.Sync.DriftDetectionWorker do
|
||||
@moduledoc """
|
||||
Periodic drift sweep — compares every provisioned resource's desired
|
||||
spec against its live cloud_resources row. Runs after the inventory
|
||||
sync workers so it compares against fresh data.
|
||||
"""
|
||||
|
||||
use Oban.Worker, queue: :cloud_sync_full, max_attempts: 3
|
||||
|
||||
require Logger
|
||||
|
||||
alias ArcadiaCloud.Drift
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(_job) do
|
||||
summary = Drift.detect_all()
|
||||
|
||||
Logger.info(
|
||||
"[drift] checked=#{summary.checked} drifted_fields=#{summary.drifted_fields} " <>
|
||||
"new=#{summary.new_drift} stale_closed=#{summary.resolved_stale}"
|
||||
)
|
||||
|
||||
:ok
|
||||
end
|
||||
end
|
||||
67
lib/arcadia_cloud_web/controllers/drift_controller.ex
Normal file
67
lib/arcadia_cloud_web/controllers/drift_controller.ex
Normal file
@@ -0,0 +1,67 @@
|
||||
defmodule ArcadiaCloudWeb.DriftController do
|
||||
@moduledoc """
|
||||
Drift inbox — operator surface for resources whose live state has
|
||||
diverged from the desired-state we provisioned. platform_admin only.
|
||||
|
||||
accept: live value becomes the new desired-state.
|
||||
Revert (mutating live infra back to spec) is deferred — it requires a
|
||||
saga and lands with the droplet-resize work.
|
||||
"""
|
||||
|
||||
use ArcadiaCloudWeb, :controller
|
||||
|
||||
alias ArcadiaCloud.Drift
|
||||
|
||||
def index(conn, _params) do
|
||||
with :ok <- require_platform_admin(conn) do
|
||||
drift = Drift.list_open_drift() |> Enum.map(&shape/1)
|
||||
json(conn, %{drift: drift, count: length(drift)})
|
||||
end
|
||||
end
|
||||
|
||||
def accept(conn, %{"id" => id}) do
|
||||
with :ok <- require_platform_admin(conn) do
|
||||
case Drift.get_drift(id) do
|
||||
nil ->
|
||||
conn |> put_status(:not_found) |> json(%{error: "not_found"})
|
||||
|
||||
%{status: "open"} = drift ->
|
||||
actor = conn.assigns.current_identity.email || "operator"
|
||||
{:ok, _} = Drift.accept_drift(drift, actor: actor)
|
||||
json(conn, %{status: "accepted", drift_id: id})
|
||||
|
||||
%{status: status} ->
|
||||
conn
|
||||
|> put_status(:conflict)
|
||||
|> json(%{error: "already_resolved", current_status: status})
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
defp require_platform_admin(conn) do
|
||||
identity = conn.assigns.current_identity
|
||||
|
||||
if is_list(identity.roles) and "platform_admin" in identity.roles do
|
||||
:ok
|
||||
else
|
||||
conn
|
||||
|> put_status(:forbidden)
|
||||
|> json(%{error: "platform_admin_required"})
|
||||
|> halt()
|
||||
end
|
||||
end
|
||||
|
||||
defp shape(d) do
|
||||
%{
|
||||
id: d.id,
|
||||
resource_id: d.resource_id,
|
||||
resource_name: d.resource && d.resource.name,
|
||||
resource_kind: d.resource && d.resource.kind,
|
||||
field: d.field,
|
||||
expected: d.expected,
|
||||
actual: d.actual,
|
||||
status: d.status,
|
||||
detected_at: d.detected_at
|
||||
}
|
||||
end
|
||||
end
|
||||
@@ -22,5 +22,8 @@ defmodule ArcadiaCloudWeb.Router do
|
||||
|
||||
get "/billing/balance", BillingController, :balance
|
||||
get "/billing/cost-lines", BillingController, :cost_lines
|
||||
|
||||
get "/drift", DriftController, :index
|
||||
post "/drift/:id/accept", DriftController, :accept
|
||||
end
|
||||
end
|
||||
|
||||
35
priv/repo/migrations/20260520130000_create_drift.exs
Normal file
35
priv/repo/migrations/20260520130000_create_drift.exs
Normal file
@@ -0,0 +1,35 @@
|
||||
defmodule ArcadiaCloud.Repo.Migrations.CreateDrift do
|
||||
use Ecto.Migration
|
||||
|
||||
def change do
|
||||
# One row per drifted field of a provisioned resource. The operator
|
||||
# resolves each: "accept" (desired-state := actual) or "revert"
|
||||
# (schedule a saga to put actual back to desired).
|
||||
create table(:cloud_drift, primary_key: false) do
|
||||
add :id, :binary_id, primary_key: true
|
||||
add :resource_id, references(:cloud_resources, type: :binary_id, on_delete: :delete_all),
|
||||
null: false
|
||||
add :provisioned_id,
|
||||
references(:cloud_provisioned, type: :binary_id, on_delete: :delete_all),
|
||||
null: false
|
||||
add :field, :string, null: false
|
||||
add :expected, :map
|
||||
add :actual, :map
|
||||
add :status, :string, null: false, default: "open"
|
||||
add :detected_at, :utc_datetime, null: false
|
||||
add :resolved_at, :utc_datetime
|
||||
add :resolved_by, :string
|
||||
|
||||
timestamps(type: :utc_datetime)
|
||||
end
|
||||
|
||||
# At most one OPEN drift per (resource, field); resolved ones are history.
|
||||
create unique_index(:cloud_drift, [:resource_id, :field],
|
||||
where: "status = 'open'",
|
||||
name: :cloud_drift_open_unique
|
||||
)
|
||||
|
||||
create index(:cloud_drift, [:status])
|
||||
create index(:cloud_drift, [:provisioned_id])
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user