Wire a full tenant deployment as one orchestrated, compensating saga:
mark → create droplet → wait active → register in inventory → link to
deployment → point DNS → activate. A failure anywhere rolls the whole thing
back — droplet destroyed, DNS reverted, deployment moved to cancelled.

- New lifecycle state `provisioning`; deployments created via the provision
  path enter here and only reach `active` once the saga's ActivateDeployment
  step runs.
- Four new steps: MarkDeploymentProvisioning (owns the deployment's failure
  state), LinkDeploymentResource, PointDeploymentDns, ActivateDeployment.
- Provisioning.provision_deployment/2 assembles + starts the saga.
- DeploymentController: POST /deployments with provision:true creates in
  `provisioning` and kicks the saga (202); GET /deployments/:id now returns
  the provisioning saga + per-step progress.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
253 lines
7.4 KiB
Elixir
253 lines
7.4 KiB
Elixir
defmodule ArcadiaCloud.Provisioning do
|
|
@moduledoc """
Context for saga orchestration — provisioning, suspension, offboarding,
updates, anything that wants compensation-based rollback over Oban.

Pattern: caller assembles a step list (per template or hand-rolled),
calls `start_saga/1`, the Runner Oban worker walks the steps,
persisting results and rolling back on failure.
"""
|
|
|
|
import Ecto.Query, warn: false
|
|
|
|
alias ArcadiaCloud.Repo
|
|
alias ArcadiaCloud.Provisioning.{SagaRun, SagaStepResult}
|
|
|
|
@doc """
Inserts a `saga_runs` row and enqueues the Runner job for it.

Accepts either a keyword list or a map with atom keys.

Required:
  * `:kind` — provision | suspend | offboard | update | rollback | test
  * `:step_modules` — ordered list of step module atoms or fully-qualified
    strings (each entry is stored as a string; defaults to `[]` when omitted)
  * `:inputs` — map of saga inputs, stored in `context["__inputs__"]`
    (defaults to `%{}` when omitted)

Optional:
  * `:deployment_id` — links the saga to a deployment (nil for skyai-internal)
  * `:triggered_by` — user_id or "system:<reason>"

Returns `{:ok, saga}` once both the row and the job have been written, or
`{:error, reason}` carrying the error of whichever write failed.
"""
def start_saga(opts) when is_list(opts) do
  start_saga(Map.new(opts))
end

def start_saga(%{} = attrs) do
  step_modules = Enum.map(attrs[:step_modules] || [], &to_string/1)
  inputs = attrs[:inputs] || %{}

  saga_attrs = %{
    kind: attrs[:kind],
    step_modules: step_modules,
    deployment_id: attrs[:deployment_id],
    triggered_by: attrs[:triggered_by],
    context: %{"__inputs__" => inputs}
  }

  # Both writes share one transaction so that a failed enqueue cannot leave
  # behind a saga row that no Runner job will ever pick up.
  # NOTE(review): this is only atomic when Oban is configured to use this
  # same Repo — confirm against the Oban config.
  Repo.transaction(fn ->
    with {:ok, saga} <- create_saga(saga_attrs),
         {:ok, _job} <-
           %{"saga_id" => saga.id}
           |> ArcadiaCloud.Provisioning.Runner.new()
           |> Oban.insert() do
      saga
    else
      # Repo.rollback/1 makes the transaction return {:error, reason},
      # which is the same shape callers received before.
      {:error, reason} -> Repo.rollback(reason)
    end
  end)
end
|
|
|
|
@doc """
Builds a `SagaRun` changeset from `attrs` and inserts it.

Returns the result of `Repo.insert/1` unchanged.
"""
def create_saga(attrs) do
  changeset = SagaRun.changeset(%SagaRun{}, attrs)
  Repo.insert(changeset)
