The architectural spine of every write workflow from phase 2 onward:
provisioning, suspension, offboarding, updates, and rollback all ride this
engine. Each becomes a step list, not new orchestration code.
Schemas:
- saga_runs — kind, status (pending/running/completed/failed/
compensating/rolled_back), step_modules, current
step idx, accumulating context (jsonb), cancel
flag, error.
- saga_step_results — per-step audit ledger: status (running/completed/
failed/compensated), output, attempts, timings,
unique (saga_id, step_idx).
- cloud_provisioned — desired-state for resources WE provisioned. spec
+ spec_version + saga_id; FK to cloud_resources.
Phase 2's drift-detection diff lands here.
Step contract (ArcadiaCloud.Provisioning.Step):
- execute(state) -> {:ok, state} | {:error, reason}
- compensate(state) -> :ok | {:error, _} (optional)
- name() -> String.t()
SagaState carries the live execution state — accumulating context,
immutable inputs, current step_idx. Helpers: get_output/put_output
(context r/w), get_input (inputs read-only).
Runner (Oban worker, queue: provisioning, max_attempts: 1):
- kick_off: pending -> running, run_step(0)
- run_step: idempotent re-entry on saga.current_step_idx; persists step
result + saga context after each step; recursive forward walk through
the whole step list within one perform/1 call.
- safe_execute: try/rescue/catch around module.execute so a raised
exception triggers compensation rather than blowing up the worker.
- start_compensation: status=compensating, walk from idx-1 down to 0,
calling compensate/1 where it's exported; logs but doesn't halt on
compensate failure (best-effort + audit log).
- cancellation: checked between steps; cancel_requested=true -> trigger
compensation from current idx.
- crash recovery: run_step re-enters at saga.current_step_idx, so a
re-enqueued job picks up at the right step. With max_attempts: 1 Oban does
not retry the job on its own; full crash-resume infra is deferred to
phase 2.5 (manual re-enqueue works for now).
Two proof-of-concept steps (Steps.Echo, Steps.Fail) demonstrate the
engine without any DO API exposure. First real DO write step lands in
the next chunk.
Provisioning context provides start_saga/1, list_sagas/1,
list_step_results/1, cancel_saga/1, upsert_step_result/3.
Live smoke verified end-to-end:
- [Echo, Echo, Echo] happy path: all 3 completed, context accumulated
echoed_at_step_0/1/2 = "hello".
- [Echo, Echo, Fail] failure path: step 2 failed, compensation walked
back through step 1 then step 0; final status rolled_back with error
{compensate_from_idx: 1, reason: "step_failed:fail"}; ledger shows
echo/echo/fail with statuses compensated/compensated/failed.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
219 lines
6.6 KiB
Elixir
219 lines
6.6 KiB
Elixir
defmodule ArcadiaCloud.Provisioning.Runner do
|
|
@moduledoc """
|
|
Compensation-based saga executor.
|
|
|
|
Walks `saga.step_modules` in order. Each step gets a `%SagaState{}` and
|
|
returns `{:ok, new_state}` or `{:error, reason}`. On failure, the runner
|
|
flips status to `compensating` and walks already-completed steps in
|
|
REVERSE, calling `compensate/1` on any that implement it.
|
|
|
|
Crash safety: this worker is enqueued via Oban with `max_attempts: 1`
|
|
because retry semantics are step-level, not job-level. On crash, the
|
|
saga row stays in whatever status it was last persisted with; a
|
|
separate `resume_stuck_sagas` workflow (phase 2.5) handles cleanup.
|
|
For phase 2 first chunk, you'd re-enqueue manually.
|
|
|
|
Cancellation: `Provisioning.cancel_saga/1` flips `cancel_requested=true`.
|
|
The runner checks this between steps and transitions to compensating
|
|
instead of advancing.
|
|
"""
|
|
|
|
use Oban.Worker, queue: :provisioning, max_attempts: 1
|
|
|
|
require Logger
|
|
|
|
alias ArcadiaCloud.Provisioning
|
|
alias ArcadiaCloud.Provisioning.{SagaRun, SagaState}
|
|
alias ArcadiaCloud.Repo
|
|
|
|
# Oban entry point. Loads the saga row named by the job's "saga_id" argument
# (Repo.get!/2 raises when the row does not exist) and routes on the status
# that was last persisted for it.
@impl Oban.Worker
def perform(%Oban.Job{args: %{"saga_id" => saga_id}}) do
  saga = Repo.get!(SagaRun, saga_id)
  dispatch(saga.status, saga)
