Phase 2 saga engine: compensation-based runner + step contract

This is the architectural spine of every write workflow from phase 2
onward: provisioning, suspension, offboarding, updates, and rollback all
ride this engine. Each becomes a step list, not new orchestration code.

Schemas:
- saga_runs           — kind, status (pending/running/completed/failed/
                        compensating/rolled_back), step_modules, current
                        step idx, accumulating context (jsonb), cancel
                        flag, error.
- saga_step_results   — per-step audit ledger: status (running/completed/
                        failed/compensated), output, attempts, timings,
                        unique (saga_id, step_idx).
- cloud_provisioned   — desired-state for resources WE provisioned. spec
                        + spec_version + saga_id; FK to cloud_resources.
                        Phase 2's drift-detection diff lands here.
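
Reading the runner in this commit, the saga status transitions it performs
can be summarised as a small lookup table. This is only a sketch: the
module name Sketch.SagaStatus and both functions are hypothetical, and
`failed` is treated as terminal because the runner shown here never writes
it to saga_runs.

```elixir
defmodule Sketch.SagaStatus do
  # Hypothetical helper, not part of this commit. Transitions are inferred
  # from the runner: kick_off, complete, start_compensation, compensate_step.
  @transitions %{
    "pending" => ["running"],
    "running" => ["completed", "compensating"],
    "compensating" => ["rolled_back"]
  }

  # True when the runner would move a saga from `from` to `to`.
  def allowed?(from, to), do: to in Map.get(@transitions, from, [])

  # Statuses with no outgoing transition; perform/1 returns :ok for these.
  def terminal?(status), do: not Map.has_key?(@transitions, status)
end
```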

Step contract (ArcadiaCloud.Provisioning.Step):
- execute(state) -> {:ok, state} | {:error, reason}
- compensate(state) -> :ok | {:error, _}   (optional)
- name() -> String.t()
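
A minimal sketch of the contract above, assuming a plain map in place of
SagaState so it runs standalone. Sketch.Step and both step modules are
hypothetical stand-ins; only the callback shapes follow this commit. It
shows that a step which omits compensate/1 still satisfies the behaviour,
and that function_exported?/3 can tell the two cases apart.

```elixir
defmodule Sketch.Step do
  # Stand-in for ArcadiaCloud.Provisioning.Step; state is a plain map here.
  @callback execute(map()) :: {:ok, map()} | {:error, term()}
  @callback compensate(map()) :: :ok | {:error, term()}
  @callback name() :: String.t()
  @optional_callbacks compensate: 1
end

defmodule Sketch.Steps.ReserveName do
  # Hypothetical step that has something to undo.
  @behaviour Sketch.Step

  @impl true
  def name, do: "reserve_name"

  @impl true
  def execute(%{context: ctx} = state) do
    {:ok, %{state | context: Map.put(ctx, "reserved_name", "demo")}}
  end

  @impl true
  def compensate(_state), do: :ok
end

defmodule Sketch.Steps.Noop do
  # Hypothetical step with nothing to undo, so compensate/1 is left out.
  @behaviour Sketch.Step

  @impl true
  def name, do: "noop"

  @impl true
  def execute(state), do: {:ok, state}
end
```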

SagaState carries the live execution state — accumulating context,
immutable inputs, current step_idx. Helpers: get_output/put_output
(context r/w), get_input (inputs read-only).
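
The helpers are what make a step idempotent: check the context for a prior
output before doing work. The sketch below uses a cut-down stand-in struct
(Sketch.SagaState) and a hypothetical step (Sketch.Steps.AllocateSlug); the
"-01" suffix is invented purely for illustration.

```elixir
defmodule Sketch.SagaState do
  # Cut-down stand-in for SagaState: only the fields this example needs.
  defstruct context: %{}, inputs: %{}, step_idx: 0

  # Keys are stored as strings; atoms are accepted for convenience.
  def get_input(%__MODULE__{inputs: inputs}, key), do: Map.get(inputs, to_string(key))
  def get_output(%__MODULE__{context: ctx}, key), do: Map.get(ctx, to_string(key))

  def put_output(%__MODULE__{context: ctx} = state, key, value) do
    %{state | context: Map.put(ctx, to_string(key), value)}
  end
end

defmodule Sketch.Steps.AllocateSlug do
  alias Sketch.SagaState

  def execute(%SagaState{} = state) do
    case SagaState.get_output(state, :slug) do
      nil ->
        # First attempt: derive the value from the read-only inputs.
        base = SagaState.get_input(state, :deployment_slug) || "unnamed"
        {:ok, SagaState.put_output(state, :slug, base <> "-01")}

      _already_written ->
        # Re-entry after a crash or re-enqueue: nothing left to do.
        {:ok, state}
    end
  end
end
```

Running execute/1 twice on the same state yields the same context, which is
the property the runner relies on when it re-enters a step.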

Runner (Oban worker, queue: provisioning, max_attempts: 1):
- kick_off: pending -> running, run_step(0)
- run_step: idempotent re-entry on saga.current_step_idx; persists step
  result + saga context after each step; recursive forward walk through
  the whole step list within one perform/1 call.
- safe_execute: try/rescue/catch around module.execute so a raised
  exception triggers compensation rather than blowing up the worker.
- start_compensation: status=compensating, walk from idx-1 down to 0,
  calling compensate/1 where it's exported; logs but doesn't halt on
  compensate failure (best-effort + audit log).
- cancellation: checked between steps; cancel_requested=true -> trigger
  compensation from the last completed step (idx - 1).
- crash recovery: with max_attempts: 1 Oban never retries the job itself;
  because run_step is keyed on saga.current_step_idx, a manually
  re-enqueued job resumes at the right step. Full crash-resume infra is
  deferred to phase 2.5.
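
The forward walk and reverse compensation described above can be sketched
without Oban or a database. Everything here is a hypothetical in-memory
stand-in: steps are plain maps of functions, and the ledger is a map keyed
by step index that mimics the upsert into saga_step_results.

```elixir
defmodule Sketch.MiniRunner do
  # A step is a map: %{name: ..., execute: fun/1, compensate: fun/1 (optional)}.

  def run(steps, context \\ %{}), do: forward(steps, 0, context, %{})

  # Past the last step: the saga completed.
  defp forward(steps, idx, context, ledger) when idx >= length(steps) do
    {:completed, context, ledger}
  end

  defp forward(steps, idx, context, ledger) do
    step = Enum.at(steps, idx)

    case safe_execute(step, context) do
      {:ok, new_context} ->
        forward(steps, idx + 1, new_context, Map.put(ledger, idx, {step.name, :completed}))

      {:error, reason} ->
        ledger = Map.put(ledger, idx, {step.name, :failed})
        # The failed step itself is not compensated; start at idx - 1.
        compensate(steps, idx - 1, context, ledger, reason)
    end
  end

  # Raised exceptions, throws and exits all become {:error, _}.
  defp safe_execute(step, context) do
    step.execute.(context)
  rescue
    e -> {:error, {:exception, Exception.message(e)}}
  catch
    kind, payload -> {:error, {kind, payload}}
  end

  defp compensate(_steps, idx, _context, ledger, reason) when idx < 0 do
    {:rolled_back, reason, ledger}
  end

  defp compensate(steps, idx, context, ledger, reason) do
    step = Enum.at(steps, idx)
    # Best effort: a missing or failing compensate never halts the walk.
    if is_function(step[:compensate], 1), do: best_effort(step.compensate, context)
    compensate(steps, idx - 1, context, Map.put(ledger, idx, {step.name, :compensated}), reason)
  end

  defp best_effort(fun, context) do
    fun.(context)
  rescue
    _ -> :error
  end
end
```

Fed two succeeding steps and one failing step, this returns a ledger of the
same shape as the failure-path smoke test: compensated, compensated, failed.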

Two proof-of-concept steps (Steps.Echo, Steps.Fail) demonstrate the
engine without any DO API exposure. First real DO write step lands in
the next chunk.

Provisioning context provides start_saga/1, list_sagas/1,
list_step_results/1, cancel_saga/1, upsert_step_result/3.

Live smoke verified end-to-end:
- [Echo, Echo, Echo] happy path: all 3 completed, context accumulated
  echoed_at_step_0/1/2 = "hello".
- [Echo, Echo, Fail] failure path: step 2 failed, compensation walked
  back through step 1 then step 0; final status rolled_back with error
  {compensate_from_idx: 1, reason: "step_failed:fail"}; ledger shows
  echo/echo/fail with statuses compensated/compensated/failed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 07:24:30 +10:00
parent ea3101ca2f
commit 3274a4adab
10 changed files with 628 additions and 0 deletions


@@ -0,0 +1,29 @@
defmodule ArcadiaCloud.Provisioning.CloudProvisioned do
use Ecto.Schema
import Ecto.Changeset
@primary_key {:id, :binary_id, autogenerate: true}
@foreign_key_type :binary_id
schema "cloud_provisioned" do
field :spec, :map, default: %{}
field :spec_version, :integer, default: 1
field :provisioned_at, :utc_datetime
field :provisioned_by, :string
belongs_to :resource, ArcadiaCloud.Cloud.CloudResource
belongs_to :saga, ArcadiaCloud.Provisioning.SagaRun
timestamps(type: :utc_datetime)
end
@required ~w(resource_id spec provisioned_at)a
@optional ~w(spec_version provisioned_by saga_id)a
def changeset(prov, attrs) do
prov
|> cast(attrs, @required ++ @optional)
|> validate_required(@required)
|> unique_constraint(:resource_id)
end
end


@@ -0,0 +1,218 @@
defmodule ArcadiaCloud.Provisioning.Runner do
@moduledoc """
Compensation-based saga executor.

Walks `saga.step_modules` in order. Each step gets a `%SagaState{}` and
returns `{:ok, new_state}` or `{:error, reason}`. On failure, the runner
flips status to `compensating` and walks already-completed steps in
reverse, calling `compensate/1` on any that implement it.

Crash safety: this worker is enqueued via Oban with `max_attempts: 1`
because retry semantics are step-level, not job-level. On crash, the
saga row stays in whatever status it was last persisted with; a
separate `resume_stuck_sagas` workflow (phase 2.5) will handle cleanup.
Until then, re-enqueue the job manually.

Cancellation: `Provisioning.cancel_saga/1` flips `cancel_requested=true`.
The runner checks this between steps and transitions to compensating
instead of advancing.
"""
use Oban.Worker, queue: :provisioning, max_attempts: 1
require Logger
alias ArcadiaCloud.Provisioning
alias ArcadiaCloud.Provisioning.{SagaRun, SagaState}
alias ArcadiaCloud.Repo
@impl Oban.Worker
def perform(%Oban.Job{args: %{"saga_id" => saga_id}}) do
saga = Repo.get!(SagaRun, saga_id)
case saga.status do
"pending" -> kick_off(saga)
"running" -> resume(saga)
"compensating" -> resume_compensation(saga)
_terminal -> :ok
end
end
# ---- forward path ---------------------------------------------------------
defp kick_off(saga) do
now = DateTime.utc_now() |> DateTime.truncate(:second)
{:ok, saga} = Provisioning.update_saga(saga, %{status: "running", started_at: now})
run_step(saga, 0)
end
defp resume(saga), do: run_step(saga, saga.current_step_idx)
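defp run_step(saga, idx) do
# Re-read the row so a cancel_requested flag flipped by another process
# since the last step is visible here. Assumes update_saga/2 returns the
# struct it was given plus its own changes, not a fresh read.
saga = Repo.reload!(saga)
cond do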
saga.cancel_requested ->
Logger.info("[saga #{saga.id}] cancellation requested; compensating from step #{idx - 1}")
start_compensation(saga, idx - 1, "cancelled")
idx >= length(saga.step_modules) ->
complete(saga)
true ->
do_run_step(saga, idx)
end
end
defp do_run_step(saga, idx) do
{:ok, saga} = Provisioning.update_saga(saga, %{current_step_idx: idx})
module_name = Enum.at(saga.step_modules, idx)
module = resolve_module(module_name)
started_at = DateTime.utc_now() |> DateTime.truncate(:second)
{:ok, _} =
Provisioning.upsert_step_result(saga.id, idx, %{
step_name: module.name(),
status: "running",
started_at: started_at,
attempts: 1
})
state = SagaState.from(saga) |> Map.put(:step_idx, idx)
case safe_execute(module, state) do
{:ok, %SagaState{} = new_state} ->
finished = DateTime.utc_now() |> DateTime.truncate(:second)
{:ok, _} =
Provisioning.upsert_step_result(saga.id, idx, %{
step_name: module.name(),
status: "completed",
output: extract_output(new_state),
completed_at: finished
})
{:ok, saga} =
Provisioning.update_saga(saga, %{context: new_state.context})
run_step(saga, idx + 1)
{:error, reason} ->
finished = DateTime.utc_now() |> DateTime.truncate(:second)
{:ok, _} =
Provisioning.upsert_step_result(saga.id, idx, %{
step_name: module.name(),
status: "failed",
error: error_to_map(reason),
completed_at: finished
})
Logger.warning("[saga #{saga.id}] step #{idx} (#{module.name()}) failed: #{inspect(reason)}")
start_compensation(saga, idx - 1, "step_failed:#{module.name()}")
end
end
defp safe_execute(module, state) do
try do
module.execute(state)
rescue
e -> {:error, {:exception, Exception.message(e)}}
catch
kind, payload -> {:error, {kind, payload}}
end
end
# ---- compensation path ----------------------------------------------------
defp start_compensation(saga, from_idx, error_reason) do
error = %{"reason" => error_reason, "compensate_from_idx" => from_idx}
{:ok, saga} =
Provisioning.update_saga(saga, %{
status: "compensating",
current_step_idx: from_idx,
error: error
})
compensate_step(saga, from_idx)
end
defp resume_compensation(saga), do: compensate_step(saga, saga.current_step_idx)
defp compensate_step(saga, idx) when idx < 0 do
finished = DateTime.utc_now() |> DateTime.truncate(:second)
{:ok, _} = Provisioning.update_saga(saga, %{status: "rolled_back", completed_at: finished})
:ok
end
defp compensate_step(saga, idx) do
{:ok, saga} = Provisioning.update_saga(saga, %{current_step_idx: idx})
module_name = Enum.at(saga.step_modules, idx)
module = resolve_module(module_name)
state = SagaState.from(saga) |> Map.put(:step_idx, idx)
if function_exported?(module, :compensate, 1) do
result =
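try do
module.compensate(state)
rescue
e ->
Logger.error(
"[saga #{saga.id}] compensate raised at step #{idx}: #{Exception.message(e)}"
)
{:error, {:exception, Exception.message(e)}}
catch
# Mirror safe_execute/2: a throw or exit must not halt the rollback.
kind, payload -> {:error, {kind, payload}}
end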
case result do
:ok ->
mark_compensated(saga.id, idx, module.name())
{:error, reason} ->
Logger.warning(
"[saga #{saga.id}] compensate failed at step #{idx} (#{module.name()}): #{inspect(reason)} — continuing"
)
mark_compensated(saga.id, idx, module.name(), error_to_map(reason))
end
else
mark_compensated(saga.id, idx, module.name())
end
compensate_step(saga, idx - 1)
end
defp mark_compensated(saga_id, idx, name, err \\ nil) do
attrs = %{
step_name: name,
status: "compensated",
completed_at: DateTime.utc_now() |> DateTime.truncate(:second)
}
attrs = if err, do: Map.put(attrs, :error, err), else: attrs
{:ok, _} = Provisioning.upsert_step_result(saga_id, idx, attrs)
end
# ---- completion -----------------------------------------------------------
defp complete(saga) do
finished = DateTime.utc_now() |> DateTime.truncate(:second)
{:ok, _} = Provisioning.update_saga(saga, %{status: "completed", completed_at: finished})
:ok
end
# ---- helpers --------------------------------------------------------------
defp resolve_module(name) when is_binary(name) do
String.to_existing_atom("Elixir." <> String.trim_leading(name, "Elixir."))
end
defp extract_output(%SagaState{context: ctx}) do
Map.drop(ctx, ["__inputs__"])
end
defp error_to_map({kind, payload}) when is_atom(kind),
do: %{"kind" => to_string(kind), "payload" => inspect(payload)}
defp error_to_map(other), do: %{"reason" => inspect(other)}
end


@@ -0,0 +1,38 @@
defmodule ArcadiaCloud.Provisioning.SagaRun do
use Ecto.Schema
import Ecto.Changeset
@primary_key {:id, :binary_id, autogenerate: true}
@foreign_key_type :binary_id
@kinds ~w(provision suspend unsuspend offboard update rollback test)
@statuses ~w(pending running completed failed compensating rolled_back)
# Referenced as SagaRun.t() in SagaState's typespec.
@type t :: %__MODULE__{}
schema "saga_runs" do
field :deployment_id, :binary_id
field :kind, :string
field :status, :string, default: "pending"
field :step_modules, {:array, :string}, default: []
field :current_step_idx, :integer, default: 0
field :context, :map, default: %{}
field :started_at, :utc_datetime
field :completed_at, :utc_datetime
field :cancel_requested, :boolean, default: false
field :error, :map
field :triggered_by, :string
timestamps(type: :utc_datetime)
end
@required ~w(kind step_modules)a
@optional ~w(deployment_id status current_step_idx context started_at
completed_at cancel_requested error triggered_by)a
def changeset(saga, attrs) do
saga
|> cast(attrs, @required ++ @optional)
|> validate_required(@required)
|> validate_inclusion(:kind, @kinds)
|> validate_inclusion(:status, @statuses)
end
end


@@ -0,0 +1,65 @@
defmodule ArcadiaCloud.Provisioning.SagaState do
@moduledoc """
Carries the live saga context across step calls. Steps receive this
struct, update it via the helpers, and return the new value.

`context` is the accumulating bag of created-resource IDs and other
per-saga state. Outputs from a step (e.g. `droplet_id` after
CreateDroplet) live here so later steps and compensation can find
what to act on.

`inputs` are the immutable arguments the saga was started with
(e.g. `template_id`, `deployment_slug`). They are read from the
`"__inputs__"` key of the persisted context. Steps may read these but
never write to them.
"""
alias ArcadiaCloud.Provisioning.SagaRun
defstruct [:saga_id, :saga, :context, :inputs, :step_idx]
@type t :: %__MODULE__{
saga_id: binary(),
saga: SagaRun.t() | nil,
context: map(),
inputs: map(),
step_idx: non_neg_integer() | nil
}
def from(%SagaRun{} = saga) do
%__MODULE__{
saga_id: saga.id,
saga: saga,
context: saga.context || %{},
inputs: (saga.context || %{})["__inputs__"] || %{},
step_idx: saga.current_step_idx
}
end
@doc """
Idempotency hook: was a value written by a prior attempt of this same step?
Returns the value or nil.
"""
def get_output(%__MODULE__{} = state, key) when is_atom(key) do
get_output(state, Atom.to_string(key))
end
def get_output(%__MODULE__{context: ctx}, key) when is_binary(key) do
Map.get(ctx, key)
end
def put_output(%__MODULE__{} = state, key, value) when is_atom(key) do
put_output(state, Atom.to_string(key), value)
end
def put_output(%__MODULE__{context: ctx} = state, key, value) when is_binary(key) do
%{state | context: Map.put(ctx, key, value)}
end
def get_input(%__MODULE__{inputs: inputs}, key) when is_atom(key) do
Map.get(inputs, Atom.to_string(key))
end
def get_input(%__MODULE__{inputs: inputs}, key) when is_binary(key) do
Map.get(inputs, key)
end
end


@@ -0,0 +1,33 @@
defmodule ArcadiaCloud.Provisioning.SagaStepResult do
use Ecto.Schema
import Ecto.Changeset
@primary_key {:id, :binary_id, autogenerate: true}
@foreign_key_type :binary_id
@statuses ~w(running completed failed compensated)
schema "saga_step_results" do
field :step_idx, :integer
field :step_name, :string
field :status, :string
field :output, :map, default: %{}
field :attempts, :integer, default: 0
field :started_at, :utc_datetime
field :completed_at, :utc_datetime
field :error, :map
belongs_to :saga, ArcadiaCloud.Provisioning.SagaRun
end
@required ~w(saga_id step_idx step_name status)a
@optional ~w(output attempts started_at completed_at error)a
def changeset(result, attrs) do
result
|> cast(attrs, @required ++ @optional)
|> validate_required(@required)
|> validate_inclusion(:status, @statuses)
|> unique_constraint([:saga_id, :step_idx])
end
end


@@ -0,0 +1,29 @@
defmodule ArcadiaCloud.Provisioning.Step do
@moduledoc """
Contract every saga step module implements.

Steps MUST be:
1. Idempotent: re-running produces the same effect; check context
   for prior outputs (`SagaState.get_output/2`) before doing work.
2. Compensable: has an undo, or explicitly declares it doesn't need
   one (`compensate/1` is optional; omitting it means no-op).
3. Self-describing: writes its result into the saga context via
   `SagaState.put_output/3` so later steps and compensation can find
   what to act on.

Return values of `execute/1`:
- `{:ok, %SagaState{}}`: step succeeded, advance.
- `{:error, reason}`: step failed; the runner triggers compensation
  of all completed steps in reverse order.
"""
alias ArcadiaCloud.Provisioning.SagaState
@callback execute(SagaState.t()) :: {:ok, SagaState.t()} | {:error, term()}
@callback compensate(SagaState.t()) :: :ok | {:error, term()}
@callback name() :: String.t()
@optional_callbacks [compensate: 1]
end


@@ -0,0 +1,28 @@
defmodule ArcadiaCloud.Provisioning.Steps.Echo do
@moduledoc """
Trivial proof-of-concept step. Reads `:echo_value` from saga inputs and
copies it to context under a step-specific key so subsequent steps can
see it. Used by engine smoke tests only — not for real workloads.
"""
@behaviour ArcadiaCloud.Provisioning.Step
alias ArcadiaCloud.Provisioning.SagaState
@impl true
def name, do: "echo"
@impl true
def execute(state) do
value = SagaState.get_input(state, :echo_value) || "default"
{:ok, SagaState.put_output(state, "echoed_at_step_#{state.step_idx}", value)}
end
@impl true
def compensate(state) do
# Echo is reversible: we can simply forget what we wrote. No real
# side-effects to undo.
_ = state
:ok
end
end


@@ -0,0 +1,16 @@
defmodule ArcadiaCloud.Provisioning.Steps.Fail do
@moduledoc """
Always-fails step. Used by engine smoke tests to verify compensation
triggers correctly when a step errors.
"""
@behaviour ArcadiaCloud.Provisioning.Step
@impl true
def name, do: "fail"
@impl true
def execute(_state) do
{:error, :intentional_failure}
end
end