From 3274a4adabf967234c8363945b3d3dd33742e64f Mon Sep 17 00:00:00 2001 From: Giuliano Silvestro Date: Wed, 20 May 2026 07:24:30 +1000 Subject: [PATCH] Phase 2 saga engine: compensation-based runner + step contract MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The architectural spine of every write workflow phase 2+ — provisioning, suspension, offboarding, updates, rollback — all ride this engine. Each becomes a step list, not new orchestration code. Schemas: - saga_runs — kind, status (pending/running/completed/failed/ compensating/rolled_back), step_modules, current step idx, accumulating context (jsonb), cancel flag, error. - saga_step_results — per-step audit ledger: status (running/completed/ failed/compensated), output, attempts, timings, unique (saga_id, step_idx). - cloud_provisioned — desired-state for resources WE provisioned. spec + spec_version + saga_id; FK to cloud_resources. Phase 2's drift-detection diff lands here. Step contract (ArcadiaCloud.Provisioning.Step): - execute(state) -> {:ok, state} | {:error, reason} - compensate(state) -> :ok | {:error, _} (optional) - name() -> String.t() SagaState carries the live execution state — accumulating context, immutable inputs, current step_idx. Helpers: get_output/put_output (context r/w), get_input (inputs read-only). Runner (Oban worker, queue: provisioning, max_attempts: 1): - kick_off: pending -> running, run_step(0) - run_step: idempotent re-entry on saga.current_step_idx; persists step result + saga context after each step; recursive forward walk through the whole step list within one perform/1 call. - safe_execute: try/rescue/catch around module.execute so a raised exception triggers compensation rather than blowing up the worker. - start_compensation: status=compensating, walk from idx-1 down to 0, calling compensate/1 where it's exported; logs but doesn't halt on compensate failure (best-effort + audit log). 
- cancellation: checked between steps; cancel_requested=true -> trigger compensation from current idx. - crash recovery: max_attempts: 1 + run_step keyed on saga.current_step_idx means Oban requeue picks up at the right place, but full crash-resume infra is deferred to phase 2.5 (manual re-enqueue works for now). Two proof-of-concept steps (Steps.Echo, Steps.Fail) demonstrate the engine without any DO API exposure. First real DO write step lands in the next chunk. Provisioning context provides start_saga/1, list_sagas/1, list_step_results/1, cancel_saga/1, upsert_step_result/3. Live smoke verified end-to-end: - [Echo, Echo, Echo] happy path: all 3 completed, context accumulated echoed_at_step_0/1/2 = "hello". - [Echo, Echo, Fail] failure path: step 2 failed, compensation walked back through step 1 then step 0; final status rolled_back with error {compensate_from_idx: 1, reason: "step_failed:fail"}; ledger shows echo/echo/fail with statuses compensated/compensated/failed. Co-Authored-By: Claude Opus 4.7 (1M context) --- lib/arcadia_cloud/provisioning.ex | 112 +++++++++ .../provisioning/cloud_provisioned.ex | 29 +++ lib/arcadia_cloud/provisioning/runner.ex | 218 ++++++++++++++++++ lib/arcadia_cloud/provisioning/saga_run.ex | 38 +++ lib/arcadia_cloud/provisioning/saga_state.ex | 65 ++++++ .../provisioning/saga_step_result.ex | 33 +++ lib/arcadia_cloud/provisioning/step.ex | 29 +++ lib/arcadia_cloud/provisioning/steps/echo.ex | 28 +++ lib/arcadia_cloud/provisioning/steps/fail.ex | 16 ++ .../20260520120000_create_sagas.exs | 60 +++++ 10 files changed, 628 insertions(+) create mode 100644 lib/arcadia_cloud/provisioning.ex create mode 100644 lib/arcadia_cloud/provisioning/cloud_provisioned.ex create mode 100644 lib/arcadia_cloud/provisioning/runner.ex create mode 100644 lib/arcadia_cloud/provisioning/saga_run.ex create mode 100644 lib/arcadia_cloud/provisioning/saga_state.ex create mode 100644 lib/arcadia_cloud/provisioning/saga_step_result.ex create mode 100644 
defmodule ArcadiaCloud.Provisioning do
  @moduledoc """
  Context for saga orchestration — provisioning, suspension, offboarding,
  updates, anything that wants compensation-based rollback over Oban.

  Pattern: caller assembles a step list (per template or hand-rolled),
  calls `start_saga/1`, the Runner Oban worker walks the steps,
  persisting results and rolling back on failure.
  """

  import Ecto.Query, warn: false

  alias ArcadiaCloud.Repo
  alias ArcadiaCloud.Provisioning.{Runner, SagaRun, SagaStepResult}

  @doc """
  Inserts a saga_runs row and enqueues the Runner job.

  Both writes run inside a single `Repo.transaction/1`: if the job insert
  fails, the saga row is rolled back instead of being left behind as a
  `pending` saga that no job will ever pick up.

  Required:
    :kind         — provision | suspend | unsuspend | offboard | update |
                    rollback | test (atom or string; atoms are stringified)
    :step_modules — ordered list of step module atoms or fully-qualified strings

  Optional:
    :inputs        — map of saga inputs (stored in context.__inputs__);
                     defaults to an empty map
    :deployment_id — links the saga to a deployment (nil for skyai-internal)
    :triggered_by  — user_id or "system:"

  Accepts either a keyword list or a map with atom keys.

  Returns `{:ok, saga}` on success, or `{:error, reason}` where `reason` is
  whatever the failing insert returned (an `Ecto.Changeset` for the saga row).
  """
  @spec start_saga(keyword() | map()) :: {:ok, %SagaRun{}} | {:error, term()}
  def start_saga(opts) when is_list(opts), do: start_saga(Map.new(opts))

  def start_saga(%{} = attrs) do
    saga_attrs = %{
      kind: normalize_kind(attrs[:kind]),
      step_modules: Enum.map(attrs[:step_modules] || [], &to_string/1),
      deployment_id: attrs[:deployment_id],
      triggered_by: attrs[:triggered_by],
      context: %{"__inputs__" => attrs[:inputs] || %{}}
    }

    # NOTE(review): atomicity relies on Oban writing through ArcadiaCloud.Repo
    # (same database connection as the saga insert) — confirm in the Oban config.
    Repo.transaction(fn ->
      with {:ok, saga} <- create_saga(saga_attrs),
           {:ok, _job} <- %{"saga_id" => saga.id} |> Runner.new() |> Oban.insert() do
        saga
      else
        {:error, reason} -> Repo.rollback(reason)
      end
    end)
  end

  @doc """
  Inserts a saga row from `attrs` without enqueueing anything.

  Returns `{:ok, saga}` or `{:error, changeset}`.
  """
  @spec create_saga(map()) :: {:ok, %SagaRun{}} | {:error, Ecto.Changeset.t()}
  def create_saga(attrs) do
    %SagaRun{}
    |> SagaRun.changeset(attrs)
    |> Repo.insert()
  end

  @doc "Fetches a saga by id; returns `nil` when there is no such row."
  @spec get_saga(term()) :: %SagaRun{} | nil
  def get_saga(id), do: Repo.get(SagaRun, id)

  @doc "Fetches a saga by id; raises when there is no such row."
  @spec get_saga!(term()) :: %SagaRun{}
  def get_saga!(id), do: Repo.get!(SagaRun, id)

  @doc """
  Applies `attrs` to `saga` through `SagaRun.changeset/2` and persists it.

  Returns `{:ok, saga}` or `{:error, changeset}`.
  """
  @spec update_saga(%SagaRun{}, map()) :: {:ok, %SagaRun{}} | {:error, Ecto.Changeset.t()}
  def update_saga(%SagaRun{} = saga, attrs) do
    saga
    |> SagaRun.changeset(attrs)
    |> Repo.update()
  end

  @doc """
  Lists sagas, newest first (by `inserted_at`).

  Options (all optional; a `nil` value means "no filter"):
    :status, :kind, :deployment_id — equality filters
    :limit                         — maximum number of rows
  """
  @spec list_sagas(keyword()) :: [%SagaRun{}]
  def list_sagas(opts \\ []) do
    from(s in SagaRun, order_by: [desc: s.inserted_at])
    |> maybe_filter(:status, opts[:status])
    |> maybe_filter(:kind, opts[:kind])
    |> maybe_filter(:deployment_id, opts[:deployment_id])
    |> maybe_limit(opts[:limit])
    |> Repo.all()
  end

  @doc "Returns the step-result ledger of a saga, ordered by ascending `step_idx`."
  @spec list_step_results(term()) :: [%SagaStepResult{}]
  def list_step_results(saga_id) do
    from(r in SagaStepResult,
      where: r.saga_id == ^saga_id,
      order_by: [asc: r.step_idx]
    )
    |> Repo.all()
  end

  @doc """
  Sets `cancel_requested` on the saga row.

  This only records the request; acting on it is up to the runner.
  """
  @spec cancel_saga(%SagaRun{}) :: {:ok, %SagaRun{}} | {:error, Ecto.Changeset.t()}
  def cancel_saga(%SagaRun{} = saga) do
    saga
    |> SagaRun.changeset(%{cancel_requested: true})
    |> Repo.update()
  end

  @doc """
  Creates or updates the ledger row for `{saga_id, step_idx}`.

  When no row exists, `attrs` (plus the two identifying keys) is inserted;
  otherwise only the keys present in `attrs` are changed on the existing row.
  `attrs` must use atom keys, because it is merged with atom-keyed identifiers.

  The lookup and the write are separate statements, so two concurrent callers
  can both take the insert path; the loser then gets `{:error, changeset}` from
  the unique constraint on `(saga_id, step_idx)`.
  """
  @spec upsert_step_result(term(), integer(), map()) ::
          {:ok, %SagaStepResult{}} | {:error, Ecto.Changeset.t()}
  def upsert_step_result(saga_id, step_idx, attrs) do
    case Repo.get_by(SagaStepResult, saga_id: saga_id, step_idx: step_idx) do
      nil ->
        %SagaStepResult{}
        |> SagaStepResult.changeset(Map.merge(attrs, %{saga_id: saga_id, step_idx: step_idx}))
        |> Repo.insert()

      existing ->
        existing
        |> SagaStepResult.changeset(attrs)
        |> Repo.update()
    end
  end

  # `kind` is cast as a :string, which rejects atoms; stringify them up front so
  # `kind: :provision` works like `kind: "provision"`. nil is kept so that
  # validate_required still reports the missing field.
  defp normalize_kind(nil), do: nil
  defp normalize_kind(kind) when is_atom(kind), do: Atom.to_string(kind)
  defp normalize_kind(kind), do: kind

  defp maybe_filter(q, _f, nil), do: q
  defp maybe_filter(q, field, value), do: from(s in q, where: field(s, ^field) == ^value)

  defp maybe_limit(q, nil), do: q
  defp maybe_limit(q, n), do: from(s in q, limit: ^n)
