This is the architectural spine of every write workflow in phase 2+:
provisioning, suspension, offboarding, updates, and rollback all ride this
engine. Each becomes a step list, not new orchestration code.
Schemas:
- saga_runs: kind, status (pending/running/completed/failed/
  compensating/rolled_back), step_modules, current_step_idx,
  accumulating context (jsonb), cancel flag, error.
- saga_step_results: per-step audit ledger with status (running/
  completed/failed/compensated), output, attempts, timings, and a
  unique index on (saga_id, step_idx).
- cloud_provisioned: desired state for resources WE provisioned. Holds
  spec + spec_version + saga_id, with an FK to cloud_resources.
  Phase 2's drift-detection diff lands here.
Step contract (ArcadiaCloud.Provisioning.Step):
- execute(state) -> {:ok, state} | {:error, reason}
- compensate(state) -> :ok | {:error, _} (optional)
- name() -> String.t()
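The contract above could be expressed as an Elixir behaviour roughly as follows. This is a minimal sketch, not the actual ArcadiaCloud.Provisioning.Step source: the module names StepSketch and EchoSketch are hypothetical, and the state is assumed to be a plain map for illustration.

```elixir
defmodule StepSketch do
  @moduledoc "Hypothetical stand-in for the step contract described above."

  # Assumption: state is modelled as a plain map in this sketch.
  @type state :: map()

  @callback execute(state()) :: {:ok, state()} | {:error, term()}
  @callback compensate(state()) :: :ok | {:error, term()}
  @callback name() :: String.t()

  # compensate/1 is optional, so steps without side effects can omit it.
  @optional_callbacks compensate: 1
end

defmodule EchoSketch do
  @moduledoc "Hypothetical step that copies a message into the state."
  @behaviour StepSketch

  @impl true
  def name, do: "echo"

  @impl true
  def execute(state) do
    # Falls back to "hello" when no :message key is present.
    {:ok, Map.put(state, :echoed, Map.get(state, :message, "hello"))}
  end
end
```

Because compensate/1 is optional, a runner can check function_exported?(EchoSketch, :compensate, 1) before calling it.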
SagaState carries the live execution state — accumulating context,
immutable inputs, current step_idx. Helpers: get_output/put_output
(context r/w), get_input (inputs read-only).
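To make the read/write split concrete, here is a small sketch of how such a state struct might look. The module name SagaStateSketch, the new/1 constructor, the field names, and the helper arities are all assumptions for illustration; only the helper names get_output, put_output, and get_input come from the description above.

```elixir
defmodule SagaStateSketch do
  @moduledoc "Hypothetical sketch of the saga execution state."

  @enforce_keys [:inputs]
  defstruct inputs: %{}, context: %{}, step_idx: 0

  # Hypothetical constructor: inputs are fixed when the saga starts.
  def new(inputs) when is_map(inputs), do: %__MODULE__{inputs: inputs}

  # Inputs are read-only: there is deliberately no put_input.
  def get_input(%__MODULE__{inputs: inputs}, key, default \\ nil),
    do: Map.get(inputs, key, default)

  # Context accumulates step outputs and can be read and written.
  def get_output(%__MODULE__{context: context}, key, default \\ nil),
    do: Map.get(context, key, default)

  def put_output(%__MODULE__{context: context} = state, key, value),
    do: %{state | context: Map.put(context, key, value)}
end
```

A step would typically read from the inputs and write its result into the context, for example SagaStateSketch.put_output(state, "echoed_at_step_0", SagaStateSketch.get_input(state, "message")).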
Runner (Oban worker, queue: provisioning, max_attempts: 1):
- kick_off: pending -> running, run_step(0)
- run_step: idempotent re-entry on saga.current_step_idx; persists step
result + saga context after each step; recursive forward walk through
the whole step list within one perform/1 call.
- safe_execute: try/rescue/catch around module.execute so a raised
exception triggers compensation rather than blowing up the worker.
- start_compensation: status=compensating, walk from idx-1 down to 0,
calling compensate/1 where it's exported; logs but doesn't halt on
compensate failure (best-effort + audit log).
- cancellation: checked between steps; cancel_requested=true -> trigger
compensation from current idx.
- crash recovery: max_attempts: 1 keeps Oban from retrying a half-finished
saga on its own, and because run_step is keyed on
saga.current_step_idx, a re-enqueued job picks up at the right step.
Full crash-resume infra is deferred to phase 2.5 (manual re-enqueue
works for now).
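The forward walk and the compensation walk can be sketched in memory as follows. This is an illustration under stated assumptions, not the real runner: it has no Oban, no Repo, and no cancellation check. The module names RunnerSketch, SketchSteps.Echo, and SketchSteps.Fail are hypothetical, and the sketch assumes the echo step exports a no-op compensate/1.

```elixir
defmodule SketchSteps.Echo do
  def name, do: "echo"

  def execute(%{idx: idx, context: context} = state),
    do: {:ok, %{state | context: Map.put(context, "echoed_at_step_#{idx}", "hello")}}

  # Assumption: a no-op compensate, so the ledger can mark the step compensated.
  def compensate(_state), do: :ok
end

defmodule SketchSteps.Fail do
  def name, do: "fail"
  def execute(_state), do: {:error, "step_failed:fail"}
end

defmodule RunnerSketch do
  # The saga is a plain map here; the real engine persists it after each step.
  def run(steps, context \\ %{}) do
    run_step(%{steps: steps, context: context, status: :running, ledger: %{}, error: nil}, 0)
  end

  # Past the last step: the saga is done.
  defp run_step(%{steps: steps} = saga, idx) when idx >= length(steps),
    do: %{saga | status: :completed}

  defp run_step(saga, idx) do
    module = Enum.at(saga.steps, idx)

    case safe_execute(module, %{idx: idx, context: saga.context}) do
      {:ok, %{context: context}} ->
        saga
        |> Map.put(:context, context)
        |> record(idx, module, :completed)
        |> run_step(idx + 1)

      {:error, reason} ->
        saga
        |> record(idx, module, :failed)
        |> start_compensation(idx, reason)
    end
  end

  # A raise or throw inside a step becomes an ordinary {:error, _} result.
  defp safe_execute(module, state) do
    module.execute(state)
  rescue
    exception -> {:error, Exception.message(exception)}
  catch
    kind, value -> {:error, {kind, value}}
  end

  # Walk from failed_idx - 1 down to 0; the range is empty when step 0 failed.
  defp start_compensation(saga, failed_idx, reason) do
    indices = (failed_idx - 1)..0//-1

    saga =
      Enum.reduce(indices, %{saga | status: :compensating}, fn idx, acc ->
        module = Enum.at(acc.steps, idx)

        if Code.ensure_loaded?(module) and function_exported?(module, :compensate, 1) do
          case module.compensate(%{idx: idx, context: acc.context}) do
            :ok -> record(acc, idx, module, :compensated)
            # Best-effort: a failed compensation does not halt the walk.
            {:error, _} -> acc
          end
        else
          acc
        end
      end)

    %{saga | status: :rolled_back, error: %{compensate_from_idx: failed_idx - 1, reason: reason}}
  end

  defp record(saga, idx, module, status),
    do: put_in(saga, [:ledger, idx], {module.name(), status})
end
```

Calling RunnerSketch.run([SketchSteps.Echo, SketchSteps.Echo, SketchSteps.Fail]) walks forward to the failing step and then back through steps 1 and 0, which mirrors the failure path of the smoke test described in this message.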
Two proof-of-concept steps (Steps.Echo, Steps.Fail) demonstrate the
engine without any DO API exposure. First real DO write step lands in
the next chunk.
Provisioning context provides start_saga/1, list_sagas/1,
list_step_results/1, cancel_saga/1, upsert_step_result/3.
Live smoke verified end-to-end:
- [Echo, Echo, Echo] happy path: all 3 completed, context accumulated
echoed_at_step_0/1/2 = "hello".
- [Echo, Echo, Fail] failure path: step 2 failed, compensation walked
back through step 1 then step 0; final status rolled_back with error
{compensate_from_idx: 1, reason: "step_failed:fail"}; ledger shows
echo/echo/fail with statuses compensated/compensated/failed.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
61 lines
2.3 KiB
Elixir
defmodule ArcadiaCloud.Repo.Migrations.CreateSagas do
  use Ecto.Migration

  def change do
    create table(:saga_runs, primary_key: false) do
      add :id, :binary_id, primary_key: true
      add :deployment_id, :binary_id
      add :kind, :string, null: false
      add :status, :string, null: false, default: "pending"
      add :step_modules, {:array, :string}, null: false
      add :current_step_idx, :integer, null: false, default: 0
      add :context, :map, null: false, default: %{}
      add :started_at, :utc_datetime
      add :completed_at, :utc_datetime
      add :cancel_requested, :boolean, null: false, default: false
      add :error, :map
      add :triggered_by, :string

      timestamps(type: :utc_datetime)
    end

    create index(:saga_runs, [:status])
    create index(:saga_runs, [:kind])
    create index(:saga_runs, [:deployment_id])

    create table(:saga_step_results, primary_key: false) do
      add :id, :binary_id, primary_key: true
      add :saga_id, references(:saga_runs, type: :binary_id, on_delete: :delete_all), null: false
      add :step_idx, :integer, null: false
      add :step_name, :string, null: false
      add :status, :string, null: false
      add :output, :map, default: %{}, null: false
      add :attempts, :integer, default: 0, null: false
      add :started_at, :utc_datetime
      add :completed_at, :utc_datetime
      add :error, :map
    end

    create unique_index(:saga_step_results, [:saga_id, :step_idx])
    create index(:saga_step_results, [:saga_id, :status])

    # Desired-state for everything WE provisioned. Lets us detect drift
    # between cloud_resources (actual) and this (intended).
    create table(:cloud_provisioned, primary_key: false) do
      add :id, :binary_id, primary_key: true
      add :resource_id, references(:cloud_resources, type: :binary_id, on_delete: :delete_all),
        null: false
      add :spec, :map, null: false, default: %{}
      add :spec_version, :integer, null: false, default: 1
      add :provisioned_at, :utc_datetime, null: false
      add :provisioned_by, :string
      add :saga_id, references(:saga_runs, type: :binary_id, on_delete: :nilify_all)

      timestamps(type: :utc_datetime)
    end

    create unique_index(:cloud_provisioned, [:resource_id])
    create index(:cloud_provisioned, [:saga_id])
  end
end