diff --git a/lib/arcadia_cloud/digital_ocean/client.ex b/lib/arcadia_cloud/digital_ocean/client.ex
index 2a7bfc0..04a1d07 100644
--- a/lib/arcadia_cloud/digital_ocean/client.ex
+++ b/lib/arcadia_cloud/digital_ocean/client.ex
@@ -32,6 +32,54 @@ defmodule ArcadiaCloud.DigitalOcean.Client do
     list_paginated("/droplets/#{droplet_id}/snapshots", "snapshots", opts)
   end
 
+  # ---- droplet lifecycle ----------------------------------------------------
+
+  @doc """
+  Creates a droplet from `attrs`, which must include name + region + size +
+  image and may include ssh_keys, tags, backups, ipv6, user_data, vpc_uuid.
+
+  Returns `{:ok, droplet}` when the response carries a "droplet" object; any
+  other result of the request is returned untouched. Creation is async, so
+  poll `get_droplet/2` until the status is "active".
+  """
+  def create_droplet(attrs, opts \\ []) do
+    purpose = opts[:purpose] || "provisioning"
+    response = request(:post, "/droplets", body: attrs, purpose: purpose)
+
+    with {:ok, %{"droplet" => droplet}} <- response do
+      {:ok, droplet}
+    end
+  end
+
+  @doc """
+  Fetches a single droplet by id. Returns `{:ok, droplet}` when the response
+  carries a "droplet" object; any other result is returned untouched.
+  """
+  def get_droplet(droplet_id, opts \\ []) do
+    purpose = opts[:purpose] || "provisioning"
+    response = request(:get, "/droplets/#{droplet_id}", purpose: purpose)
+
+    with {:ok, %{"droplet" => droplet}} <- response do
+      {:ok, droplet}
+    end
+  end
+
+  @doc "Lists droplets filtered by a tag — used for saga idempotency recovery."
+  def list_droplets_by_tag(tag, opts \\ []) do
+    purpose = opts[:purpose] || "provisioning"
+    page_opts = Keyword.merge(opts, params: [tag_name: tag], purpose: purpose)
+
+    list_paginated("/droplets", "droplets", page_opts)
+  end
+
+  @doc """
+  Issues the DELETE for a droplet and returns the request result as-is.
+  """
+  def destroy_droplet(droplet_id, opts \\ []) do
+    purpose = opts[:purpose] || "provisioning"
+    request(:delete, "/droplets/#{droplet_id}", purpose: purpose)
+  end
+
   # ---- write actions --------------------------------------------------------
 
   @doc """
@@ -175,11 +223,12 @@ defmodule ArcadiaCloud.DigitalOcean.Client do
 
   defp list_paginated(path, root_key, opts) do
     purpose = opts[:purpose] || "sync_full"
-    do_paginate(path, root_key, purpose, [], 1)
+    extra_params = opts[:params] || []
+    do_paginate(path, root_key, purpose, extra_params, [], 1)
   end
 
-  defp do_paginate(path, root_key, purpose, acc, page) do
-    params = [page: page, per_page: @page_size]
+  defp do_paginate(path, root_key, purpose, extra_params, acc, page) do
+    params = [page: page, per_page: @page_size] ++ extra_params
 
     case request(:get, path, params: params, purpose: purpose) do
       {:ok, %{} = body} ->
@@ -187,7 +236,7 @@ defmodule ArcadiaCloud.DigitalOcean.Client do
         new_acc = acc ++ items
 
         if has_next?(body) do
-          do_paginate(path, root_key, purpose, new_acc, page + 1)
+          do_paginate(path, root_key, purpose, extra_params, new_acc, page + 1)
         else
           {:ok, new_acc}
         end
diff --git a/lib/arcadia_cloud/provisioning.ex b/lib/arcadia_cloud/provisioning.ex
index 8c53ca0..9d31b8a 100644
--- a/lib/arcadia_cloud/provisioning.ex
+++ b/lib/arcadia_cloud/provisioning.ex
@@ -108,6 +108,46 @@ defmodule ArcadiaCloud.Provisioning do
     })
   end
 
+  @doc """
+  Starts a droplet-provisioning saga: create → wait active → register
+  in inventory + record desired-state.
+
+  Required opts: :name, :region, :size, :image.
+  Optional: :tags, :ssh_keys, :triggered_by.
+  """
+  def provision_droplet(opts) do
+    start_saga(%{
+      kind: "provision",
+      step_modules: [
+        ArcadiaCloud.Provisioning.Steps.CreateDroplet,
+        ArcadiaCloud.Provisioning.Steps.WaitDropletActive,
+        ArcadiaCloud.Provisioning.Steps.RegisterDroplet
+      ],
+      inputs: %{
+        droplet_name: opts[:name],
+        droplet_region: opts[:region],
+        droplet_size: opts[:size],
+        droplet_image: opts[:image],
+        droplet_tags: opts[:tags] || [],
+        droplet_ssh_keys: opts[:ssh_keys] || []
+      },
+      triggered_by: opts[:triggered_by] || "manual"
+    })
+  end
+
+  @doc """
+  Starts a droplet-destroy saga. `droplet_provider_id` is the DO numeric
+  droplet id; it is stored as a string saga input.
+  """
+  def destroy_droplet(droplet_provider_id, opts \\ []) do
+    start_saga(%{
+      kind: "offboard",
+      step_modules: [ArcadiaCloud.Provisioning.Steps.DestroyDroplet],
+      inputs: %{droplet_provider_id: to_string(droplet_provider_id)},
+      triggered_by: opts[:triggered_by] || "manual"
+    })
+  end
+
   def get_saga(id), do: Repo.get(SagaRun, id)
   def get_saga!(id), do: Repo.get!(SagaRun, id)
 
diff --git a/lib/arcadia_cloud/provisioning/steps/create_droplet.ex b/lib/arcadia_cloud/provisioning/steps/create_droplet.ex
new file mode 100644
index 0000000..098ff2f
--- /dev/null
+++ b/lib/arcadia_cloud/provisioning/steps/create_droplet.ex
@@ -0,0 +1,134 @@
+defmodule ArcadiaCloud.Provisioning.Steps.CreateDroplet do
+  @moduledoc """
+  Creates a DO droplet.
+
+  Saga inputs:
+    droplet_name     — required
+    droplet_region   — required (e.g. "syd1")
+    droplet_size     — required (e.g. "s-1vcpu-1gb")
+    droplet_image    — required (e.g. "ubuntu-24-04-x64")
+    droplet_tags     — optional list of extra tags
+    droplet_ssh_keys — optional list of SSH key ids/fingerprints
+
+  Idempotency: every droplet is tagged `arcadia-saga-<saga_id>`. On re-run
+  the step checks context for `droplet_id`, then queries DO for a droplet
+  carrying the saga tag — so a crash between POST and context-save adopts
+  the existing droplet instead of creating a second.
+
+  Compensation: destroys the droplet recorded in context. If no id was
+  recorded, the droplet carrying the saga tag (if any) is destroyed instead.
+  """
+
+  @behaviour ArcadiaCloud.Provisioning.Step
+
+  alias ArcadiaCloud.DigitalOcean.Client
+  alias ArcadiaCloud.Provisioning.SagaState
+
+  @impl true
+  def name, do: "create_droplet"
+
+  # A recorded `droplet_id` short-circuits re-runs and returns the state
+  # untouched; otherwise a droplet already carrying the saga tag is adopted
+  # before a new one is created. Missing inputs yield `{:error, _}`.
+  @impl true
+  def execute(state) do
+    with {:ok, attrs} <- read_inputs(state) do
+      if SagaState.get_output(state, :droplet_id) do
+        {:ok, state}
+      else
+        adopt_or_create(state, attrs, saga_tag(state))
+      end
+    end
+  end
+
+  # Destroys the droplet this saga created. When no `droplet_id` was recorded
+  # (crash between POST and context-save) the saga tag is used to find the
+  # orphan, so a rollback cannot leave a running droplet behind.
+  @impl true
+  def compensate(state) do
+    case SagaState.get_output(state, :droplet_id) do
+      nil -> destroy_tagged(saga_tag(state))
+      droplet_id -> destroy(droplet_id)
+    end
+  end
+
+  # ---- internals ------------------------------------------------------------
+
+  defp adopt_or_create(state, attrs, saga_tag) do
+    case find_by_saga_tag(saga_tag) do
+      {:ok, %{"id" => id}} -> {:ok, record(state, id, attrs.name)}
+      :not_found -> create(state, attrs, saga_tag)
+      {:error, reason} -> {:error, reason}
+    end
+  end
+
+  defp destroy_tagged(saga_tag) do
+    case find_by_saga_tag(saga_tag) do
+      {:ok, %{"id" => id}} -> destroy(id)
+      :not_found -> :ok
+      {:error, reason} -> {:error, reason}
+    end
+  end
+
+  # A 404 means the droplet is already gone, which is the desired end state.
+  defp destroy(droplet_id) do
+    case Client.destroy_droplet(droplet_id) do
+      {:ok, _} -> :ok
+      {:error, {:http, 404, _}} -> :ok
+      {:error, reason} -> {:error, reason}
+    end
+  end
+
+  defp create(state, attrs, saga_tag) do
+    body = %{
+      name: attrs.name,
+      region: attrs.region,
+      size: attrs.size,
+      image: attrs.image,
+      tags: [saga_tag, "managed-by-arcadia-cloud" | attrs.tags],
+      ssh_keys: attrs.ssh_keys
+    }
+
+    case Client.create_droplet(body) do
+      {:ok, %{"id" => id}} -> {:ok, record(state, id, attrs.name)}
+      {:error, reason} -> {:error, reason}
+    end
+  end
+
+  defp record(state, droplet_id, name) do
+    state
+    |> SagaState.put_output(:droplet_id, droplet_id)
+    |> SagaState.put_output(:droplet_name, name)
+  end
+
+  defp find_by_saga_tag(tag) do
+    case Client.list_droplets_by_tag(tag) do
+      {:ok, [droplet | _]} -> {:ok, droplet}
+      {:ok, []} -> :not_found
+      {:error, reason} -> {:error, reason}
+    end
+  end
+
+  # The full saga id is used: a truncated id lets two sagas share a tag, and
+  # the tag decides which droplet gets adopted or destroyed.
+  # NOTE(review): confirm the saga id only contains characters DO accepts in
+  # tag names.
+  defp saga_tag(state), do: "arcadia-saga-#{state.saga_id}"
+
+  defp read_inputs(state) do
+    attrs = %{
+      name: SagaState.get_input(state, :droplet_name),
+      region: SagaState.get_input(state, :droplet_region),
+      size: SagaState.get_input(state, :droplet_size),
+      image: SagaState.get_input(state, :droplet_image),
+      tags: SagaState.get_input(state, :droplet_tags) || [],
+      ssh_keys: SagaState.get_input(state, :droplet_ssh_keys) || []
+    }
+
+    if attrs.name && attrs.region && attrs.size && attrs.image do
+      {:ok, attrs}
+    else
+      {:error, :missing_droplet_inputs}
+    end
+  end
+end
diff --git a/lib/arcadia_cloud/provisioning/steps/destroy_droplet.ex b/lib/arcadia_cloud/provisioning/steps/destroy_droplet.ex
new file mode 100644
index 0000000..27cd798
--- /dev/null
+++ b/lib/arcadia_cloud/provisioning/steps/destroy_droplet.ex
@@ -0,0 +1,68 @@
+defmodule ArcadiaCloud.Provisioning.Steps.DestroyDroplet do
+  @moduledoc """
+  Destroys a droplet and marks its cloud_resources row deleted.
+
+  Saga inputs:
+    droplet_provider_id — required; the DO numeric droplet id
+
+  This is a terminal, irreversible action. `compensate/1` is therefore a
+  noop with a log line — a destroyed droplet cannot be un-destroyed.
+  Per the saga design, destroy-class steps don't roll back; a saga that
+  needs to fail-safe should sequence DestroyDroplet last.
+
+  Idempotent: a droplet already gone (404) is treated as success.
+  """
+
+  @behaviour ArcadiaCloud.Provisioning.Step
+
+  import Ecto.Query
+  require Logger
+
+  alias ArcadiaCloud.Cloud.CloudResource
+  alias ArcadiaCloud.DigitalOcean.Client
+  alias ArcadiaCloud.Provisioning.SagaState
+  alias ArcadiaCloud.Repo
+
+  @impl true
+  def name, do: "destroy_droplet"
+
+  @impl true
+  def execute(state) do
+    case SagaState.get_input(state, :droplet_provider_id) do
+      nil -> {:error, :missing_droplet_provider_id}
+      droplet_id -> destroy(state, droplet_id)
+    end
+  end
+
+  @impl true
+  def compensate(state) do
+    Logger.warning(
+      "[saga #{state.saga_id}] DestroyDroplet cannot be compensated — droplet destruction is terminal"
+    )
+
+    :ok
+  end
+
+  # A successful DELETE and a 404 (droplet already gone) both count as
+  # destroyed; any other error is returned to the saga unchanged.
+  defp destroy(state, droplet_id) do
+    case Client.destroy_droplet(droplet_id) do
+      {:ok, _} -> finish(state, droplet_id)
+      {:error, {:http, 404, _}} -> finish(state, droplet_id)
+      {:error, reason} -> {:error, reason}
+    end
+  end
+
+  # Stamps `deleted_at` on the inventory row(s) and records the destroyed id.
+  defp finish(state, droplet_id) do
+    mark_resource_deleted(droplet_id)
+    {:ok, SagaState.put_output(state, :destroyed_droplet_id, droplet_id)}
+  end
+
+  defp mark_resource_deleted(provider_id) do
+    from(r in CloudResource,
+      where: r.provider == "digitalocean" and r.kind == "droplet" and r.provider_id == ^to_string(provider_id)
+    )
+    |> Repo.update_all(set: [deleted_at: DateTime.utc_now() |> DateTime.truncate(:second)])
+  end
+end
diff --git a/lib/arcadia_cloud/provisioning/steps/register_droplet.ex b/lib/arcadia_cloud/provisioning/steps/register_droplet.ex
new file mode 100644
index 0000000..8bbff37
--- /dev/null
+++ b/lib/arcadia_cloud/provisioning/steps/register_droplet.ex
@@ -0,0 +1,111 @@
+defmodule ArcadiaCloud.Provisioning.Steps.RegisterDroplet do
+  @moduledoc """
+  Makes arcadia-cloud's own DB reflect a freshly-provisioned droplet
+  without waiting for the next 15-minute sync:
+
+    1. fetch the droplet from DO
+    2. upsert it into cloud_resources (inventory immediately consistent)
+    3. record desired-state in cloud_provisioned (so drift detection has
+       a baseline)
+
+  Compensation: marks the cloud_resources row deleted and removes the
+  cloud_provisioned row. The droplet itself is destroyed by the
+  CreateDroplet step's compensate — this step only undoes DB rows.
+  """
+
+  @behaviour ArcadiaCloud.Provisioning.Step
+
+  import Ecto.Query
+
+  alias ArcadiaCloud.{Cloud, Provisioning, Repo}
+  alias ArcadiaCloud.Cloud.CloudResource
+  alias ArcadiaCloud.DigitalOcean.Client
+  alias ArcadiaCloud.Provisioning.{CloudProvisioned, SagaState}
+
+  @impl true
+  def name, do: "register_droplet"
+
+  # Requires `droplet_id` in the saga context; on success records the
+  # inventory row id as `cloud_resource_id`. Any result of the three calls
+  # that is not `{:ok, _}` is returned as-is by `with`.
+  @impl true
+  def execute(state) do
+    case SagaState.get_output(state, :droplet_id) do
+      nil ->
+        {:error, :no_droplet_id_in_context}
+
+      droplet_id ->
+        with {:ok, droplet} <- Client.get_droplet(droplet_id),
+             {:ok, resource} <- Cloud.upsert_resource(normalize(droplet)),
+             {:ok, _prov} <- record_provisioned(state, droplet, resource) do
+          {:ok, SagaState.put_output(state, :cloud_resource_id, resource.id)}
+        end
+    end
+  end
+
+  # Only undoes DB rows, and only when `cloud_resource_id` was recorded.
+  @impl true
+  def compensate(state) do
+    case SagaState.get_output(state, :cloud_resource_id) do
+      nil ->
+        :ok
+
+      resource_id ->
+        Repo.delete_all(from(p in CloudProvisioned, where: p.resource_id == ^resource_id))
+
+        from(r in CloudResource, where: r.id == ^resource_id)
+        |> Repo.update_all(set: [deleted_at: DateTime.utc_now() |> DateTime.truncate(:second)])
+
+        :ok
+    end
+  end
+
+  # ---- internals ------------------------------------------------------------
+
+  # The recorded desired state is read from the droplet payload DO returned.
+  defp record_provisioned(state, droplet, resource) do
+    Provisioning.record_provisioned(
+      resource.id,
+      %{
+        "size_slug" => droplet["size_slug"],
+        "region" => get_in(droplet, ["region", "slug"]),
+        "image" => get_in(droplet, ["image", "slug"])
+      },
+      provisioned_by: "saga:#{state.saga_id}",
+      saga_id: state.saga_id
+    )
+  end
+
+  # Maps a DO droplet payload onto the attribute map handed to
+  # `Cloud.upsert_resource/1`; both timestamps are set to the current second.
+  defp normalize(d) do
+    now = DateTime.utc_now() |> DateTime.truncate(:second)
+
+    %{
+      provider: "digitalocean",
+      provider_id: to_string(d["id"]),
+      kind: "droplet",
+      name: d["name"],
+      region: get_in(d, ["region", "slug"]),
+      status: normalize_status(d["status"]),
+      size_slug: d["size_slug"],
+      tags: d["tags"] || [],
+      attrs: %{
+        memory_mb: d["memory"],
+        vcpus: d["vcpus"],
+        disk_gb: d["disk"],
+        networks: d["networks"],
+        do_created_at: d["created_at"]
+      },
+      first_seen_at: now,
+      last_seen_at: now
+    }
+  end
+
+  # DO's "new" is stored as "provisioning"; other strings pass through.
+  defp normalize_status("active"), do: "active"
+  defp normalize_status("off"), do: "off"
+  defp normalize_status("new"), do: "provisioning"
+  defp normalize_status(other) when is_binary(other), do: other
+  defp normalize_status(_), do: "unknown"
+end
diff --git a/lib/arcadia_cloud/provisioning/steps/wait_droplet_active.ex b/lib/arcadia_cloud/provisioning/steps/wait_droplet_active.ex
new file mode 100644
index 0000000..05eec57
--- /dev/null
+++ b/lib/arcadia_cloud/provisioning/steps/wait_droplet_active.ex
@@ -0,0 +1,80 @@
+defmodule ArcadiaCloud.Provisioning.Steps.WaitDropletActive do
+  @moduledoc """
+  Polls a droplet (created by a prior CreateDroplet step) until its
+  status is "active". Reads `droplet_id` from saga context.
+
+  Gives up with `:droplet_active_timeout` after 96 polls spaced 5 s apart.
+  HTTP 429 and 5xx responses count as a failed poll and are retried within
+  that budget; any other error fails the step immediately.
+
+  No compensation — waiting has no side effect to undo. If the saga
+  rolls back, the prior CreateDroplet step's compensate destroys the
+  droplet regardless of whether it ever reached active. `compensate/1` is
+  therefore an explicit no-op.
+  """
+
+  @behaviour ArcadiaCloud.Provisioning.Step
+
+  alias ArcadiaCloud.DigitalOcean.Client
+  alias ArcadiaCloud.Provisioning.SagaState
+
+  @poll_interval_ms 5_000
+  @poll_max_attempts 96
+
+  @impl true
+  def name, do: "wait_droplet_active"
+
+  @impl true
+  def execute(state) do
+    case SagaState.get_output(state, :droplet_id) do
+      nil -> {:error, :no_droplet_id_in_context}
+      droplet_id -> poll(state, droplet_id, 1)
+    end
+  end
+
+  # Explicit no-op, matching the sibling steps that all define compensate/1.
+  # NOTE(review): whether the Step behaviour marks compensate/1 as optional is
+  # not visible here; defining it is safe either way.
+  @impl true
+  def compensate(_state), do: :ok
+
+  defp poll(_state, _droplet_id, attempt) when attempt > @poll_max_attempts do
+    {:error, :droplet_active_timeout}
+  end
+
+  defp poll(state, droplet_id, attempt) do
+    case Client.get_droplet(droplet_id) do
+      {:ok, %{"status" => "active"} = droplet} ->
+        public_ip = extract_public_ip(droplet)
+        {:ok, SagaState.put_output(state, :droplet_public_ip, public_ip)}
+
+      {:ok, %{"status" => status}} when status in ["new", "off"] ->
+        poll_again(state, droplet_id, attempt)
+
+      {:ok, %{"status" => other}} ->
+        {:error, {:unexpected_droplet_status, other}}
+
+      # Rate limiting / server errors are transient: spend an attempt and
+      # keep polling rather than failing the saga over a single bad response.
+      {:error, {:http, code, _}} when is_integer(code) and (code == 429 or code >= 500) ->
+        poll_again(state, droplet_id, attempt)
+
+      {:error, reason} ->
+        {:error, reason}
+    end
+  end
+
+  defp poll_again(state, droplet_id, attempt) do
+    Process.sleep(@poll_interval_ms)
+    poll(state, droplet_id, attempt + 1)
+  end
+
+  # First public IPv4 address of the droplet, or nil when there is none.
+  defp extract_public_ip(droplet) do
+    droplet
+    |> get_in(["networks", "v4"])
+    |> List.wrap()
+    |> Enum.find(%{}, &(&1["type"] == "public"))
+    |> Map.get("ip_address")
+  end
+end