end
defmodule ArcadiaCloud.Provisioning.CloudProvisioned do
  @moduledoc """
  Desired-state record for a cloud resource that a saga provisioned.

  Stores the intended `spec` (with a `spec_version`) for one row of
  `cloud_resources`, plus when, by whom and by which saga it was provisioned.
  Each resource can have at most one such record.
  """

  use Ecto.Schema
  import Ecto.Changeset

  @primary_key {:id, :binary_id, autogenerate: true}
  @foreign_key_type :binary_id

  schema "cloud_provisioned" do
    field :spec, :map, default: %{}
    field :spec_version, :integer, default: 1
    field :provisioned_at, :utc_datetime
    field :provisioned_by, :string

    belongs_to :resource, ArcadiaCloud.Cloud.CloudResource
    belongs_to :saga, ArcadiaCloud.Provisioning.SagaRun

    timestamps(type: :utc_datetime)
  end

  @required ~w(resource_id spec provisioned_at)a
  @optional ~w(spec_version provisioned_by saga_id)a

  @doc """
  Builds a changeset for a provisioned-resource record.

  Requires `resource_id`, `spec` and `provisioned_at`. Database constraint
  violations are reported as changeset errors rather than raised:

    * a second record for the same `resource_id` (unique index)
    * a `resource_id` or `saga_id` that references no existing row
      (foreign keys)
  """
  def changeset(prov, attrs) do
    prov
    |> cast(attrs, @required ++ @optional)
    |> validate_required(@required)
    |> unique_constraint(:resource_id)
    # Without these, a dangling reference surfaces as an Ecto.ConstraintError
    # exception instead of `{:error, changeset}`.
    |> foreign_key_constraint(:resource_id)
    |> foreign_key_constraint(:saga_id)
  end
