Files
arcadia-cloud/lib/arcadia_cloud/drift.ex
Giuliano Silvestro 445b7b60d4 Phase 2: drift detection
Compares cloud_provisioned.spec (what we asked DO for) against the live
cloud_resources row (what DO actually has). Any divergence becomes an
operator-resolvable drift record.

cloud_drift table: one row per drifted field. status open/accepted/
reverted/stale. Partial unique index keeps at most one OPEN drift per
(resource, field); resolved rows are retained as history.

ArcadiaCloud.Drift context:
- detect_all/0 — sweeps every provisioned resource. Per field in spec,
  resolves the actual value (top-level schema field first, then attrs),
  compares with loose equality (stringified scalars; lists as sets so
  JSON round-trips don't false-positive). Mismatches upsert a cloud_drift
  row + emit a drift_detected event in the resource event log.
- close_stale_drift — an open drift whose field no longer mismatches
  (fixed elsewhere) closes as "stale" on the next sweep.
- accept_drift/2 — the live value becomes the new desired-state: parent
  cloud_provisioned.spec is updated, spec_version bumped, drift closed
  "accepted". Revert (mutating live infra back to spec) is intentionally
  NOT here — it needs a saga and lands with the droplet-resize work.

DriftDetectionWorker — Oban cron at :20 past the hour, offset past the
:15 resource syncs so it compares against fresh inventory.

Provisioning.record_provisioned/3 — populates cloud_provisioned desired-
state (upsert on resource_id, bumps spec_version). Future provisioning
sagas call this; for now it's how drift gets something to detect.

API (platform_admin only):
- GET  /api/v1/drift            — open drift inbox
- POST /api/v1/drift/:id/accept — adopt live value as desired-state

Smoke verified: recorded a droplet's desired spec with a deliberately
wrong size_slug + a correct region; detect_all flagged only size_slug,
wrote the drift_detected event; accept updated the spec to the live
value and closed the drift; re-detect found zero drift.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 11:08:27 +10:00

236 lines
7.3 KiB
Elixir

defmodule ArcadiaCloud.Drift do
@moduledoc """
Drift detection — compares `cloud_provisioned.spec` (what we asked DO
for) against the live `cloud_resources` row (what DO actually has).
A spec is a flat map of field => expected value. Each key is resolved
against the resource: first as a top-level schema field, then as a key
in `attrs`. Any mismatch becomes a `cloud_drift` row (status "open")
plus a `drift_detected` event in the resource event log.
Resolution:
- accept_drift/2 — the live value wins: the spec is updated to match
the live value, and the drift row closes as "accepted".
- revert is intentionally NOT auto-applied here — reverting means
mutating live infra, which must go through a saga. Phase 2 surfaces
revert as a flagged action; the saga wiring lands with the
droplet-resize saga.
Stale handling: an open drift whose field no longer mismatches (someone
fixed it, or accepted elsewhere) is closed as "stale" on the next sweep.
"""
import Ecto.Query, warn: false
alias ArcadiaCloud.Repo
alias ArcadiaCloud.Cloud.{CloudResource, CloudResourceEvent}
alias ArcadiaCloud.Provisioning.{CloudDrift, CloudProvisioned}
# ---- detection ------------------------------------------------------------
@doc """
Runs drift detection over every `cloud_provisioned` row (loaded with its
`:resource` association) and folds the per-row results into one summary
map: `%{checked, drifted_fields, new_drift, resolved_stale}`.

`checked` counts the rows visited; the other three keys are the sums of
the corresponding keys returned by `detect_one/1`.
"""
def detect_all do
  totals = %{checked: 0, drifted_fields: 0, new_drift: 0, resolved_stale: 0}

  rows =
    CloudProvisioned
    |> preload(:resource)
    |> Repo.all()

  for row <- rows, reduce: totals do
    sums ->
      outcome = detect_one(row)

      %{
        sums
        | checked: sums.checked + 1,
          drifted_fields: sums.drifted_fields + outcome.drifted_fields,
          new_drift: sums.new_drift + outcome.new_drift,
          resolved_stale: sums.resolved_stale + outcome.resolved_stale
      }
  end
end
@doc """
Detect drift for one provisioned resource. Returns
`%{drifted_fields, new_drift, resolved_stale}`.

  * `drifted_fields` — number of spec fields that currently mismatch.
  * `new_drift` — how many of those produced a newly inserted open drift row.
  * `resolved_stale` — open drift rows closed as "stale" because their
    field no longer mismatches.

If the `:resource` association is not loaded it is preloaded first. A
provisioned row whose resource is missing (association loads as `nil`)
has nothing to compare against and yields an all-zero summary.
"""
def detect_one(%CloudProvisioned{resource: %CloudResource{} = resource} = prov) do
  mismatches = compute_mismatches(prov.spec, resource)
  mismatched_fields = MapSet.new(mismatches, & &1.field)

  new_drift =
    Enum.reduce(mismatches, 0, fn m, count ->
      case record_drift(prov, resource, m) do
        :inserted -> count + 1
        :exists -> count
      end
    end)

  # Close any open drift rows whose field is no longer mismatching.
  resolved_stale = close_stale_drift(resource.id, mismatched_fields)

  %{drifted_fields: length(mismatches), new_drift: new_drift, resolved_stale: resolved_stale}
end

def detect_one(%CloudProvisioned{} = prov) do
  case Repo.preload(prov, :resource) do
    %CloudProvisioned{resource: %CloudResource{}} = loaded ->
      detect_one(loaded)

    # The association resolved to nil (or was already loaded as nil, in
    # which case preload is a no-op). Recursing here would never reach the
    # first clause and would loop forever, so report "nothing checked".
    %CloudProvisioned{} ->
      %{drifted_fields: 0, new_drift: 0, resolved_stale: 0}
  end
end
# ---- comparison -----------------------------------------------------------
# Builds one %{field, expected, actual} map for every spec entry whose
# expected value differs (per values_equal?/2) from the value resolved on
# the resource. A spec that is not a map yields no mismatches.
defp compute_mismatches(spec, %CloudResource{} = resource) when is_map(spec) do
  spec
  |> Enum.map(fn {key, wanted} ->
    %{field: to_string(key), expected: wanted, actual: resource_value(resource, key)}
  end)
  |> Enum.reject(fn %{expected: wanted, actual: live} -> values_equal?(wanted, live) end)
end

defp compute_mismatches(_spec, _resource), do: []
# Looks up the live value for a spec field. Names listed in
# @schema_fields are read straight off the CloudResource struct; every
# other name is looked up as a string key in the resource's `attrs`
# (treated as an empty map when nil).
@schema_fields ~w(name region status size_slug tags)
defp resource_value(%CloudResource{} = resource, field) do
  key = to_string(field)

  if key not in @schema_fields do
    extras = resource.attrs || %{}
    Access.get(extras, key)
  else
    # Safe: the key was just checked against the fixed whitelist above.
    Map.get(resource, String.to_existing_atom(key))
  end
end
# Loose equality — JSON round-trips turn atoms into strings, so scalars
# are compared by their normalised string form. Two lists are compared as
# sets of normalised elements (order and duplicates ignored).
#
# Normalisation (to_string_safe/1):
#   nil     -> ""            (so a missing value equals an empty string)
#   binary  -> itself
#   atom    -> its name      (:active == "active"; true == "true")
#   other   -> inspect/1     (1 == "1")
#
# Atoms must NOT go through inspect/1: inspect(:active) is ":active",
# which would never equal the JSON-decoded "active" and would report
# drift on every sweep.
defp values_equal?(a, b) when is_list(a) and is_list(b) do
  MapSet.new(a, &to_string_safe/1) == MapSet.new(b, &to_string_safe/1)
end

defp values_equal?(a, b), do: to_string_safe(a) == to_string_safe(b)

defp to_string_safe(nil), do: ""
defp to_string_safe(v) when is_binary(v), do: v
defp to_string_safe(v) when is_atom(v), do: Atom.to_string(v)
defp to_string_safe(v), do: inspect(v)
# ---- drift rows -----------------------------------------------------------
# Records one mismatch as an open cloud_drift row plus a `drift_detected`
# resource event.
#
# Returns :exists when an open drift row already exists for this
# (resource, field) — nothing is written — and :inserted otherwise.
#
# The drift row and its event are written in a single transaction: if the
# event insert fails, the drift row is rolled back too. Otherwise the row
# would survive alone, later sweeps would see :exists, and the event would
# never be emitted.
#
# NOTE(review): the exists-check and the insert are still not atomic with
# respect to a concurrent sweep; the partial unique index on open drift is
# presumably what rejects a duplicate in that case (the insert raises) —
# confirm against the migration.
defp record_drift(%CloudProvisioned{} = prov, %CloudResource{} = resource, mismatch) do
  already_open? =
    Repo.exists?(
      from(d in CloudDrift,
        where:
          d.resource_id == ^resource.id and d.field == ^mismatch.field and
            d.status == "open"
      )
    )

  if already_open? do
    :exists
  else
    now = DateTime.utc_now() |> DateTime.truncate(:second)

    {:ok, :inserted} =
      Repo.transaction(fn ->
        %CloudDrift{}
        |> CloudDrift.changeset(%{
          resource_id: resource.id,
          provisioned_id: prov.id,
          field: mismatch.field,
          expected: wrap(mismatch.expected),
          actual: wrap(mismatch.actual),
          status: "open",
          detected_at: now
        })
        |> Repo.insert!()

        write_drift_event(resource, mismatch)
        :inserted
      end)

    :inserted
  end
end
# Closes, as "stale", every open drift row of the resource whose field is
# not in `still_drifting_fields` (a MapSet of field names). Returns the
# number of rows closed.
defp close_stale_drift(resource_id, still_drifting_fields) do
  open_rows =
    CloudDrift
    |> where([d], d.resource_id == ^resource_id and d.status == "open")
    |> Repo.all()

  resolved_at = DateTime.truncate(DateTime.utc_now(), :second)

  open_rows
  |> Enum.reject(fn row -> MapSet.member?(still_drifting_fields, row.field) end)
  |> Enum.map(fn row ->
    row
    |> CloudDrift.changeset(%{status: "stale", resolved_at: resolved_at})
    |> Repo.update!()
  end)
  |> length()
end
# Inserts a `drift_detected` row into the resource event log. `before`
# carries the expected value, `after` the live one; both are wrapped so
# they fit a map-shaped payload. Raises if the insert fails.
defp write_drift_event(%CloudResource{id: id}, mismatch) do
  field = mismatch.field

  attrs = %{
    resource_id: id,
    event: "drift_detected",
    before: %{"field" => field, "expected" => wrap(mismatch.expected)},
    after: %{"field" => field, "actual" => wrap(mismatch.actual)},
    source: "drift_detection",
    occurred_at: DateTime.truncate(DateTime.utc_now(), :second)
  }

  changeset = CloudResourceEvent.changeset(%CloudResourceEvent{}, attrs)
  Repo.insert!(changeset)
end
# cloud_drift.expected/actual are :map columns, so anything that is not
# already a map is stored as %{"value" => v}; maps pass through untouched.
defp wrap(value) do
  if is_map(value), do: value, else: %{"value" => value}
end

# Inverse of wrap/1: any map carrying a "value" key yields that value;
# everything else is returned unchanged.
defp unwrap(stored) do
  case stored do
    %{"value" => inner} -> inner
    _ -> stored
  end
end
# ---- queries + resolution -------------------------------------------------
@doc """
Lists every drift row with status "open", newest `detected_at` first,
with the `:resource` association preloaded.
"""
def list_open_drift do
  open_drift =
    CloudDrift
    |> where([d], d.status == "open")
    |> order_by([d], desc: d.detected_at)
    |> preload(:resource)

  Repo.all(open_drift)
end
@doc """
Looks up a single drift row by id via `Repo.get/2` (any status).
"""
def get_drift(id) do
  Repo.get(CloudDrift, id)
end
@doc """
Accept a drift: the live value becomes the new desired state. Updates
the parent `cloud_provisioned.spec`, bumps `spec_version`, and closes the
drift as "accepted".

## Options

  * `:actor` — recorded as `resolved_by` on the drift row
    (default `"operator"`).

## Returns

  * `{:ok, drift}` — the updated drift row, when the drift was open.
  * `{:error, :not_open}` — the drift is already resolved (accepted,
    reverted or stale). Nothing is written: re-accepting a closed drift
    would overwrite the spec with an outdated live value and bump
    `spec_version` a second time.

Raises if the parent `cloud_provisioned` row does not exist or either
update fails; in that case the transaction is rolled back.
"""
def accept_drift(drift, opts \\ [])

def accept_drift(%CloudDrift{status: "open"} = drift, opts) do
  actor = Keyword.get(opts, :actor, "operator")
  now = DateTime.utc_now() |> DateTime.truncate(:second)

  Repo.transaction(fn ->
    # Read the spec inside the same transaction that writes it, so the
    # read-modify-write of `spec` is a single unit of work.
    prov = Repo.get!(CloudProvisioned, drift.provisioned_id)
    new_spec = Map.put(prov.spec || %{}, drift.field, unwrap(drift.actual))

    prov
    |> CloudProvisioned.changeset(%{
      spec: new_spec,
      spec_version: (prov.spec_version || 1) + 1
    })
    |> Repo.update!()

    drift
    |> CloudDrift.changeset(%{status: "accepted", resolved_at: now, resolved_by: actor})
    |> Repo.update!()
  end)
end

def accept_drift(%CloudDrift{}, _opts), do: {:error, :not_open}
end