The most load-bearing write workflow — droplet provisioning is the spine
of phase 4a deployment onboarding.
DigitalOcean.Client: create_droplet, get_droplet, list_droplets_by_tag,
destroy_droplet. list_paginated/3 now threads caller-supplied params
(opts[:params]) through pagination so tag-filtered listing works.
Four droplet saga steps:
- CreateDroplet — POST a droplet, tagged arcadia-saga-<saga8> +
managed-by-arcadia-cloud. Idempotency: re-run checks context for
droplet_id, then queries DO by the saga tag, so a crash between POST
and context-save adopts the existing droplet. compensate destroys it.
- WaitDropletActive — polls get_droplet until status "active" (96x5s);
records the public IP. No compensation (waiting has no side effect).
- RegisterDroplet — fetches the droplet, upserts it into cloud_resources
(inventory consistent immediately, not at next 15-min sync) and writes
cloud_provisioned desired-state {size_slug, region, image}. compensate
removes the DB rows (the droplet itself is destroyed by CreateDroplet's
compensate).
- DestroyDroplet — DELETE the droplet + mark its cloud_resources row
deleted. Terminal/irreversible: compensate is a logged no-op because,
per the saga design, destroy-class steps don't roll back.
Provisioning helpers:
- provision_droplet/1 — [CreateDroplet, WaitDropletActive, RegisterDroplet]
- destroy_droplet/2 — [DestroyDroplet]
Live smoke verified end-to-end (full create + destroy on a real
s-1vcpu-512mb-10gb droplet in syd1):
- provision saga completed: droplet 572017320 created, reached active
with public IP, registered into cloud_resources (status=active) +
cloud_provisioned (spec recorded).
- destroy saga completed: cloud_resources row marked deleted; droplet
confirmed 404 on DO afterward. Account back to its original 5
droplets, zero leftover, ~1 cent total cost.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
267 lines
8.8 KiB
Elixir
267 lines
8.8 KiB
Elixir
defmodule ArcadiaCloud.DigitalOcean.Client do
  @moduledoc """
  Thin Req wrapper over the DigitalOcean v2 API.

  Token resolution: per-purpose, looked up via `ArcadiaCloud.DigitalOcean.Tokens`.
  Phase 0/1: env var `DO_API_TOKEN`. Phase 2: from the secrets vault.

  Paginated list endpoints walk every page and return the combined list.

  ## Options

  Every public function takes a trailing keyword list:

    * `:purpose` - token purpose handed to `Tokens.fetch/1`. Defaults per
      function: `"sync_full"` for plain list endpoints, `"provisioning"` for
      droplet / snapshot / DNS-write / project-write calls, `"billing"` for
      billing calls.
    * `:params` - (list endpoints only) extra query params sent with every
      page request.

  ## Return values

    * `{:ok, value}` on a 2xx response (the decoded body, or the unwrapped
      resource where a function documents it).
    * `{:error, {:http, status, body}}` on any other HTTP status.
    * `{:error, {:transport, exception}}` when Req itself fails.
    * Whatever `Tokens.fetch/1` returned, unchanged, when it is not
      `{:ok, token}`.
  """

  alias ArcadiaCloud.DigitalOcean.Tokens

  @base "https://api.digitalocean.com/v2"
  @page_size 100

  # ---- public ---------------------------------------------------------------

  @doc "Lists all droplets (`GET /droplets`, every page)."
  def list_droplets(opts \\ []), do: list_paginated("/droplets", "droplets", opts)

  @doc "Lists all projects (`GET /projects`, every page)."
  def list_projects(opts \\ []), do: list_paginated("/projects", "projects", opts)

  @doc "Lists all domains (`GET /domains`, every page)."
  def list_domains(opts \\ []), do: list_paginated("/domains", "domains", opts)

  @doc "Lists all volumes (`GET /volumes`, every page)."
  def list_volumes(opts \\ []), do: list_paginated("/volumes", "volumes", opts)

  @doc "Lists all floating IPs (`GET /floating_ips`, every page)."
  def list_floating_ips(opts \\ []), do: list_paginated("/floating_ips", "floating_ips", opts)

  @doc "Lists all snapshots (`GET /snapshots`, every page)."
  def list_snapshots(opts \\ []), do: list_paginated("/snapshots", "snapshots", opts)

  @doc "Lists all firewalls (`GET /firewalls`, every page)."
  def list_firewalls(opts \\ []), do: list_paginated("/firewalls", "firewalls", opts)

  @doc "Lists all load balancers (`GET /load_balancers`, every page)."
  def list_load_balancers(opts \\ []),
    do: list_paginated("/load_balancers", "load_balancers", opts)

  @doc "Lists the backups of one droplet (`GET /droplets/:id/backups`, every page)."
  def list_droplet_backups(droplet_id, opts \\ []) do
    list_paginated("/droplets/#{droplet_id}/backups", "backups", opts)
  end

  @doc "Lists the snapshots of one droplet (`GET /droplets/:id/snapshots`, every page)."
  def list_droplet_snapshots(droplet_id, opts \\ []) do
    list_paginated("/droplets/#{droplet_id}/snapshots", "snapshots", opts)
  end

  # ---- droplet lifecycle ----------------------------------------------------

  @doc """
  Create a droplet. `attrs` must include name + region + size + image;
  optional ssh_keys, tags, backups, ipv6, user_data, vpc_uuid.
  Returns {:ok, droplet} — droplet is created async, poll get_droplet/2
  until status is "active".
  """
  def create_droplet(attrs, opts \\ []) do
    request(:post, "/droplets", body: attrs, purpose: token_purpose(opts, "provisioning"))
    |> unwrap("droplet")
  end

  @doc """
  Fetch a single droplet by id. Returns {:ok, droplet} when the response
  body carries a "droplet" key; any other result is returned unchanged.
  """
  def get_droplet(droplet_id, opts \\ []) do
    request(:get, "/droplets/#{droplet_id}", purpose: token_purpose(opts, "provisioning"))
    |> unwrap("droplet")
  end

  @doc """
  Lists droplets filtered by a tag — used for saga idempotency recovery.

  Caller-supplied `opts[:params]` are kept and sent alongside the tag
  filter; `tag_name` always wins over a caller-supplied `tag_name`.
  """
  def list_droplets_by_tag(tag, opts \\ []) do
    # Merge rather than overwrite so extra caller filters are not silently dropped.
    params = Keyword.put(opts[:params] || [], :tag_name, tag)

    paginate_opts =
      opts
      |> Keyword.put(:params, params)
      |> Keyword.put(:purpose, token_purpose(opts, "provisioning"))

    list_paginated("/droplets", "droplets", paginate_opts)
  end

  @doc "Destroy a droplet (`DELETE /droplets/:id`). The response body is returned as-is."
  def destroy_droplet(droplet_id, opts \\ []) do
    request(:delete, "/droplets/#{droplet_id}", purpose: token_purpose(opts, "provisioning"))
  end

  # ---- write actions --------------------------------------------------------

  @doc """
  Request a snapshot of a droplet. Returns {:ok, action} — the snapshot
  is created asynchronously; poll `get_droplet_action/3` until the action
  status is "completed".
  """
  def create_droplet_snapshot(droplet_id, snapshot_name, opts \\ []) do
    request(:post, "/droplets/#{droplet_id}/actions",
      body: %{type: "snapshot", name: snapshot_name},
      purpose: token_purpose(opts, "provisioning")
    )
    |> unwrap("action")
  end

  @doc """
  Fetch one action of a droplet. Returns {:ok, action} when the response
  body carries an "action" key; any other result is returned unchanged.
  """
  def get_droplet_action(droplet_id, action_id, opts \\ []) do
    request(:get, "/droplets/#{droplet_id}/actions/#{action_id}",
      purpose: token_purpose(opts, "provisioning")
    )
    |> unwrap("action")
  end

  @doc "Delete a snapshot (`DELETE /snapshots/:id`). The response body is returned as-is."
  def delete_snapshot(snapshot_id, opts \\ []) do
    request(:delete, "/snapshots/#{snapshot_id}", purpose: token_purpose(opts, "provisioning"))
  end

  # ---- DNS records ----------------------------------------------------------

  @doc "Lists all DNS records of `domain` (`GET /domains/:domain/records`, every page)."
  def list_domain_records(domain, opts \\ []) do
    list_paginated(
      "/domains/#{domain}/records",
      "domain_records",
      Keyword.put(opts, :purpose, token_purpose(opts, "sync_full"))
    )
  end

  @doc """
  Create a DNS record. `attrs` must include type + name + data; optional
  ttl, priority, port, weight, flags, tag. Returns {:ok, record}.
  """
  def create_domain_record(domain, attrs, opts \\ []) do
    request(:post, "/domains/#{domain}/records",
      body: attrs,
      purpose: token_purpose(opts, "provisioning")
    )
    |> unwrap("domain_record")
  end

  @doc """
  Update a DNS record with `attrs` (`PUT /domains/:domain/records/:id`).
  Returns {:ok, record} when the response body carries a "domain_record" key.
  """
  def update_domain_record(domain, record_id, attrs, opts \\ []) do
    request(:put, "/domains/#{domain}/records/#{record_id}",
      body: attrs,
      purpose: token_purpose(opts, "provisioning")
    )
    |> unwrap("domain_record")
  end

  @doc "Delete a DNS record (`DELETE /domains/:domain/records/:id`)."
  def delete_domain_record(domain, record_id, opts \\ []) do
    request(:delete, "/domains/#{domain}/records/#{record_id}",
      purpose: token_purpose(opts, "provisioning")
    )
  end

  # ---- billing --------------------------------------------------------------

  @doc "Fetch the customer balance (`GET /customers/my/balance`); body is not unwrapped."
  def get_balance(opts \\ []) do
    request(:get, "/customers/my/balance", purpose: token_purpose(opts, "billing"))
  end

  @doc "Lists the full billing history (`GET /customers/my/billing_history`, every page)."
  def list_billing_history(opts \\ []) do
    list_paginated(
      "/customers/my/billing_history",
      "billing_history",
      Keyword.put(opts, :purpose, token_purpose(opts, "billing"))
    )
  end

  @doc "Fetch the summary of one invoice; the response body is not unwrapped."
  def get_invoice_summary(invoice_uuid, opts \\ []) do
    request(:get, "/customers/my/invoices/#{invoice_uuid}/summary",
      purpose: token_purpose(opts, "billing")
    )
  end

  @doc """
  Fetch the CSV body for an invoice. Returns {:ok, csv_string} | {:error, _}.

  Only a 200 response with a binary body counts as success; every other
  status (including other 2xx codes) is reported as
  `{:error, {:http, status, body}}`.
  """
  def fetch_invoice_csv(invoice_uuid, opts \\ []) do
    with {:ok, token} <- Tokens.fetch(token_purpose(opts, "billing")) do
      response =
        Req.request(
          method: :get,
          url: @base <> "/customers/my/invoices/#{invoice_uuid}/csv",
          headers: [
            {"authorization", "Bearer " <> token},
            {"accept", "text/csv"}
          ],
          retry: :transient,
          max_retries: 3,
          # Keep the raw CSV text instead of letting Req decode the body.
          decode_body: false
        )

      case response do
        {:ok, %Req.Response{status: 200, body: csv}} when is_binary(csv) ->
          {:ok, csv}

        {:ok, %Req.Response{status: status, body: body}} ->
          {:error, {:http, status, body}}

        {:error, exception} ->
          {:error, {:transport, exception}}
      end
    end
  end

  @doc """
  Create a project. `purpose` is the project's purpose field (not the token
  purpose — that one is `opts[:purpose]`). Returns {:ok, project}.

  `opts[:environment]` sets the project environment and defaults to
  "Development".
  """
  def create_project(name, purpose, description \\ "", opts \\ []) do
    body = %{
      name: name,
      purpose: purpose,
      description: description,
      environment: opts[:environment] || "Development"
    }

    request(:post, "/projects", body: body, purpose: token_purpose(opts, "provisioning"))
    |> unwrap("project")
  end

  @doc "Lists the resources of a project (`GET /projects/:id/resources`, every page)."
  def list_project_resources(project_id, opts \\ []) do
    list_paginated("/projects/#{project_id}/resources", "resources", opts)
  end

  @doc """
  Assign the resources identified by `urns` (a list) to a project.
  The response body is returned as-is.
  """
  def assign_to_project(project_id, urns, opts \\ []) when is_list(urns) do
    request(:post, "/projects/#{project_id}/resources",
      body: %{resources: urns},
      purpose: token_purpose(opts, "provisioning")
    )
  end

  # ---- core -----------------------------------------------------------------

  # Token purpose for a call: the caller's :purpose, else the function's default.
  defp token_purpose(opts, default), do: opts[:purpose] || default

  # Turns {:ok, %{key => value}} into {:ok, value}; every other result
  # (errors, or a success body without `key`) passes through unchanged.
  defp unwrap({:ok, %{} = body} = result, key) do
    case body do
      %{^key => value} -> {:ok, value}
      _ -> result
    end
  end

  defp unwrap(other, _key), do: other

  # Fetches every page of `path` and returns {:ok, items}, where items is the
  # concatenation of each page's `root_key` list in page order.
  defp list_paginated(path, root_key, opts) do
    purpose = token_purpose(opts, "sync_full")
    extra_params = opts[:params] || []
    do_paginate(path, root_key, purpose, extra_params, [], 1)
  end

  # `pages` holds the item lists of the pages fetched so far, newest first, so
  # each step is an O(1) prepend; they are reversed and concatenated once at
  # the end. The first non-{:ok, map} result aborts and is returned as-is.
  defp do_paginate(path, root_key, purpose, extra_params, pages, page) do
    params = [page: page, per_page: @page_size] ++ extra_params

    case request(:get, path, params: params, purpose: purpose) do
      {:ok, %{} = body} ->
        pages = [page_items(body, root_key) | pages]

        if has_next?(body) do
          do_paginate(path, root_key, purpose, extra_params, pages, page + 1)
        else
          {:ok, pages |> Enum.reverse() |> Enum.concat()}
        end

      other ->
        other
    end
  end

  # Items of one page; a missing or non-list value counts as an empty page.
  defp page_items(body, root_key) do
    case body do
      %{^root_key => items} when is_list(items) -> items
      _ -> []
    end
  end

  # True only when the body links to a next page with a non-empty URL string,
  # so a null/blank "next" cannot keep the pagination going.
  defp has_next?(%{"links" => %{"pages" => %{"next" => next}}})
       when is_binary(next) and next != "",
       do: true

  defp has_next?(_body), do: false

  # Performs one authenticated request against @base <> path.
  # opts: :purpose (required), :params (query params), :body (sent as JSON).
  defp request(method, path, opts) do
    purpose = Keyword.fetch!(opts, :purpose)

    with {:ok, token} <- Tokens.fetch(purpose) do
      # NOTE(review): `retry: :transient` applies to every HTTP method, POST
      # included; a retried create after an ambiguous failure could duplicate
      # the resource — confirm this is intended.
      req_opts =
        [
          method: method,
          url: @base <> path,
          headers: [{"authorization", "Bearer " <> token}],
          retry: :transient,
          max_retries: 3
        ]
        |> maybe_put(:params, opts[:params])
        |> maybe_put(:json, opts[:body])

      case Req.request(req_opts) do
        {:ok, %Req.Response{status: status, body: body}} when status in 200..299 ->
          {:ok, body}

        {:ok, %Req.Response{status: status, body: body}} ->
          {:error, {:http, status, body}}

        {:error, exception} ->
          {:error, {:transport, exception}}
      end
    end
  end

  # Adds key => value to the keyword list unless value is nil.
  defp maybe_put(opts, _key, nil), do: opts
  defp maybe_put(opts, key, value), do: Keyword.put(opts, key, value)
end
|