end
defmodule ArcadiaCloud.Provisioning.Runner do
  @moduledoc """
  Compensation-based saga executor.

  Walks `saga.step_modules` in order. Each step gets a `%SagaState{}` and
  returns `{:ok, new_state}` or `{:error, reason}`. On failure, the runner
  flips status to `compensating` and walks already-completed steps in
  REVERSE, calling `compensate/1` on any that implement it.

  The following all count as a step failure and trigger compensation rather
  than crashing the worker: an `{:error, reason}` return, a raised exception,
  a throw or exit, a return value that is not one of the two contract shapes,
  and a step module that cannot be resolved or loaded.

  Crash safety: this worker is enqueued via Oban with `max_attempts: 1`
  because retry semantics are step-level, not job-level. On crash, the
  saga row stays in whatever status it was last persisted with; a
  separate `resume_stuck_sagas` workflow (phase 2.5) handles cleanup.
  For phase 2 first chunk, you'd re-enqueue manually.

  Cancellation: `Provisioning.cancel_saga/1` flips `cancel_requested=true`.
  The runner re-reads that flag from the database between steps — the struct
  it carries was loaded before another process could set it — and transitions
  to compensating instead of advancing.
  """

  use Oban.Worker, queue: :provisioning, max_attempts: 1

  require Logger

  alias ArcadiaCloud.Provisioning
  alias ArcadiaCloud.Provisioning.{SagaRun, SagaState}
  alias ArcadiaCloud.Repo

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"saga_id" => saga_id}}) do
    saga = Repo.get!(SagaRun, saga_id)

    # Re-entrant: the persisted status decides where to pick up.
    case saga.status do
      "pending" -> kick_off(saga)
      "running" -> resume(saga)
      "compensating" -> resume_compensation(saga)
      _terminal -> :ok
    end
  end

  # ---- forward path ---------------------------------------------------------

  defp kick_off(saga) do
    {:ok, saga} = Provisioning.update_saga(saga, %{status: "running", started_at: now()})
    run_step(saga, 0)
  end

  defp resume(saga), do: run_step(saga, saga.current_step_idx)

  defp run_step(saga, idx) do
    cond do
      cancel_requested?(saga) ->
        Logger.info("[saga #{saga.id}] cancellation requested; compensating from step #{idx - 1}")
        start_compensation(saga, idx - 1, "cancelled")

      idx >= length(saga.step_modules) ->
        complete(saga)

      true ->
        do_run_step(saga, idx)
    end
  end

  # The in-memory struct only reflects the flag as of the last load, so a
  # cancellation issued while this job is running is visible only in the
  # database. Only the flag is refreshed; the rest of the struct is kept.
  defp cancel_requested?(%SagaRun{cancel_requested: true}), do: true
  defp cancel_requested?(%SagaRun{id: id}), do: Repo.get!(SagaRun, id).cancel_requested

  defp do_run_step(saga, idx) do
    {:ok, saga} = Provisioning.update_saga(saga, %{current_step_idx: idx})
    module_name = Enum.at(saga.step_modules, idx)

    case load_step(module_name) do
      {:ok, module} ->
        execute_step(saga, idx, module)

      {:error, reason} ->
        # No module means no name/0 to call; the stored module string stands
        # in as the step name in the ledger.
        fail_step(saga, idx, module_name, reason)
    end
  end

  defp execute_step(saga, idx, module) do
    step_name = module.name()

    {:ok, _} =
      Provisioning.upsert_step_result(saga.id, idx, %{
        step_name: step_name,
        status: "running",
        started_at: now(),
        attempts: 1
      })

    state = %{SagaState.from(saga) | step_idx: idx}

    case safe_execute(module, state) do
      {:ok, %SagaState{} = new_state} ->
        {:ok, _} =
          Provisioning.upsert_step_result(saga.id, idx, %{
            step_name: step_name,
            status: "completed",
            output: extract_output(new_state),
            completed_at: now()
          })

        {:ok, saga} = Provisioning.update_saga(saga, %{context: new_state.context})
        run_step(saga, idx + 1)

      {:error, reason} ->
        fail_step(saga, idx, step_name, reason)

      other ->
        # Contract violation (e.g. a bare :ok): treat it as a failure so the
        # saga rolls back instead of staying in "running" forever.
        fail_step(saga, idx, step_name, {:bad_return, other})
    end
  end

  # Records the failure in the ledger, then compensates every step before `idx`.
  defp fail_step(saga, idx, step_name, reason) do
    {:ok, _} =
      Provisioning.upsert_step_result(saga.id, idx, %{
        step_name: step_name,
        status: "failed",
        error: error_to_map(reason),
        completed_at: now()
      })

    Logger.warning("[saga #{saga.id}] step #{idx} (#{step_name}) failed: #{inspect(reason)}")
    start_compensation(saga, idx - 1, "step_failed:#{step_name}")
  end

  defp safe_execute(module, state) do
    try do
      module.execute(state)
    rescue
      e -> {:error, {:exception, Exception.message(e)}}
    catch
      kind, payload -> {:error, {kind, payload}}
    end
  end

  # ---- compensation path ----------------------------------------------------

  defp start_compensation(saga, from_idx, error_reason) do
    error = %{"reason" => error_reason, "compensate_from_idx" => from_idx}

    {:ok, saga} =
      Provisioning.update_saga(saga, %{
        status: "compensating",
        current_step_idx: from_idx,
        error: error
      })

    compensate_step(saga, from_idx)
  end

  defp resume_compensation(saga), do: compensate_step(saga, saga.current_step_idx)

  # Walked past step 0: nothing left to undo.
  defp compensate_step(saga, idx) when idx < 0 do
    {:ok, _} = Provisioning.update_saga(saga, %{status: "rolled_back", completed_at: now()})
    :ok
  end

  defp compensate_step(saga, idx) do
    {:ok, saga} = Provisioning.update_saga(saga, %{current_step_idx: idx})
    module_name = Enum.at(saga.step_modules, idx)

    case load_step(module_name) do
      {:ok, module} ->
        run_compensation(saga, idx, module)

      {:error, reason} ->
        Logger.error(
          "[saga #{saga.id}] cannot load step #{idx} (#{module_name}) for compensation: #{inspect(reason)} — continuing"
        )

        mark_compensated(saga.id, idx, module_name, error_to_map(reason))
    end

    # Best-effort: a failed compensation is recorded but never halts the walk.
    compensate_step(saga, idx - 1)
  end

  defp run_compensation(saga, idx, module) do
    step_name = module.name()

    # load_step/1 has already loaded the module, so function_exported?/3
    # (which never loads code itself) gives a reliable answer here.
    if function_exported?(module, :compensate, 1) do
      state = %{SagaState.from(saga) | step_idx: idx}

      case safe_compensate(saga, idx, module, state) do
        :ok ->
          mark_compensated(saga.id, idx, step_name)

        {:error, reason} ->
          compensation_failed(saga, idx, step_name, reason)

        other ->
          compensation_failed(saga, idx, step_name, {:bad_return, other})
      end
    else
      mark_compensated(saga.id, idx, step_name)
    end
  end

  # Mirrors safe_execute/2: exceptions, throws and exits all become
  # `{:error, _}` so a misbehaving compensate/1 cannot abort the rollback.
  defp safe_compensate(saga, idx, module, state) do
    try do
      module.compensate(state)
    rescue
      e ->
        Logger.error(
          "[saga #{saga.id}] compensate raised at step #{idx}: #{Exception.message(e)}"
        )

        {:error, {:exception, Exception.message(e)}}
    catch
      kind, payload ->
        Logger.error(
          "[saga #{saga.id}] compensate #{kind} at step #{idx}: #{inspect(payload)}"
        )

        {:error, {kind, payload}}
    end
  end

  defp compensation_failed(saga, idx, step_name, reason) do
    Logger.warning(
      "[saga #{saga.id}] compensate failed at step #{idx} (#{step_name}): #{inspect(reason)} — continuing"
    )

    mark_compensated(saga.id, idx, step_name, error_to_map(reason))
  end

  defp mark_compensated(saga_id, idx, name, err \\ nil) do
    attrs = %{step_name: name, status: "compensated", completed_at: now()}
    attrs = if err, do: Map.put(attrs, :error, err), else: attrs

    {:ok, _} = Provisioning.upsert_step_result(saga_id, idx, attrs)
  end

  # ---- completion -----------------------------------------------------------

  defp complete(saga) do
    {:ok, _} = Provisioning.update_saga(saga, %{status: "completed", completed_at: now()})
    :ok
  end

  # ---- helpers --------------------------------------------------------------

  # Resolves a stored module string to a loaded module that exposes name/0.
  # Returns {:ok, module} or {:error, reason}; never raises for a bad name.
  defp load_step(name) do
    module = resolve_module(name)

    with {:module, ^module} <- Code.ensure_loaded(module),
         true <- function_exported?(module, :name, 0) do
      {:ok, module}
    else
      {:error, reason} -> {:error, {:step_not_loaded, reason}}
      false -> {:error, {:not_a_step, name}}
    end
  rescue
    # String.to_existing_atom/1 raises when the atom was never created.
    ArgumentError -> {:error, {:unknown_step_module, name}}
  end

  # Accepts the name with or without the "Elixir." prefix. Uses
  # to_existing_atom so database contents can never mint new atoms.
  defp resolve_module(name) when is_binary(name) do
    String.to_existing_atom("Elixir." <> String.trim_leading(name, "Elixir."))
  end

  # The ledger stores the accumulated context without the saga inputs.
  defp extract_output(%SagaState{context: ctx}) do
    Map.drop(ctx, ["__inputs__"])
  end

  defp error_to_map({kind, payload}) when is_atom(kind),
    do: %{"kind" => to_string(kind), "payload" => inspect(payload)}

  defp error_to_map(other), do: %{"reason" => inspect(other)}

  defp now, do: DateTime.utc_now() |> DateTime.truncate(:second)