end
|
|
|
|
alias ArcadiaCloud.Provisioning.CloudProvisioned
|
|
|
|
@doc """
Stores the desired state of a resource we provisioned.

`spec` is a flat map of field => expected value that drift detection later
compares against the live resource. Acts as an upsert keyed on
`resource_id`: when no row exists one is inserted, otherwise the existing
row is updated with its `spec_version` bumped by one.

Options: `:provisioned_by` (defaults to "system") and `:saga_id`.
"""
def record_provisioned(resource_id, spec, opts \\ []) do
  # Timestamp is truncated to whole seconds before it is stored.
  provisioned_at = DateTime.truncate(DateTime.utc_now(), :second)

  attrs = %{
    resource_id: resource_id,
    spec: spec,
    provisioned_at: provisioned_at,
    provisioned_by: opts[:provisioned_by] || "system",
    saga_id: opts[:saga_id]
  }

  existing = Repo.get_by(CloudProvisioned, resource_id: resource_id)

  if is_nil(existing) do
    changeset = CloudProvisioned.changeset(%CloudProvisioned{}, attrs)
    Repo.insert(changeset)
  else
    bumped = Map.put(attrs, :spec_version, existing.spec_version + 1)
    changeset = CloudProvisioned.changeset(existing, bumped)
    Repo.update(changeset)
  end
end
|
|
|
|
@doc """
Looks up the desired-state record stored for `resource_id`.

Returns whatever `Repo.get_by/2` returns for that key.
"""
def get_provisioned(resource_id),
  do: Repo.get_by(CloudProvisioned, resource_id: resource_id)
|
|
|
|
@doc """
Kicks off a snapshot saga for a single droplet.

`droplet_provider_id` is the DO numeric droplet id; it is converted to a
string before being placed in the saga inputs. Supported options are
`:snapshot_label` and `:triggered_by` (defaults to "manual").
"""
def snapshot_droplet(droplet_provider_id, opts \\ []) do
  inputs = %{
    droplet_provider_id: to_string(droplet_provider_id),
    snapshot_label: opts[:snapshot_label]
  }

  triggered_by = opts[:triggered_by] || "manual"

  start_saga(%{
    kind: "provision",
    step_modules: [ArcadiaCloud.Provisioning.Steps.CreateDropletSnapshot],
    inputs: inputs,
    triggered_by: triggered_by
  })
end
|
|
|
|
@doc """
Kicks off a droplet-provisioning saga: create → wait active → register
in inventory + record desired-state.

Required opts: `:name`, `:region`, `:size`, `:image`.
Optional opts: `:tags` and `:ssh_keys` (both default to `[]`), and
`:triggered_by` (defaults to "manual").
"""
def provision_droplet(opts) do
  steps = [
    ArcadiaCloud.Provisioning.Steps.CreateDroplet,
    ArcadiaCloud.Provisioning.Steps.WaitDropletActive,
    ArcadiaCloud.Provisioning.Steps.RegisterDroplet
  ]

  inputs = %{
    droplet_name: opts[:name],
    droplet_region: opts[:region],
    droplet_size: opts[:size],
    droplet_image: opts[:image],
    droplet_tags: opts[:tags] || [],
    droplet_ssh_keys: opts[:ssh_keys] || []
  }

  start_saga(%{
    kind: "provision",
    step_modules: steps,
    inputs: inputs,
    triggered_by: opts[:triggered_by] || "manual"
  })
end
|
|
|
|
@doc """
Kicks off a droplet-destroy saga (kind "offboard").

`droplet_provider_id` is the DO numeric droplet id; it is converted to a
string before being placed in the saga inputs. `:triggered_by` defaults to
"manual".
"""
def destroy_droplet(droplet_provider_id, opts \\ []) do
  provider_id = to_string(droplet_provider_id)
  triggered_by = opts[:triggered_by] || "manual"

  start_saga(%{
    kind: "offboard",
    step_modules: [ArcadiaCloud.Provisioning.Steps.DestroyDroplet],
    inputs: %{droplet_provider_id: provider_id},
    triggered_by: triggered_by
  })
end
|
|
|
|
alias ArcadiaCloud.Provisioning.Steps
|
|
|
|
@doc """
Assembles and starts the full deployment-provisioning saga for a
deployment that was created in the `provisioning` state.

Steps, in order: mark → create droplet → wait active → register in
inventory → link to deployment → point DNS → activate. A failure anywhere
rolls the whole thing back (droplet destroyed, DNS reverted, deployment
moved to `cancelled`).

Required opts: `:size`, `:image`.

Optional opts:
  * `:region` — falls back to the deployment's region
  * `:droplet_name` — falls back to `"dep-<slug>"`
  * `:dns_domain`
  * `:dns_record_name` — falls back to the deployment slug
  * `:triggered_by` — falls back to "manual"

The droplet is tagged with the deployment id and the tenant id.
"""
def provision_deployment(deployment, opts \\ []) do
  droplet_name = opts[:droplet_name] || "dep-#{deployment.slug}"
  record_name = opts[:dns_record_name] || deployment.slug
  tags = ["deployment:#{deployment.id}", "tenant:#{deployment.tenant_id}"]

  steps = [
    Steps.MarkDeploymentProvisioning,
    Steps.CreateDroplet,
    Steps.WaitDropletActive,
    Steps.RegisterDroplet,
    Steps.LinkDeploymentResource,
    Steps.PointDeploymentDns,
    Steps.ActivateDeployment
  ]

  start_saga(%{
    kind: "provision",
    deployment_id: deployment.id,
    triggered_by: opts[:triggered_by] || "manual",
    step_modules: steps,
    inputs: %{
      droplet_name: droplet_name,
      droplet_region: opts[:region] || deployment.region,
      droplet_size: opts[:size],
      droplet_image: opts[:image],
      droplet_tags: tags,
      dns_domain: opts[:dns_domain],
      dns_record_name: record_name
    }
  })
