Phase 2: droplet create/destroy saga

The most load-bearing write workflow — droplet provisioning is the spine
of phase 4a deployment onboarding.

DigitalOcean.Client: create_droplet, get_droplet, list_droplets_by_tag,
destroy_droplet. list_paginated/3 now threads caller-supplied params
(opts[:params]) through pagination so tag-filtered listing works.

Four droplet saga steps:
- CreateDroplet — POST a droplet, tagged arcadia-saga-<saga8> +
  managed-by-arcadia-cloud. Idempotency: re-run checks context for
  droplet_id, then queries DO by the saga tag, so a crash between POST
  and context-save adopts the existing droplet. compensate destroys it.
- WaitDropletActive — polls get_droplet until status "active" (96x5s);
  records the public IP. No compensation (waiting has no side effect).
- RegisterDroplet — fetches the droplet, upserts it into cloud_resources
  (inventory consistent immediately, not at next 15-min sync) and writes
  cloud_provisioned desired-state {size_slug, region, image}. compensate
  removes the DB rows (the droplet itself is destroyed by CreateDroplet's
  compensate).
- DestroyDroplet — DELETE the droplet + mark its cloud_resources row
  deleted. Terminal/irreversible: compensate is a logged noop, per the
  saga design destroy-class steps don't roll back.

Provisioning helpers:
- provision_droplet/1 — [CreateDroplet, WaitDropletActive, RegisterDroplet]
- destroy_droplet/2   — [DestroyDroplet]

Live smoke verified end-to-end (full create + destroy on a real
s-1vcpu-512mb-10gb droplet in syd1):
- provision saga completed: droplet 572017320 created, reached active
  with public IP, registered into cloud_resources (status=active) +
  cloud_provisioned (spec recorded).
- destroy saga completed: cloud_resources row marked deleted; droplet
  confirmed 404 on DO afterward. Account back to its original 5
  droplets, zero leftover, ~1 cent total cost.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-20 13:31:47 +10:00
parent e3bcd3fc77
commit b9fc4f9cf3
6 changed files with 427 additions and 4 deletions

View File