end

defmodule ArcadiaCloud.Provisioning.SagaRun do
  @moduledoc """
  Persistent record of one saga execution: its kind, lifecycle status,
  ordered step modules, current step index, accumulated context, cancel flag
  and last error.
  """

  use Ecto.Schema
  import Ecto.Changeset

  @primary_key {:id, :binary_id, autogenerate: true}
  @foreign_key_type :binary_id

  @kinds ~w(provision suspend unsuspend offboard update rollback test)
  @statuses ~w(pending running completed failed compensating rolled_back)

  schema "saga_runs" do
    field :deployment_id, :binary_id
    field :kind, :string
    field :status, :string, default: "pending"
    field :step_modules, {:array, :string}, default: []
    field :current_step_idx, :integer, default: 0
    field :context, :map, default: %{}
    field :started_at, :utc_datetime
    field :completed_at, :utc_datetime
    field :cancel_requested, :boolean, default: false
    field :error, :map
    field :triggered_by, :string

    timestamps(type: :utc_datetime)
  end

  # Referenced as `SagaRun.t()` by SagaState's typespec.
  @type t :: %__MODULE__{}

  @required ~w(kind step_modules)a
  @optional ~w(deployment_id status current_step_idx context started_at
               completed_at cancel_requested error triggered_by)a

  @doc """
  Casts and validates saga attributes.

  `kind` and `step_modules` are required; `kind` and `status` must be one of
  the values this module allows.
  """
  def changeset(saga, attrs) do
    saga
    |> cast(attrs, @required ++ @optional)
    |> validate_required(@required)
    |> validate_inclusion(:kind, @kinds)
    |> validate_inclusion(:status, @statuses)
  end