end

# Status router for perform/1: "pending" starts the saga, "running" and
# "compensating" re-enter their respective walks, and every other status is
# treated as terminal and answered with :ok without touching the saga.
defp dispatch("pending", saga), do: kick_off(saga)
defp dispatch("running", saga), do: resume(saga)
defp dispatch("compensating", saga), do: resume_compensation(saga)
defp dispatch(_terminal, _saga), do: :ok
|
|
|
|
# ---- forward path ---------------------------------------------------------
|
|
|
|
# Starts a pending saga: stamps started_at with the current UTC time truncated
# to whole seconds, flips the status to "running", then begins the forward
# walk at step 0. Crashes (MatchError) if the saga update is rejected.
defp kick_off(saga) do
  started_at = DateTime.truncate(DateTime.utc_now(), :second)
  attrs = %{status: "running", started_at: started_at}

  {:ok, running_saga} = Provisioning.update_saga(saga, attrs)
  run_step(running_saga, 0)
end
|
|
|
|
# Re-enters the forward walk at the step index last persisted on the saga.
defp resume(saga) do
  run_step(saga, saga.current_step_idx)
end
|
|
|
|
# Decides what happens at position `idx` of the forward walk:
#
#   * cancellation requested   -> compensate starting from the previous step
#   * idx past the last step   -> mark the saga completed
#   * otherwise                -> execute step `idx`
#
# When cancellation is seen at idx 0 the compensation start index is -1, which
# the terminal clause of compensate_step/2 turns straight into "rolled_back".
defp run_step(saga, idx) do
  # The whole forward walk runs inside a single perform/1 call and threads the
  # in-memory struct from step to step, so a cancel flag flipped by another
  # process after the job started would never be observed. Refresh just that
  # flag from the database before each decision; every other field keeps its
  # in-memory value so the step context is not altered by a DB round-trip.
  fresh = Repo.get!(SagaRun, saga.id)
  saga = %{saga | cancel_requested: fresh.cancel_requested}

  cond do
    saga.cancel_requested ->
      Logger.info("[saga #{saga.id}] cancellation requested; compensating from step #{idx - 1}")
      start_compensation(saga, idx - 1, "cancelled")

    idx >= length(saga.step_modules) ->
      complete(saga)

    true ->
      do_run_step(saga, idx)
  end
end
|
|
|
|
# Executes step `idx` and records the outcome.
#
# Sequence: persist `current_step_idx`, write a "running" ledger row, call the
# step through safe_execute/2, then either
#
#   * on `{:ok, %SagaState{}}` - mark the ledger row "completed", persist the
#     new context on the saga and continue with step idx + 1, or
#   * on anything else - mark the ledger row "failed" and start compensation
#     from step idx - 1 (see fail_step/4).
#
# Every persistence call is matched assertively, so a rejected write crashes
# the worker rather than continuing with an inconsistent ledger.
defp do_run_step(saga, idx) do
  {:ok, saga} = Provisioning.update_saga(saga, %{current_step_idx: idx})

  module = saga.step_modules |> Enum.at(idx) |> resolve_module()
  # Resolved once and reused for the ledger rows and the log line.
  step_name = module.name()
  started_at = DateTime.utc_now() |> DateTime.truncate(:second)

  {:ok, _} =
    Provisioning.upsert_step_result(saga.id, idx, %{
      step_name: step_name,
      status: "running",
      started_at: started_at,
      attempts: 1
    })

  state = SagaState.from(saga) |> Map.put(:step_idx, idx)

  case safe_execute(module, state) do
    {:ok, %SagaState{} = new_state} ->
      finished = DateTime.utc_now() |> DateTime.truncate(:second)

      {:ok, _} =
        Provisioning.upsert_step_result(saga.id, idx, %{
          step_name: step_name,
          status: "completed",
          output: extract_output(new_state),
          completed_at: finished
        })

      {:ok, saga} = Provisioning.update_saga(saga, %{context: new_state.context})

      run_step(saga, idx + 1)

    {:error, reason} ->
      fail_step(saga, idx, step_name, reason)

    # A step that breaks the execute/1 contract (for example `{:ok, map}` or a
    # bare value) used to raise CaseClauseError here, leaving both the saga and
    # its ledger row stuck in "running" with nothing compensated. Treat it as
    # an ordinary step failure instead.
    other ->
      fail_step(saga, idx, step_name, {:bad_return, other})
  end
