Files
Giuliano Silvestro 3274a4adab Phase 2 saga engine: compensation-based runner + step contract
This is the architectural spine of every write workflow from phase 2 onward:
provisioning, suspension, offboarding, updates, and rollback all ride this
engine. Each workflow becomes a step list, not new orchestration code.

Schemas:
- saga_runs           — kind, status (pending/running/completed/failed/
                        compensating/rolled_back), step_modules, current
                        step idx, accumulating context (jsonb), cancel
                        flag, error.
- saga_step_results   — per-step audit ledger: status (running/completed/
                        failed/compensated), output, attempts, timings,
                        unique (saga_id, step_idx).
- cloud_provisioned   — desired-state for resources WE provisioned. spec
                        + spec_version + saga_id; FK to cloud_resources.
                        Phase 2's drift-detection diff lands here.

Step contract (ArcadiaCloud.Provisioning.Step):
- execute(state) -> {:ok, state} | {:error, reason}
- compensate(state) -> :ok | {:error, _}   (optional)
- name() -> String.t()

SagaState carries the live execution state — accumulating context,
immutable inputs, current step_idx. Helpers: get_output/put_output
(context r/w), get_input (inputs read-only).

Runner (Oban worker, queue: provisioning, max_attempts: 1):
- kick_off: pending -> running, run_step(0)
- run_step: idempotent re-entry on saga.current_step_idx; persists step
  result + saga context after each step; recursive forward walk through
  the whole step list within one perform/1 call.
- safe_execute: try/rescue/catch around module.execute so a raised
  exception triggers compensation rather than blowing up the worker.
- start_compensation: status=compensating, walk from idx-1 down to 0,
  calling compensate/1 where it's exported; logs but doesn't halt on
  compensate failure (best-effort + audit log).
- cancellation: checked between steps; cancel_requested=true -> trigger
  compensation from current idx.
- crash recovery: with max_attempts: 1 Oban never retries the job, but
  because run_step is keyed on saga.current_step_idx a re-enqueued job
  picks up at the right place; full crash-resume infra is deferred to
  phase 2.5 (manual re-enqueue works for now).

Two proof-of-concept steps (Steps.Echo, Steps.Fail) demonstrate the
engine without any DO API exposure. First real DO write step lands in
the next chunk.

Provisioning context provides start_saga/1, list_sagas/1,
list_step_results/1, cancel_saga/1, upsert_step_result/3.

Live smoke verified end-to-end:
- [Echo, Echo, Echo] happy path: all 3 completed, context accumulated
  echoed_at_step_0/1/2 = "hello".
- [Echo, Echo, Fail] failure path: step 2 failed, compensation walked
  back through step 1 then step 0; final status rolled_back with error
  {compensate_from_idx: 1, reason: "step_failed:fail"}; ledger shows
  echo/echo/fail with statuses compensated/compensated/failed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 07:24:30 +10:00

219 lines
6.6 KiB
Elixir