end
defmodule ArcadiaCloud.Provisioning.SagaState do
  @moduledoc """
  Carries the live saga context across step calls. Steps receive this
  struct, derive a new value via the helpers, and return it.

  `context` is the accumulating bag of created-resource IDs and other
  per-saga state. Outputs from a step (e.g. `droplet_id` after
  CreateDroplet) live here so later steps and compensation can find
  what to act on.

  `inputs` are the immutable arguments the saga was started with
  (e.g. `template_id`, `deployment_slug`). Steps may read these but
  never write to them. They are persisted inside the context under the
  reserved `"__inputs__"` key, which `put_output/3` refuses to overwrite.
  """

  alias ArcadiaCloud.Provisioning.SagaRun

  # Context key under which the saga inputs are persisted.
  @inputs_key "__inputs__"

  defstruct [:saga_id, :saga, :context, :inputs, :step_idx]

  # `%SagaRun{}` rather than a remote `t()` type, so this spec does not depend
  # on the schema module exporting one.
  @type t :: %__MODULE__{
          saga_id: binary(),
          saga: %SagaRun{} | nil,
          context: map(),
          inputs: map(),
          step_idx: non_neg_integer() | nil
        }

  @doc """
  Builds the execution state for a persisted saga.

  A `nil` context is treated as an empty map; `inputs` is read from the
  context's `"__inputs__"` entry and defaults to an empty map.
  """
  @spec from(%SagaRun{}) :: t()
  def from(%SagaRun{} = saga) do
    context = saga.context || %{}

    %__MODULE__{
      saga_id: saga.id,
      saga: saga,
      context: context,
      inputs: context[@inputs_key] || %{},
      step_idx: saga.current_step_idx
    }
  end

  @doc """
  Idempotency hook: was a value written by a prior attempt of this same step?
  Returns the value or nil.

  Atom keys are looked up by their string form.
  """
  @spec get_output(t(), atom() | String.t()) :: term()
  def get_output(%__MODULE__{} = state, key) when is_atom(key) do
    get_output(state, Atom.to_string(key))
  end

  def get_output(%__MODULE__{context: ctx}, key) when is_binary(key) do
    Map.get(ctx, key)
  end

  @doc """
  Returns a new state with `value` stored in the context under `key`.

  Atom keys are stored under their string form. Raises `ArgumentError` for
  the reserved `"__inputs__"` key, because writing it would replace the
  saga inputs once the context is persisted.
  """
  @spec put_output(t(), atom() | String.t(), term()) :: t()
  def put_output(%__MODULE__{} = state, key, value) when is_atom(key) do
    put_output(state, Atom.to_string(key), value)
  end

  def put_output(%__MODULE__{}, @inputs_key, _value) do
    raise ArgumentError,
          "\"__inputs__\" is reserved for saga inputs and cannot be used as an output key"
  end

  def put_output(%__MODULE__{context: ctx} = state, key, value) when is_binary(key) do
    %{state | context: Map.put(ctx, key, value)}
  end

  @doc """
  Reads one saga input, or nil when it is absent.

  Atom keys are looked up by their string form.
  """
  @spec get_input(t(), atom() | String.t()) :: term()
  def get_input(%__MODULE__{inputs: inputs}, key) when is_atom(key) do
    Map.get(inputs, Atom.to_string(key))
  end

  def get_input(%__MODULE__{inputs: inputs}, key) when is_binary(key) do
    Map.get(inputs, key)
  end