end
|
|
|
|
@doc """
Fetches the saga run with primary key `id` via `Repo.get/2`.
"""
def get_saga(id) do
  Repo.get(SagaRun, id)
end
|
|
@doc """
Fetches the saga run with primary key `id` via `Repo.get!/2`.
"""
def get_saga!(id) do
  Repo.get!(SagaRun, id)
end
|
|
|
|
@doc """
Applies `attrs` to an existing saga run through `SagaRun.changeset/2` and
persists the change. Returns the result of `Repo.update/1`.
"""
def update_saga(%SagaRun{} = saga, attrs) do
  changeset = SagaRun.changeset(saga, attrs)
  Repo.update(changeset)
end
|
|
|
|
@doc """
Lists saga runs, newest first (ordered by `inserted_at` descending).

Optional filters: `:status`, `:kind`, `:deployment_id`; each is applied
only when a non-nil value is given. `:limit` caps the number of rows when
present.
"""
def list_sagas(opts \\ []) do
  newest_first = order_by(SagaRun, [s], desc: s.inserted_at)

  # Filters are applied in a fixed order; nil values are skipped by maybe_filter/3.
  filtered =
    Enum.reduce([:status, :kind, :deployment_id], newest_first, fn key, query ->
      maybe_filter(query, key, opts[key])
    end)

  filtered
  |> maybe_limit(opts[:limit])
  |> Repo.all()
end
|
|
|
|
@doc """
Returns every step result recorded for `saga_id`, ordered by ascending
`step_idx`.
"""
def list_step_results(saga_id) do
  SagaStepResult
  |> where([r], r.saga_id == ^saga_id)
  |> order_by([r], asc: r.step_idx)
  |> Repo.all()
end
|
|
|
|
@doc """
Flags a saga run for cancellation by setting `cancel_requested` to `true`.

Only the flag is written here; returns the result of `Repo.update/1`.
"""
def cancel_saga(%SagaRun{} = saga) do
  changeset = SagaRun.changeset(saga, %{cancel_requested: true})
  Repo.update(changeset)
end
|
|
|
|
@doc """
Writes the result row for step `step_idx` of saga `saga_id`.

Looks the row up by `(saga_id, step_idx)`. When it is missing, a new row
is inserted with `attrs` plus those two keys; when it exists, it is updated
with `attrs` alone.
"""
def upsert_step_result(saga_id, step_idx, attrs) do
  found = Repo.get_by(SagaStepResult, saga_id: saga_id, step_idx: step_idx)

  case found do
    %SagaStepResult{} = row ->
      row
      |> SagaStepResult.changeset(attrs)
      |> Repo.update()

    nil ->
      # saga_id / step_idx are set last, so they override any same-named keys in attrs.
      insert_attrs =
        attrs
        |> Map.put(:saga_id, saga_id)
        |> Map.put(:step_idx, step_idx)

      %SagaStepResult{}
      |> SagaStepResult.changeset(insert_attrs)
      |> Repo.insert()
  end
end
|
|
|
|
# Narrows the query to rows whose `column` equals `value`.
# A nil value means "no filter" and returns the query untouched.
defp maybe_filter(query, _column, nil), do: query

defp maybe_filter(query, column, value) do
  where(query, [s], field(s, ^column) == ^value)
end
|
|
|
|
# Caps the number of rows returned; a nil limit leaves the query untouched.
defp maybe_limit(query, nil), do: query

defp maybe_limit(query, count) do
  limit(query, ^count)
end
|
|
end
|