defmodule ArcadiaCloud.Provisioning.Runner do
  @moduledoc """
  Compensation-based saga executor.

  Walks `saga.step_modules` in order. Each step gets a `%SagaState{}` and
  returns `{:ok, new_state}` or `{:error, reason}`. On failure, the runner
  flips status to `compensating` and walks already-completed steps in
  REVERSE, calling `compensate/1` on any that implement it.

  Any other return value from `execute/1`, as well as a raise, throw or exit,
  is treated as a step failure and triggers compensation. Compensation is
  best-effort: a `compensate/1` that fails (error tuple, unexpected return,
  raise, throw or exit) is logged and recorded on the step result, and the
  rollback continues with the previous step.

  Crash safety: this worker is enqueued via Oban with `max_attempts: 1`
  because retry semantics are step-level, not job-level. On crash, the saga
  row stays in whatever status it was last persisted with; a separate
  `resume_stuck_sagas` workflow (phase 2.5) handles cleanup. For phase 2
  first chunk, you'd re-enqueue manually.

  Cancellation: `Provisioning.cancel_saga/1` flips `cancel_requested=true`.
  The runner re-reads the saga row between steps, checks this flag and
  transitions to compensating instead of advancing.
  """

  use Oban.Worker, queue: :provisioning, max_attempts: 1

  require Logger

  alias ArcadiaCloud.Provisioning
  alias ArcadiaCloud.Provisioning.{SagaRun, SagaState}
  alias ArcadiaCloud.Repo

  # Dispatches on the persisted saga status so that a re-enqueued job picks up
  # where the previous run stopped. Any status other than pending / running /
  # compensating is treated as terminal and is a no-op.
  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"saga_id" => saga_id}}) do
    saga = Repo.get!(SagaRun, saga_id)

    case saga.status do
      "pending" -> kick_off(saga)
      "running" -> resume(saga)
      "compensating" -> resume_compensation(saga)
      _terminal -> :ok
    end
  end

  # ---- forward path ---------------------------------------------------------

  # Marks the saga as running and starts at step 0.
  defp kick_off(saga) do
    {:ok, saga} = Provisioning.update_saga(saga, %{status: "running", started_at: now()})
    run_step(saga, 0)
  end

  # Re-runs the step the saga was persisted at.
  defp resume(saga), do: run_step(saga, saga.current_step_idx)

  # Decides what happens at step `idx`: compensate on cancellation, complete
  # when past the last step, otherwise execute the step.
  defp run_step(saga, idx) do
    # Re-read the row: the struct we carry was loaded at job start (or returned
    # from an update), so without this a cancellation requested while the job
    # is running would not be visible here.
    saga = Repo.get!(SagaRun, saga.id)

    cond do
      saga.cancel_requested ->
        Logger.info("[saga #{saga.id}] cancellation requested; compensating from step #{idx - 1}")
        start_compensation(saga, idx - 1, "cancelled")

      idx >= length(saga.step_modules) ->
        complete(saga)

      true ->
        do_run_step(saga, idx)
    end
  end

  # Persists the step pointer, records the step as running, executes it and
  # either advances to `idx + 1` or starts compensating from `idx - 1` (the
  # failed step itself is not compensated).
  defp do_run_step(saga, idx) do
    {:ok, saga} = Provisioning.update_saga(saga, %{current_step_idx: idx})

    module = saga.step_modules |> Enum.at(idx) |> resolve_module()
    step_name = module.name()

    {:ok, _} =
      Provisioning.upsert_step_result(saga.id, idx, %{
        step_name: step_name,
        status: "running",
        started_at: now(),
        attempts: 1
      })

    state = SagaState.from(saga) |> Map.put(:step_idx, idx)

    case safe_execute(module, state) do
      {:ok, %SagaState{} = new_state} ->
        {:ok, _} =
          Provisioning.upsert_step_result(saga.id, idx, %{
            step_name: step_name,
            status: "completed",
            output: extract_output(new_state),
            completed_at: now()
          })

        {:ok, saga} = Provisioning.update_saga(saga, %{context: new_state.context})
        run_step(saga, idx + 1)

      {:error, reason} ->
        {:ok, _} =
          Provisioning.upsert_step_result(saga.id, idx, %{
            step_name: step_name,
            status: "failed",
            error: error_to_map(reason),
            completed_at: now()
          })

        Logger.warning("[saga #{saga.id}] step #{idx} (#{step_name}) failed: #{inspect(reason)}")
        start_compensation(saga, idx - 1, "step_failed:#{step_name}")
    end
  end

  # Runs `module.execute/1` and always returns `{:ok, %SagaState{}}` or
  # `{:error, reason}`. Raises, throws, exits and unexpected return shapes are
  # converted into error tuples so the caller can record the failure and
  # compensate instead of crashing with the step row left in "running".
  defp safe_execute(module, state) do
    case module.execute(state) do
      {:ok, %SagaState{}} = ok -> ok
      {:error, _reason} = error -> error
      other -> {:error, {:unexpected_return, other}}
    end
  rescue
    e -> {:error, {:exception, Exception.message(e)}}
  catch
    kind, payload -> {:error, {kind, payload}}
  end

  # ---- compensation path ----------------------------------------------------

  # Flips the saga to "compensating", records why, and starts walking steps
  # backwards from `from_idx` (may be -1, meaning nothing to undo).
  defp start_compensation(saga, from_idx, error_reason) do
    error = %{"reason" => error_reason, "compensate_from_idx" => from_idx}

    {:ok, saga} =
      Provisioning.update_saga(saga, %{
        status: "compensating",
        current_step_idx: from_idx,
        error: error
      })

    compensate_step(saga, from_idx)
  end

  # Continues a rollback from the persisted step pointer.
  defp resume_compensation(saga), do: compensate_step(saga, saga.current_step_idx)

  # Base case: walked past the first step, the rollback is finished.
  defp compensate_step(saga, idx) when idx < 0 do
    {:ok, _} = Provisioning.update_saga(saga, %{status: "rolled_back", completed_at: now()})
    :ok
  end

  # Compensates step `idx` (best-effort) and recurses to `idx - 1`. The step is
  # marked "compensated" whether or not its compensation succeeded; a failure
  # is attached to the step result as `error`.
  defp compensate_step(saga, idx) do
    {:ok, saga} = Provisioning.update_saga(saga, %{current_step_idx: idx})

    module = saga.step_modules |> Enum.at(idx) |> resolve_module()
    state = SagaState.from(saga) |> Map.put(:step_idx, idx)

    case safe_compensate(saga.id, idx, module, state) do
      :ok ->
        mark_compensated(saga.id, idx, module.name())

      {:error, reason} ->
        Logger.warning(
          "[saga #{saga.id}] compensate failed at step #{idx} (#{module.name()}): #{inspect(reason)} — continuing"
        )

        mark_compensated(saga.id, idx, module.name(), error_to_map(reason))
    end

    compensate_step(saga, idx - 1)
  end

  # Calls `module.compensate/1` when the module exports it and returns `:ok`
  # or `{:error, reason}`. Steps without `compensate/1` count as `:ok`.
  defp safe_compensate(saga_id, idx, module, state) do
    # function_exported?/3 does not load the module; ensure it is loaded first
    # so an unloaded step module is not mistaken for one without compensate/1.
    if Code.ensure_loaded?(module) and function_exported?(module, :compensate, 1) do
      result =
        try do
          module.compensate(state)
        rescue
          e ->
            Logger.error(
              "[saga #{saga_id}] compensate raised at step #{idx}: #{Exception.message(e)}"
            )

            {:error, {:exception, Exception.message(e)}}
        catch
          # Throws and exits must not abort the rollback of the remaining steps.
          kind, payload ->
            Logger.error(
              "[saga #{saga_id}] compensate #{kind} at step #{idx}: #{inspect(payload)}"
            )

            {:error, {kind, payload}}
        end

      case result do
        :ok -> :ok
        # Tolerate steps that return an ok-tuple from compensate/1.
        {:ok, _} -> :ok
        {:error, _reason} = error -> error
        other -> {:error, {:unexpected_return, other}}
      end
    else
      :ok
    end
  end

  # Records step `idx` as compensated, optionally with the compensation error.
  defp mark_compensated(saga_id, idx, name, err \\ nil) do
    attrs = %{
      step_name: name,
      status: "compensated",
      completed_at: now()
    }

    attrs = if err, do: Map.put(attrs, :error, err), else: attrs
    {:ok, _} = Provisioning.upsert_step_result(saga_id, idx, attrs)
  end

  # ---- completion -----------------------------------------------------------

  # Marks the saga as completed.
  defp complete(saga) do
    {:ok, _} = Provisioning.update_saga(saga, %{status: "completed", completed_at: now()})
    :ok
  end

  # ---- helpers --------------------------------------------------------------

  # Current UTC time truncated to whole seconds, as used for every timestamp
  # this module persists.
  defp now, do: DateTime.utc_now() |> DateTime.truncate(:second)

  # Turns a stored module name (with or without the "Elixir." prefix) into the
  # module atom. Uses to_existing_atom so stored strings can never mint new
  # atoms; raises ArgumentError if the atom does not exist.
  defp resolve_module(name) when is_binary(name) do
    String.to_existing_atom("Elixir." <> String.trim_leading(name, "Elixir."))
  end

  # Step output is the resulting context minus the "__inputs__" key.
  defp extract_output(%SagaState{context: ctx}) do
    Map.drop(ctx, ["__inputs__"])
  end

  # Converts an error reason into a string-keyed map for persistence.
  defp error_to_map({kind, payload}) when is_atom(kind),
    do: %{"kind" => to_string(kind), "payload" => inspect(payload)}

  defp error_to_map(other), do: %{"reason" => inspect(other)}
end