end
defmodule ArcadiaCloud.Provisioning.SagaStepResult do
  @moduledoc """
  One row of the per-step audit ledger of a saga.

  Holds the step's index and name, its status, its output and error maps,
  an attempt counter and start/completion times. A saga has at most one row
  per step index.
  """

  use Ecto.Schema
  import Ecto.Changeset

  @primary_key {:id, :binary_id, autogenerate: true}
  @foreign_key_type :binary_id

  @valid_statuses ["running", "completed", "failed", "compensated"]

  schema "saga_step_results" do
    field :step_idx, :integer
    field :step_name, :string
    field :status, :string
    field :output, :map, default: %{}
    field :attempts, :integer, default: 0
    field :started_at, :utc_datetime
    field :completed_at, :utc_datetime
    field :error, :map

    belongs_to :saga, ArcadiaCloud.Provisioning.SagaRun
  end

  @required_fields [:saga_id, :step_idx, :step_name, :status]
  @optional_fields [:output, :attempts, :started_at, :completed_at, :error]
  @castable_fields @required_fields ++ @optional_fields

  @doc """
  Casts and validates a ledger row.

  `saga_id`, `step_idx`, `step_name` and `status` are required, `status` must
  be one of the allowed values, and a duplicate `(saga_id, step_idx)` pair is
  reported as a changeset error.
  """
  def changeset(result, attrs) do
    casted = cast(result, attrs, @castable_fields)
    with_required = validate_required(casted, @required_fields)
    with_status = validate_inclusion(with_required, :status, @valid_statuses)

    unique_constraint(with_status, [:saga_id, :step_idx])
  end