end

# Records step `idx` as "failed" in the ledger, logs the reason, and hands over
# to the compensation walk starting at the step before the failed one.
defp fail_step(saga, idx, step_name, reason) do
  finished = DateTime.utc_now() |> DateTime.truncate(:second)

  {:ok, _} =
    Provisioning.upsert_step_result(saga.id, idx, %{
      step_name: step_name,
      status: "failed",
      error: error_to_map(reason),
      completed_at: finished
    })

  Logger.warning("[saga #{saga.id}] step #{idx} (#{step_name}) failed: #{inspect(reason)}")
  start_compensation(saga, idx - 1, "step_failed:#{step_name}")
end
|
|
|
|
# Calls `module.execute(state)` and converts abnormal exits into error tuples
# so the caller can react instead of the worker process dying:
#
#   * raised exception -> {:error, {:exception, message}}
#   * throw / exit     -> {:error, {kind, payload}}
#
# A normal return value is passed through untouched.
defp safe_execute(module, state) do
  module.execute(state)
rescue
  exception -> {:error, {:exception, Exception.message(exception)}}
catch
  kind, payload -> {:error, {kind, payload}}
end
|
|
|
|
# ---- compensation path ----------------------------------------------------
|
|
|
|
# Switches the saga into the compensation phase. Persists status
# "compensating", the index compensation starts from, and an error map that
# records why (`"reason"`) and from where (`"compensate_from_idx"`), then
# begins walking backwards from `from_idx`.
defp start_compensation(saga, from_idx, error_reason) do
  attrs = %{
    status: "compensating",
    current_step_idx: from_idx,
    error: %{"reason" => error_reason, "compensate_from_idx" => from_idx}
  }

  {:ok, compensating_saga} = Provisioning.update_saga(saga, attrs)
  compensate_step(compensating_saga, from_idx)
end
|
|
|
|
# Re-enters the backward (compensation) walk at the step index last persisted
# on the saga.
defp resume_compensation(saga) do
  compensate_step(saga, saga.current_step_idx)
end
|
|
|
|
# Backward walk over the steps, from `idx` down to 0.
#
# Terminal clause: once the index drops below 0 there is nothing left to undo,
# so the saga is stamped "rolled_back" with a completion time and :ok is
# returned.
defp compensate_step(saga, idx) when idx < 0 do
  finished = DateTime.utc_now() |> DateTime.truncate(:second)
  {:ok, _} = Provisioning.update_saga(saga, %{status: "rolled_back", completed_at: finished})
  :ok
end