defmodule ArcadiaCloud.Provisioning.Runner do
  @moduledoc """
  Compensation-based saga executor.

  Walks `saga.step_modules` in order. Each step gets a `%SagaState{}` and
  returns `{:ok, new_state}` or `{:error, reason}`. A raised exception, a
  throw/exit, or any other return value is treated as a step failure.

  On failure, the runner flips status to `compensating` and walks the
  already-completed steps in REVERSE, calling `compensate/1` on any that
  implement it. Compensation is best-effort. A `compensate/1` that returns
  an error, raises, throws, or returns something unexpected is logged and
  recorded on its step result, and the walk continues.

  Crash safety: this worker is enqueued via Oban with `max_attempts: 1`
  because retry semantics are step-level, not job-level. On crash, the
  saga row stays in whatever status it was last persisted with; a
  separate `resume_stuck_sagas` workflow (phase 2.5) handles cleanup.
  For phase 2 first chunk, you'd re-enqueue manually. A re-enqueued job
  dispatches on the persisted `status` and resumes from `current_step_idx`.

  Cancellation: `Provisioning.cancel_saga/1` flips `cancel_requested=true`.
  Before each step the runner re-reads that flag from the database. It
  then transitions to compensating instead of advancing. The re-read is
  needed because the whole forward walk runs inside one `perform/1`.
  """
  use Oban.Worker, queue: :provisioning, max_attempts: 1

  import Ecto.Query, only: [from: 2]
  require Logger

  alias ArcadiaCloud.Provisioning
  alias ArcadiaCloud.Provisioning.{SagaRun, SagaState}
  alias ArcadiaCloud.Repo

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"saga_id" => saga_id}}) do
    saga = Repo.get!(SagaRun, saga_id)

    # Dispatch on the persisted status so a re-enqueued job continues where
    # the previous run stopped; terminal statuses are a no-op.
    case saga.status do
      "pending" -> kick_off(saga)
      "running" -> resume(saga)
      "compensating" -> resume_compensation(saga)
      _terminal -> :ok
    end
  end

  # ---- forward path ---------------------------------------------------------

  # Marks a pending saga as running and starts the walk at step 0.
  defp kick_off(saga) do
    {:ok, saga} = Provisioning.update_saga(saga, %{status: "running", started_at: now()})
    run_step(saga, 0)
  end

  # Re-entry for a job that finds the saga already running.
  defp resume(saga), do: run_step(saga, saga.current_step_idx)

  # Decides what happens at `idx`:
  #   * compensate from `idx - 1` if cancellation was requested,
  #   * complete the saga once past the last step,
  #   * otherwise execute the step.
  defp run_step(saga, idx) do
    saga = refresh_cancel_requested(saga)

    cond do
      saga.cancel_requested ->
        Logger.info("[saga #{saga.id}] cancellation requested; compensating from step #{idx - 1}")
        start_compensation(saga, idx - 1, "cancelled")

      idx >= length(saga.step_modules) ->
        complete(saga)

      true ->
        do_run_step(saga, idx)
    end
  end

  # Re-reads only `cancel_requested` from the database. The struct threaded
  # through the walk was loaded once in perform/1. Updates made through
  # `Provisioning.update_saga/2` presumably reflect only our own changes
  # (assumed to be a plain Repo.update underneath; confirm). Without this
  # re-read, a `cancel_saga/1` issued mid-walk would never be observed.
  defp refresh_cancel_requested(saga) do
    query = from(s in SagaRun, where: s.id == ^saga.id, select: s.cancel_requested)
    %{saga | cancel_requested: Repo.one(query) == true}
  end

  # Executes the step at `idx` and records each transition in the step ledger
  # (running -> completed | failed). Advances to `idx + 1` on success and
  # starts compensation from `idx - 1` on failure.
  defp do_run_step(saga, idx) do
    {:ok, saga} = Provisioning.update_saga(saga, %{current_step_idx: idx})
    module = saga.step_modules |> Enum.at(idx) |> resolve_module()
    step_name = module.name()

    {:ok, _} =
      Provisioning.upsert_step_result(saga.id, idx, %{
        step_name: step_name,
        status: "running",
        started_at: now(),
        attempts: 1
      })

    state = SagaState.from(saga) |> Map.put(:step_idx, idx)

    case safe_execute(module, state) do
      {:ok, new_state} ->
        {:ok, _} =
          Provisioning.upsert_step_result(saga.id, idx, %{
            step_name: step_name,
            status: "completed",
            output: extract_output(new_state),
            completed_at: now()
          })

        # Persist the accumulated context before advancing so a resumed job
        # sees every completed step's output.
        {:ok, saga} = Provisioning.update_saga(saga, %{context: new_state.context})
        run_step(saga, idx + 1)

      {:error, reason} ->
        {:ok, _} =
          Provisioning.upsert_step_result(saga.id, idx, %{
            step_name: step_name,
            status: "failed",
            error: error_to_map(reason),
            completed_at: now()
          })

        Logger.warning("[saga #{saga.id}] step #{idx} (#{step_name}) failed: #{inspect(reason)}")
        start_compensation(saga, idx - 1, "step_failed:#{step_name}")
    end
  end

  # Runs `module.execute/1` and returns `{:ok, %SagaState{}}` or
  # `{:error, reason}`. Any other return value, a raised exception, or a
  # throw/exit is converted into `{:error, reason}`. That way the failure
  # triggers compensation instead of crashing the worker with the saga
  # stuck in "running".
  defp safe_execute(module, state) do
    case module.execute(state) do
      {:ok, %SagaState{}} = ok -> ok
      {:error, _reason} = error -> error
      other -> {:error, {:invalid_return, other}}
    end
  rescue
    e -> {:error, {:exception, Exception.message(e)}}
  catch
    kind, payload -> {:error, {kind, payload}}
  end

  # ---- compensation path ----------------------------------------------------

  # Flips the saga to "compensating", records why, and walks backwards
  # starting at `from_idx` (-1 means nothing to undo).
  defp start_compensation(saga, from_idx, error_reason) do
    error = %{"reason" => error_reason, "compensate_from_idx" => from_idx}

    {:ok, saga} =
      Provisioning.update_saga(saga, %{
        status: "compensating",
        current_step_idx: from_idx,
        error: error
      })

    compensate_step(saga, from_idx)
  end

  # Re-entry for a job that finds the saga mid-compensation.
  defp resume_compensation(saga), do: compensate_step(saga, saga.current_step_idx)

  # Walked past step 0: everything that could be undone has been.
  defp compensate_step(saga, idx) when idx < 0 do
    {:ok, _} = Provisioning.update_saga(saga, %{status: "rolled_back", completed_at: now()})
    :ok
  end

  # Compensates step `idx` (best-effort), records it, then moves to `idx - 1`.
  defp compensate_step(saga, idx) do
    {:ok, saga} = Provisioning.update_saga(saga, %{current_step_idx: idx})
    module = saga.step_modules |> Enum.at(idx) |> resolve_module()
    step_name = module.name()
    state = SagaState.from(saga) |> Map.put(:step_idx, idx)

    case safe_compensate(saga.id, idx, module, state) do
      :ok ->
        mark_compensated(saga.id, idx, step_name)

      {:error, reason} ->
        Logger.warning(
          "[saga #{saga.id}] compensate failed at step #{idx} (#{step_name}): #{inspect(reason)} — continuing"
        )

        mark_compensated(saga.id, idx, step_name, error_to_map(reason))
    end

    compensate_step(saga, idx - 1)
  end

  # Calls `module.compensate/1` when the step defines it and normalizes the
  # outcome to `:ok | {:error, reason}`. A step without `compensate/1` counts
  # as `:ok`. `function_exported?/3` does not load modules, so the module is
  # loaded explicitly first; otherwise a resumed compensation on a freshly
  # started node could silently skip it. Raises and throws/exits are logged
  # and converted so the backward walk always continues.
  defp safe_compensate(saga_id, idx, module, state) do
    if Code.ensure_loaded?(module) and function_exported?(module, :compensate, 1) do
      case module.compensate(state) do
        :ok -> :ok
        {:error, _reason} = error -> error
        other -> {:error, {:invalid_return, other}}
      end
    else
      :ok
    end
  rescue
    e ->
      Logger.error("[saga #{saga_id}] compensate raised at step #{idx}: #{Exception.message(e)}")
      {:error, {:exception, Exception.message(e)}}
  catch
    kind, payload ->
      Logger.error("[saga #{saga_id}] compensate #{kind} at step #{idx}: #{inspect(payload)}")
      {:error, {kind, payload}}
  end

  # Upserts the step ledger row as "compensated", attaching `err` when the
  # compensation itself failed.
  defp mark_compensated(saga_id, idx, name, err \\ nil) do
    attrs = %{step_name: name, status: "compensated", completed_at: now()}
    attrs = if err, do: Map.put(attrs, :error, err), else: attrs
    {:ok, _} = Provisioning.upsert_step_result(saga_id, idx, attrs)
  end

  # ---- completion -----------------------------------------------------------

  defp complete(saga) do
    {:ok, _} = Provisioning.update_saga(saga, %{status: "completed", completed_at: now()})
    :ok
  end

  # ---- helpers --------------------------------------------------------------

  # UTC timestamp truncated to whole seconds, used for every persisted time.
  defp now, do: DateTime.utc_now() |> DateTime.truncate(:second)

  # Step modules are stored as strings (with or without the "Elixir." prefix).
  # `to_existing_atom` avoids minting atoms from persisted data.
  defp resolve_module(name) when is_binary(name) do
    String.to_existing_atom("Elixir." <> String.trim_leading(name, "Elixir."))
  end

  # Step output recorded in the ledger: the context minus the inputs bucket.
  defp extract_output(%SagaState{context: ctx}) do
    Map.drop(ctx, ["__inputs__"])
  end

  # Turns an error reason into a JSON-safe map for the `error` columns.
  defp error_to_map({kind, payload}) when is_atom(kind),
    do: %{"kind" => to_string(kind), "payload" => inspect(payload)}

  defp error_to_map(other), do: %{"reason" => inspect(other)}
end