end
defmodule ArcadiaCloud.Provisioning.Step do
  @moduledoc """
  Behaviour implemented by every saga step module.

  A step is expected to be:

    * Idempotent — running it again has the same effect. Look in the context
      for outputs of an earlier attempt (`SagaState.get_output/2`) before
      doing any work.
    * Compensable — it either provides an undo or deliberately omits one
      (`compensate/1` is optional; leaving it out means "nothing to undo").
    * Self-describing — it records its result in the saga context with
      `SagaState.put_output/3`, so later steps and compensation know what to
      act on.

  Results of `execute/1`:

    * `{:ok, %SagaState{}}` — the step succeeded; the runner advances.
    * `{:error, reason}`    — the step failed; the runner compensates all
      completed steps in reverse order.
  """

  alias ArcadiaCloud.Provisioning.SagaState

  @doc "Short name identifying the step."
  @callback name() :: String.t()

  @doc "Performs the step and returns the updated state, or an error."
  @callback execute(SagaState.t()) :: {:ok, SagaState.t()} | {:error, term()}

  @doc "Undoes the step's effects. Optional."
  @callback compensate(SagaState.t()) :: :ok | {:error, term()}

  @optional_callbacks compensate: 1
end
defmodule ArcadiaCloud.Provisioning.Steps.Echo do
  @moduledoc """
  Minimal proof-of-concept step for engine smoke tests; not meant for real
  workloads.

  Takes the `:echo_value` saga input and stores it in the context under a key
  derived from the current step index, where later steps can read it.
  """

  @behaviour ArcadiaCloud.Provisioning.Step

  alias ArcadiaCloud.Provisioning.SagaState

  @impl true
  def name, do: "echo"

  @impl true
  def execute(state) do
    # A missing (nil) or false input falls back to "default".
    echoed =
      case SagaState.get_input(state, :echo_value) do
        absent when absent in [nil, false] -> "default"
        present -> present
      end

    output_key = "echoed_at_step_#{state.step_idx}"
    {:ok, SagaState.put_output(state, output_key, echoed)}
  end

  # Nothing outside the saga context was touched, so there is nothing to undo.
  @impl true
  def compensate(_state), do: :ok