# Working clause: persists the index being compensated, invokes the step's
# optional compensate/1, records the step as "compensated" in the ledger
# (attaching the error when compensation itself failed) and moves on to the
# previous step. Compensation is best-effort: a failing compensate/1 is logged
# and recorded but never stops the walk.
defp compensate_step(saga, idx) do
  {:ok, saga} = Provisioning.update_saga(saga, %{current_step_idx: idx})

  module = saga.step_modules |> Enum.at(idx) |> resolve_module()
  step_name = module.name()
  state = SagaState.from(saga) |> Map.put(:step_idx, idx)

  # function_exported?/3 does not load the module and answers false for a
  # module that is not loaded yet. Ensure it is loaded first so that a saga
  # resumed in a fresh VM does not silently skip a compensate/1 that exists.
  if Code.ensure_loaded?(module) and function_exported?(module, :compensate, 1) do
    case safe_compensate(module, state, saga.id, idx) do
      :ok ->
        mark_compensated(saga.id, idx, step_name)

      {:error, reason} ->
        Logger.warning(
          "[saga #{saga.id}] compensate failed at step #{idx} (#{step_name}): #{inspect(reason)} — continuing"
        )

        mark_compensated(saga.id, idx, step_name, error_to_map(reason))
    end
  else
    mark_compensated(saga.id, idx, step_name)
  end

  compensate_step(saga, idx - 1)
end

# Runs `module.compensate(state)` and always answers `:ok | {:error, reason}`.
# Exceptions, throws, exits and return values outside the compensate/1
# contract are logged and/or converted into error tuples so that one
# misbehaving step cannot abort the rest of the rollback.
defp safe_compensate(module, state, saga_id, idx) do
  case module.compensate(state) do
    :ok -> :ok
    {:error, _reason} = error -> error
    other -> {:error, {:bad_return, other}}
  end
rescue
  e ->
    Logger.error("[saga #{saga_id}] compensate raised at step #{idx}: #{Exception.message(e)}")
    {:error, {:exception, Exception.message(e)}}
catch
  kind, payload ->
    Logger.error("[saga #{saga_id}] compensate #{kind} at step #{idx}: #{inspect(payload)}")
    {:error, {kind, payload}}
end
|
|
|
|
# Writes the "compensated" ledger row for step `idx` of saga `saga_id`,
# stamped with the current UTC time (second precision). When `err` is given
# (anything but nil/false) it is stored under :error on the same row.
# Returns the matched `{:ok, _}` from the upsert; crashes if the upsert fails.
defp mark_compensated(saga_id, idx, name, err \\ nil) do
  completed_at = DateTime.truncate(DateTime.utc_now(), :second)
  base = %{step_name: name, status: "compensated", completed_at: completed_at}

  attrs =
    case err do
      falsy when falsy in [nil, false] -> base
      _ -> Map.put(base, :error, err)
    end

  {:ok, _} = Provisioning.upsert_step_result(saga_id, idx, attrs)
end
|
|
|
|
# ---- completion -----------------------------------------------------------
|
|
|
|
# Finishes a saga whose forward walk ran past the last step: persists status
# "completed" with a completion time (UTC, second precision) and returns :ok.
defp complete(saga) do
  completed_at = DateTime.truncate(DateTime.utc_now(), :second)
  attrs = %{status: "completed", completed_at: completed_at}

  {:ok, _} = Provisioning.update_saga(saga, attrs)
  :ok
end
|
|
|
|
# ---- helpers --------------------------------------------------------------
|
|
|
|
# Turns a stored module name into the module atom. Accepts the name with or
# without the "Elixir." prefix. Uses String.to_existing_atom/1, so no new atom
# is ever created; an unknown name raises ArgumentError.
defp resolve_module(name) when is_binary(name) do
  bare_name = String.trim_leading(name, "Elixir.")
  qualified = "Elixir." <> bare_name
  String.to_existing_atom(qualified)
end
|
|
|
|
# Output recorded in the step ledger: the state's context map with the
# "__inputs__" entry removed.
defp extract_output(%SagaState{context: context}) do
  Map.delete(context, "__inputs__")
end
|
|
|
|
# Converts an error reason into a map of strings for storage:
#
#   * `{kind, payload}` with an atom kind -> %{"kind" => ..., "payload" => ...}
#   * anything else                        -> %{"reason" => inspect(term)}
#
# Payloads and reasons are rendered with inspect/1.
defp error_to_map(error) do
  case error do
    {kind, payload} when is_atom(kind) ->
      %{"kind" => to_string(kind), "payload" => inspect(payload)}

    _ ->
      %{"reason" => inspect(error)}
  end
end
|
|
end
|