defmodule ArcadiaCloud.Provisioning do
  @moduledoc """
  Context for saga orchestration — provisioning, suspension, offboarding,
  updates, anything that wants compensation-based rollback over Oban.

  Pattern: the caller assembles a step list (per template or hand-rolled) and
  calls `start_saga/1`; the `ArcadiaCloud.Provisioning.Runner` Oban worker
  walks the steps, persisting results and rolling back on failure.
  """

  import Ecto.Query, warn: false

  alias ArcadiaCloud.Repo
  alias ArcadiaCloud.Provisioning.{CloudProvisioned, Runner, SagaRun, SagaStepResult, Steps}

  ## Saga lifecycle

  @doc """
  Inserts a `saga_runs` row and enqueues the Runner job for it.

  Accepts either a keyword list or a map.

  Required:

    * `:kind` — provision | suspend | offboard | update | rollback | test
    * `:step_modules` — ordered list of step module atoms or fully-qualified
      strings (atoms are converted with `to_string/1` before storage)
    * `:inputs` — map of saga inputs (stored in the saga context under
      `"__inputs__"`)

  Optional:

    * `:deployment_id` — links the saga to a deployment (nil for skyai-internal)
    * `:triggered_by` — a user_id or a `"system:"`-prefixed string

  Returns `{:ok, saga}` on success, or `{:error, reason}` where `reason` is
  whatever the failing saga insert or job insert returned.

  The saga row and the Runner job are inserted in a single transaction, so a
  failed enqueue rolls the saga row back instead of leaving an orphaned saga
  that no worker will ever pick up.
  """
  def start_saga(opts) when is_list(opts), do: opts |> Map.new() |> start_saga()

  def start_saga(%{} = attrs) do
    saga_attrs = %{
      kind: attrs[:kind],
      step_modules: Enum.map(attrs[:step_modules] || [], &to_string/1),
      deployment_id: attrs[:deployment_id],
      triggered_by: attrs[:triggered_by],
      context: %{"__inputs__" => attrs[:inputs] || %{}}
    }

    Repo.transaction(fn ->
      with {:ok, saga} <- create_saga(saga_attrs),
           {:ok, _job} <- enqueue_runner(saga) do
        saga
      else
        # Undo the saga insert; Repo.transaction/1 then returns {:error, reason}.
        {:error, reason} -> Repo.rollback(reason)
      end
    end)
  end

  @doc """
  Inserts a `saga_runs` row from `attrs` via `SagaRun.changeset/2`.

  Does NOT enqueue a Runner job — use `start_saga/1` for that.
  """
  def create_saga(attrs) do
    %SagaRun{}
    |> SagaRun.changeset(attrs)
    |> Repo.insert()
  end

  ## Desired-state records

  @doc """
  Records desired state for a resource we provisioned.

  `spec` is a flat map of field => expected value that drift detection later
  compares against the live resource.

  Upserts on `resource_id`: inserts a new record, or updates the existing one
  and increments its `spec_version` by one.

  Options:

    * `:provisioned_by` — defaults to `"system"`
    * `:saga_id` — saga that did the provisioning; stored as nil when omitted
      (including on update)
  """
  def record_provisioned(resource_id, spec, opts \\ []) do
    attrs = %{
      resource_id: resource_id,
      spec: spec,
      provisioned_at: DateTime.truncate(DateTime.utc_now(), :second),
      provisioned_by: opts[:provisioned_by] || "system",
      saga_id: opts[:saga_id]
    }

    # NOTE(review): this is read-then-write, not an atomic upsert; concurrent
    # calls for the same resource_id can race. Consider `on_conflict` if needed.
    case get_provisioned(resource_id) do
      nil ->
        %CloudProvisioned{}
        |> CloudProvisioned.changeset(attrs)
        |> Repo.insert()

      existing ->
        existing
        |> CloudProvisioned.changeset(Map.put(attrs, :spec_version, existing.spec_version + 1))
        |> Repo.update()
    end
  end

  @doc "Returns the desired-state record for `resource_id`, or nil if none exists."
  def get_provisioned(resource_id) do
    Repo.get_by(CloudProvisioned, resource_id: resource_id)
  end

  ## Saga templates

  @doc """
  Starts a snapshot saga for a droplet.

  `droplet_provider_id` is the DO numeric droplet id (converted to a string).
  Optional `:snapshot_label` and `:triggered_by` (defaults to `"manual"`).
  """
  def snapshot_droplet(droplet_provider_id, opts \\ []) do
    # NOTE(review): the documented saga kinds have no dedicated snapshot kind,
    # so "provision" is reused here.
    start_saga(%{
      kind: "provision",
      step_modules: [Steps.CreateDropletSnapshot],
      inputs: %{
        droplet_provider_id: to_string(droplet_provider_id),
        snapshot_label: opts[:snapshot_label]
      },
      triggered_by: opts[:triggered_by] || "manual"
    })
  end

  @doc """
  Starts a droplet-provisioning saga: create → wait active → register in
  inventory + record desired state.

  Required opts: `:name`, `:region`, `:size`, `:image`.
  Optional: `:tags`, `:ssh_keys` (both default to `[]`), `:triggered_by`
  (defaults to `"manual"`).
  """
  def provision_droplet(opts) do
    start_saga(%{
      kind: "provision",
      step_modules: [
        Steps.CreateDroplet,
        Steps.WaitDropletActive,
        Steps.RegisterDroplet
      ],
      inputs: %{
        droplet_name: opts[:name],
        droplet_region: opts[:region],
        droplet_size: opts[:size],
        droplet_image: opts[:image],
        droplet_tags: opts[:tags] || [],
        droplet_ssh_keys: opts[:ssh_keys] || []
      },
      triggered_by: opts[:triggered_by] || "manual"
    })
  end

  @doc """
  Starts a droplet-destroy saga.

  `droplet_provider_id` is the DO numeric droplet id (converted to a string).
  Optional `:triggered_by` (defaults to `"manual"`).
  """
  def destroy_droplet(droplet_provider_id, opts \\ []) do
    start_saga(%{
      kind: "offboard",
      step_modules: [Steps.DestroyDroplet],
      inputs: %{droplet_provider_id: to_string(droplet_provider_id)},
      triggered_by: opts[:triggered_by] || "manual"
    })
  end

  @doc """
  Assembles and starts the full deployment-provisioning choreography saga for a
  deployment that was created in the `provisioning` state.

  Steps: mark → create droplet → wait active → register in inventory → link to
  deployment → point DNS → activate. A failure anywhere rolls the whole thing
  back (droplet destroyed, DNS reverted, deployment moved to `cancelled`).

  Required opts: `:size`, `:image`.

  Optional:

    * `:region` — falls back to the deployment's region
    * `:droplet_name` — falls back to `"dep-<deployment slug>"`
    * `:tags` — extra droplet tags, appended after the `deployment:` and
      `tenant:` tags (defaults to `[]`)
    * `:ssh_keys` — droplet SSH keys (defaults to `[]`, same as
      `provision_droplet/1`)
    * `:dns_domain`
    * `:dns_record_name` — falls back to the deployment slug
    * `:triggered_by` — defaults to `"manual"`
  """
  def provision_deployment(deployment, opts \\ []) do
    inputs = %{
      droplet_name: opts[:droplet_name] || "dep-#{deployment.slug}",
      droplet_region: opts[:region] || deployment.region,
      droplet_size: opts[:size],
      droplet_image: opts[:image],
      droplet_tags:
        ["deployment:#{deployment.id}", "tenant:#{deployment.tenant_id}"] ++
          (opts[:tags] || []),
      # Same input CreateDroplet receives from provision_droplet/1.
      droplet_ssh_keys: opts[:ssh_keys] || [],
      dns_domain: opts[:dns_domain],
      dns_record_name: opts[:dns_record_name] || deployment.slug
    }

    start_saga(%{
      kind: "provision",
      deployment_id: deployment.id,
      triggered_by: opts[:triggered_by] || "manual",
      step_modules: [
        Steps.MarkDeploymentProvisioning,
        Steps.CreateDroplet,
        Steps.WaitDropletActive,
        Steps.RegisterDroplet,
        Steps.LinkDeploymentResource,
        Steps.PointDeploymentDns,
        Steps.ActivateDeployment
      ],
      inputs: inputs
    })
  end

  ## Queries & updates

  @doc "Fetches a saga by id, returning nil when it does not exist."
  def get_saga(id), do: Repo.get(SagaRun, id)

  @doc "Fetches a saga by id, raising `Ecto.NoResultsError` when it does not exist."
  def get_saga!(id), do: Repo.get!(SagaRun, id)

  @doc "Applies `attrs` to `saga` through `SagaRun.changeset/2` and persists it."
  def update_saga(%SagaRun{} = saga, attrs) do
    saga
    |> SagaRun.changeset(attrs)
    |> Repo.update()
  end

  @doc """
  Lists sagas, newest `inserted_at` first.

  Options (each filter is skipped when nil):

    * `:status`, `:kind`, `:deployment_id` — equality filters
    * `:limit` — maximum number of rows
  """
  def list_sagas(opts \\ []) do
    from(s in SagaRun, order_by: [desc: s.inserted_at])
    |> maybe_filter(:status, opts[:status])
    |> maybe_filter(:kind, opts[:kind])
    |> maybe_filter(:deployment_id, opts[:deployment_id])
    |> maybe_limit(opts[:limit])
    |> Repo.all()
  end

  @doc "Returns all step results for `saga_id`, ordered by ascending `step_idx`."
  def list_step_results(saga_id) do
    from(r in SagaStepResult,
      where: r.saga_id == ^saga_id,
      order_by: [asc: r.step_idx]
    )
    |> Repo.all()
  end

  @doc """
  Flags `saga` for cancellation by setting `cancel_requested: true`.

  This only persists the flag; presumably the Runner observes it between
  steps — confirm against the Runner implementation.
  """
  def cancel_saga(%SagaRun{} = saga) do
    saga
    |> SagaRun.changeset(%{cancel_requested: true})
    |> Repo.update()
  end

  @doc """
  Inserts or updates the result row identified by `{saga_id, step_idx}`.

  `attrs` should use atom keys: on insert they are merged with the atom-keyed
  `:saga_id` / `:step_idx`, and Ecto rejects maps with mixed key types.
  """
  def upsert_step_result(saga_id, step_idx, attrs) do
    case Repo.get_by(SagaStepResult, saga_id: saga_id, step_idx: step_idx) do
      nil ->
        %SagaStepResult{}
        |> SagaStepResult.changeset(Map.merge(attrs, %{saga_id: saga_id, step_idx: step_idx}))
        |> Repo.insert()

      existing ->
        existing
        |> SagaStepResult.changeset(attrs)
        |> Repo.update()
    end
  end

  ## Private helpers

  # Builds and inserts the Runner job that will walk `saga`'s steps.
  defp enqueue_runner(%SagaRun{id: saga_id}) do
    %{"saga_id" => saga_id}
    |> Runner.new()
    |> Oban.insert()
  end

  # Adds `field == value` to the query unless `value` is nil.
  defp maybe_filter(q, _field, nil), do: q
  defp maybe_filter(q, field, value), do: from(s in q, where: field(s, ^field) == ^value)

  # Adds a LIMIT to the query unless `n` is nil.
  defp maybe_limit(q, nil), do: q
  defp maybe_limit(q, n), do: from(s in q, limit: ^n)
end