end

defmodule ArcadiaCloud.Provisioning.Steps.Fail do
  @moduledoc """
  Step that fails on every call. Engine smoke tests use it to check that a
  step error triggers compensation.
  """

  @behaviour ArcadiaCloud.Provisioning.Step

  @impl true
  def name, do: "fail"

  @impl true
  def execute(_state), do: {:error, :intentional_failure}
end
defmodule ArcadiaCloud.Repo.Migrations.CreateSagas do
  use Ecto.Migration

  # Tables are created in dependency order: both later tables reference
  # saga_runs.
  def change do
    create_saga_runs()
    create_saga_step_results()
    create_cloud_provisioned()
  end

  # One row per saga execution.
  defp create_saga_runs do
    create table(:saga_runs, primary_key: false) do
      add :id, :binary_id, primary_key: true
      add :deployment_id, :binary_id
      add :kind, :string, null: false
      add :status, :string, null: false, default: "pending"
      add :step_modules, {:array, :string}, null: false
      add :current_step_idx, :integer, null: false, default: 0
      add :context, :map, null: false, default: %{}
      add :started_at, :utc_datetime
      add :completed_at, :utc_datetime
      add :cancel_requested, :boolean, null: false, default: false
      add :error, :map
      add :triggered_by, :string

      timestamps(type: :utc_datetime)
    end

    for column <- [:status, :kind, :deployment_id] do
      create index(:saga_runs, [column])
    end
  end

  # Per-step ledger; rows are deleted together with their saga.
  defp create_saga_step_results do
    saga_ref = references(:saga_runs, type: :binary_id, on_delete: :delete_all)

    create table(:saga_step_results, primary_key: false) do
      add :id, :binary_id, primary_key: true
      add :saga_id, saga_ref, null: false
      add :step_idx, :integer, null: false
      add :step_name, :string, null: false
      add :status, :string, null: false
      add :output, :map, default: %{}, null: false
      add :attempts, :integer, default: 0, null: false
      add :started_at, :utc_datetime
      add :completed_at, :utc_datetime
      add :error, :map
    end

    create unique_index(:saga_step_results, [:saga_id, :step_idx])
    create index(:saga_step_results, [:saga_id, :status])
  end

  # Desired state of every resource this system provisioned, kept next to
  # cloud_resources (the actual state) so the two can be compared for drift.
  defp create_cloud_provisioned do
    resource_ref = references(:cloud_resources, type: :binary_id, on_delete: :delete_all)
    saga_ref = references(:saga_runs, type: :binary_id, on_delete: :nilify_all)

    create table(:cloud_provisioned, primary_key: false) do
      add :id, :binary_id, primary_key: true
      add :resource_id, resource_ref, null: false
      add :spec, :map, null: false, default: %{}
      add :spec_version, :integer, null: false, default: 1
      add :provisioned_at, :utc_datetime, null: false
      add :provisioned_by, :string
      add :saga_id, saga_ref

      timestamps(type: :utc_datetime)
    end

    create unique_index(:cloud_provisioned, [:resource_id])
    create index(:cloud_provisioned, [:saga_id])
  end
end