@@ -32,6 +32,38 @@ defmodule ArcadiaCloud.DigitalOcean.Client do
list_paginated("/droplets/#{droplet_id}/snapshots", "snapshots", opts)
end
# ---- droplet lifecycle ----------------------------------------------------
@doc """
Create a droplet. `attrs` must include name + region + size + image;
optional ssh_keys, tags, backups, ipv6, user_data, vpc_uuid.
Returns {:ok, droplet} — droplet is created async, poll get_droplet/2
until status is "active".
"""
def create_droplet(attrs, opts \\ []) do
case request(:post, "/droplets", body: attrs, purpose: opts[:purpose] || "provisioning") do
{:ok, %{"droplet" => droplet}} -> {:ok, droplet}
other -> other
end
end
def get_droplet(droplet_id, opts \\ []) do
case request(:get, "/droplets/#{droplet_id}", purpose: opts[:purpose] || "provisioning") do
{:ok, %{"droplet" => droplet}} -> {:ok, droplet}
other -> other
end
end
@doc "Lists droplets filtered by a tag — used for saga idempotency recovery."
def list_droplets_by_tag(tag, opts \\ []) do
list_paginated("/droplets", "droplets",
Keyword.merge(opts, params: [tag_name: tag], purpose: opts[:purpose] || "provisioning"))
end
def destroy_droplet(droplet_id, opts \\ []) do
request(:delete, "/droplets/#{droplet_id}", purpose: opts[:purpose] || "provisioning")
end
# ---- write actions --------------------------------------------------------
@doc """
@@ -175,11 +207,12 @@ defmodule ArcadiaCloud.DigitalOcean.Client do
defp list_paginated(path, root_key, opts) do
purpose = opts[:purpose] || "sync_full"
do_paginate(path, root_key, purpose, [], 1)
extra_params = opts[:params] || []
do_paginate(path, root_key, purpose, extra_params, [], 1)
end
defp do_paginate(path, root_key, purpose, acc, page) do
params = [page: page, per_page: @page_size]
defp do_paginate(path, root_key, purpose, extra_params, acc, page) do
params = [page: page, per_page: @page_size] ++ extra_params
case request(:get, path, params: params, purpose: purpose) do
{:ok, %{} = body} ->
@@ -187,7 +220,7 @@ defmodule ArcadiaCloud.DigitalOcean.Client do
new_acc = acc ++ items
if has_next?(body) do
do_paginate(path, root_key, purpose, new_acc, page + 1)
do_paginate(path, root_key, purpose, extra_params, new_acc, page + 1)
else
{:ok, new_acc}
end

View File

@@ -108,6 +108,46 @@ defmodule ArcadiaCloud.Provisioning do
})
end
@doc """
Starts a droplet-provisioning saga: create → wait active → register
in inventory + record desired-state.
Required opts: :name, :region, :size, :image.
Optional: :tags, :ssh_keys, :triggered_by.
"""
def provision_droplet(opts) do
start_saga(%{
kind: "provision",
step_modules: [
ArcadiaCloud.Provisioning.Steps.CreateDroplet,
ArcadiaCloud.Provisioning.Steps.WaitDropletActive,
ArcadiaCloud.Provisioning.Steps.RegisterDroplet
],
inputs: %{
droplet_name: opts[:name],
droplet_region: opts[:region],
droplet_size: opts[:size],
droplet_image: opts[:image],
droplet_tags: opts[:tags] || [],
droplet_ssh_keys: opts[:ssh_keys] || []
},
triggered_by: opts[:triggered_by] || "manual"
})
end
@doc """
Starts a droplet-destroy saga. `droplet_provider_id` is the DO numeric
droplet id (string).
"""
def destroy_droplet(droplet_provider_id, opts \\ []) do
start_saga(%{
kind: "offboard",
step_modules: [ArcadiaCloud.Provisioning.Steps.DestroyDroplet],
inputs: %{droplet_provider_id: to_string(droplet_provider_id)},
triggered_by: opts[:triggered_by] || "manual"
})
end
def get_saga(id), do: Repo.get(SagaRun, id)
def get_saga!(id), do: Repo.get!(SagaRun, id)

View File

@@ -0,0 +1,120 @@
defmodule ArcadiaCloud.Provisioning.Steps.CreateDroplet do
@moduledoc """
Creates a DO droplet.
Saga inputs:
droplet_name — required
droplet_region — required (e.g. "syd1")
droplet_size — required (e.g. "s-1vcpu-1gb")
droplet_image — required (e.g. "ubuntu-24-04-x64")
droplet_tags — optional list of extra tags
droplet_ssh_keys — optional list of SSH key ids/fingerprints
Idempotency: every droplet is tagged `arcadia-saga-<saga8>`. On re-run
the step checks context for `droplet_id`, then queries DO for a droplet
carrying the saga tag — so a crash between POST and context-save adopts
the existing droplet instead of creating a second.
Compensation: destroys the droplet.
"""
@behaviour ArcadiaCloud.Provisioning.Step
alias ArcadiaCloud.DigitalOcean.Client
alias ArcadiaCloud.Provisioning.SagaState
@impl true
def name, do: "create_droplet"
@impl true
def execute(state) do
with {:ok, attrs} <- read_inputs(state) do
saga_tag = saga_tag(state)
cond do
SagaState.get_output(state, :droplet_id) ->
{:ok, state}
true ->
case find_by_saga_tag(saga_tag) do
{:ok, %{"id" => id}} ->
{:ok, record(state, id, attrs.name)}
:not_found ->
create(state, attrs, saga_tag)
{:error, reason} ->
{:error, reason}
end
end
end
end
@impl true
def compensate(state) do
case SagaState.get_output(state, :droplet_id) do
nil ->
:ok
droplet_id ->
case Client.destroy_droplet(droplet_id) do
{:ok, _} -> :ok
{:error, {:http, 404, _}} -> :ok
{:error, reason} -> {:error, reason}
end
end
end
# ---- internals ------------------------------------------------------------
defp create(state, attrs, saga_tag) do
body = %{
name: attrs.name,
region: attrs.region,
size: attrs.size,
image: attrs.image,
tags: [saga_tag, "managed-by-arcadia-cloud" | attrs.tags],
ssh_keys: attrs.ssh_keys
}
case Client.create_droplet(body) do
{:ok, %{"id" => id}} -> {:ok, record(state, id, attrs.name)}
{:error, reason} -> {:error, reason}
end
end
defp record(state, droplet_id, name) do
state
|> SagaState.put_output(:droplet_id, droplet_id)
|> SagaState.put_output(:droplet_name, name)
end
defp find_by_saga_tag(tag) do
case Client.list_droplets_by_tag(tag) do
{:ok, [droplet | _]} -> {:ok, droplet}
{:ok, []} -> :not_found
{:error, reason} -> {:error, reason}
end
end
defp saga_tag(state) do
"arcadia-saga-" <> (state.saga_id |> to_string() |> String.slice(0, 8))
end
defp read_inputs(state) do
attrs = %{
name: SagaState.get_input(state, :droplet_name),
region: SagaState.get_input(state, :droplet_region),
size: SagaState.get_input(state, :droplet_size),
image: SagaState.get_input(state, :droplet_image),
tags: SagaState.get_input(state, :droplet_tags) || [],
ssh_keys: SagaState.get_input(state, :droplet_ssh_keys) || []
}
if attrs.name && attrs.region && attrs.size && attrs.image do
{:ok, attrs}
else
{:error, :missing_droplet_inputs}
end
end
end

View File

@@ -0,0 +1,68 @@
defmodule ArcadiaCloud.Provisioning.Steps.DestroyDroplet do
@moduledoc """
Destroys a droplet and marks its cloud_resources row deleted.
Saga inputs:
droplet_provider_id — required; the DO numeric droplet id
This is a terminal, irreversible action. `compensate/1` is therefore a
noop with a log line — a destroyed droplet cannot be un-destroyed.
Per the saga design, destroy-class steps don't roll back; a saga that
needs to fail-safe should sequence DestroyDroplet last.
Idempotent: a droplet already gone (404) is treated as success.
"""
@behaviour ArcadiaCloud.Provisioning.Step
import Ecto.Query
require Logger
alias ArcadiaCloud.Cloud.CloudResource
alias ArcadiaCloud.DigitalOcean.Client
alias ArcadiaCloud.Provisioning.SagaState
alias ArcadiaCloud.Repo
@impl true
def name, do: "destroy_droplet"
@impl true
def execute(state) do
droplet_id = SagaState.get_input(state, :droplet_provider_id)
cond do
is_nil(droplet_id) ->
{:error, :missing_droplet_provider_id}
true ->
case Client.destroy_droplet(droplet_id) do
{:ok, _} ->
mark_resource_deleted(droplet_id)
{:ok, SagaState.put_output(state, :destroyed_droplet_id, droplet_id)}
{:error, {:http, 404, _}} ->
mark_resource_deleted(droplet_id)
{:ok, SagaState.put_output(state, :destroyed_droplet_id, droplet_id)}
{:error, reason} ->
{:error, reason}
end
end
end
@impl true
def compensate(state) do
Logger.warning(
"[saga #{state.saga_id}] DestroyDroplet cannot be compensated — droplet destruction is terminal"
)
:ok
end
defp mark_resource_deleted(provider_id) do
from(r in CloudResource,
where: r.provider == "digitalocean" and r.kind == "droplet" and r.provider_id == ^to_string(provider_id)
)
|> Repo.update_all(set: [deleted_at: DateTime.utc_now() |> DateTime.truncate(:second)])
end
end

View File

@@ -0,0 +1,103 @@
defmodule ArcadiaCloud.Provisioning.Steps.RegisterDroplet do
@moduledoc """
Makes arcadia-cloud's own DB reflect a freshly-provisioned droplet
without waiting for the next 15-minute sync:
1. fetch the droplet from DO
2. upsert it into cloud_resources (inventory immediately consistent)
3. record desired-state in cloud_provisioned (so drift detection has
a baseline)
Compensation: marks the cloud_resources row deleted and removes the
cloud_provisioned row. The droplet itself is destroyed by the
CreateDroplet step's compensate — this step only undoes DB rows.
"""
@behaviour ArcadiaCloud.Provisioning.Step
import Ecto.Query
alias ArcadiaCloud.{Cloud, Provisioning, Repo}
alias ArcadiaCloud.Cloud.CloudResource
alias ArcadiaCloud.DigitalOcean.Client
alias ArcadiaCloud.Provisioning.{CloudProvisioned, SagaState}
@impl true
def name, do: "register_droplet"
@impl true
def execute(state) do
case SagaState.get_output(state, :droplet_id) do
nil ->
{:error, :no_droplet_id_in_context}
droplet_id ->
with {:ok, droplet} <- Client.get_droplet(droplet_id),
{:ok, resource} <- Cloud.upsert_resource(normalize(droplet)),
{:ok, _prov} <- record_provisioned(state, droplet, resource) do
{:ok, SagaState.put_output(state, :cloud_resource_id, resource.id)}
end
end
end
@impl true
def compensate(state) do
case SagaState.get_output(state, :cloud_resource_id) do
nil ->
:ok
resource_id ->
Repo.delete_all(from(p in CloudProvisioned, where: p.resource_id == ^resource_id))
from(r in CloudResource, where: r.id == ^resource_id)
|> Repo.update_all(set: [deleted_at: DateTime.utc_now() |> DateTime.truncate(:second)])
:ok
end
end
# ---- internals ------------------------------------------------------------
defp record_provisioned(state, droplet, resource) do
Provisioning.record_provisioned(
resource.id,
%{
"size_slug" => droplet["size_slug"],
"region" => get_in(droplet, ["region", "slug"]),
"image" => get_in(droplet, ["image", "slug"])
},
provisioned_by: "saga:#{state.saga_id}",
saga_id: state.saga_id
)
end
defp normalize(d) do
now = DateTime.utc_now() |> DateTime.truncate(:second)
%{
provider: "digitalocean",
provider_id: to_string(d["id"]),
kind: "droplet",
name: d["name"],
region: get_in(d, ["region", "slug"]),
status: normalize_status(d["status"]),
size_slug: d["size_slug"],
tags: d["tags"] || [],
attrs: %{
memory_mb: d["memory"],
vcpus: d["vcpus"],
disk_gb: d["disk"],
networks: d["networks"],
do_created_at: d["created_at"]
},
first_seen_at: now,
last_seen_at: now
}
end
defp normalize_status("active"), do: "active"
defp normalize_status("off"), do: "off"
defp normalize_status("new"), do: "provisioning"
defp normalize_status(other) when is_binary(other), do: other
defp normalize_status(_), do: "unknown"
end

View File

@@ -0,0 +1,59 @@
defmodule ArcadiaCloud.Provisioning.Steps.WaitDropletActive do
@moduledoc """
Polls a droplet (created by a prior CreateDroplet step) until its
status is "active". Reads `droplet_id` from saga context.
No compensation — waiting has no side effect to undo. If the saga
rolls back, the prior CreateDroplet step's compensate destroys the
droplet regardless of whether it ever reached active.
"""
@behaviour ArcadiaCloud.Provisioning.Step
alias ArcadiaCloud.DigitalOcean.Client
alias ArcadiaCloud.Provisioning.SagaState
@poll_interval_ms 5_000
@poll_max_attempts 96
@impl true
def name, do: "wait_droplet_active"
@impl true
def execute(state) do
case SagaState.get_output(state, :droplet_id) do
nil -> {:error, :no_droplet_id_in_context}
droplet_id -> poll(state, droplet_id, 1)
end
end
defp poll(_state, _droplet_id, attempt) when attempt > @poll_max_attempts do
{:error, :droplet_active_timeout}
end
defp poll(state, droplet_id, attempt) do
case Client.get_droplet(droplet_id) do
{:ok, %{"status" => "active"} = droplet} ->
public_ip = extract_public_ip(droplet)
{:ok, SagaState.put_output(state, :droplet_public_ip, public_ip)}
{:ok, %{"status" => status}} when status in ["new", "off"] ->
Process.sleep(@poll_interval_ms)
poll(state, droplet_id, attempt + 1)
{:ok, %{"status" => other}} ->
{:error, {:unexpected_droplet_status, other}}
{:error, reason} ->
{:error, reason}
end
end
defp extract_public_ip(droplet) do
droplet
|> get_in(["networks", "v4"])
|> List.wrap()
|> Enum.find(%{}, &(&1["type"] == "public"))
|> Map.get("ip_address")
end
end