From c1cbd434ac9778e8326fbeab9c1aba1b2ff37a1a Mon Sep 17 00:00:00 2001 From: Giuliano Silvestro Date: Tue, 19 May 2026 22:07:29 +1000 Subject: [PATCH] Phase 1 first chunk: inventory schema + DO droplet sync Models: - cloud_projects: arcadia-cloud's mirror of DO Projects, indexed by (provider, provider_id); tenant_id + purpose classify each project. - cloud_resources: single unified resource table; kind-specific bits in attrs JSONB; first_seen_at / last_seen_at / stale_strike_count drive three-strike deletion. - cloud_resource_events: append-only audit (discovered, updated, deleted, drift_detected, tagged, restored). ArcadiaCloud.Cloud context owns the single upsert chokepoint that: - inserts new with `discovered` event - updates existing only when meaningful fields change - restores tombstoned rows seen again - bumps last_seen_at and resets strike count mark_stale/3 implements the three-strike rule. ArcadiaCloud.DigitalOcean.Client is a Req wrapper with auto-pagination. Per-purpose token resolution via .Tokens (phase 1: env DO_API_TOKEN; phase 2: vault). Per project_arcadia_cloud memory the long-term shape is one PAT per queue purpose for rate-limit isolation. ArcadiaCloud.Sync.Bootstrap ensures the skyai-internal DO Project exists on first sync, idempotent thereafter. ArcadiaCloud.Sync.DropletsWorker runs full droplet sync on the cloud_sync_full Oban queue. InventoryController wired to real data: platform_admin sees all, tenants see only their scope. Live smoke test against real DO: 5 droplets synced; skyai-internal project auto-created; events written; endpoint returns scoped results. Co-Authored-By: Claude Opus 4.7 (1M context) --- lib/arcadia_cloud/cloud.ex | 184 ++++++++++++++++++ lib/arcadia_cloud/cloud/cloud_project.ex | 29 +++ lib/arcadia_cloud/cloud/cloud_resource.ex | 40 ++++ .../cloud/cloud_resource_event.ex | 27 +++ lib/arcadia_cloud/digital_ocean/client.ex | 106 ++++++++++ lib/arcadia_cloud/digital_ocean/tokens.ex | 24 +++ lib/arcadia_cloud/sync/bootstrap.ex | 51 +++++ lib/arcadia_cloud/sync/droplets_worker.ex | 78 ++++++++ .../controllers/inventory_controller.ex | 58 +++++- .../20260519120000_create_inventory.exs | 65 +++++++ 10 files changed, 658 insertions(+), 4 deletions(-) create mode 100644 lib/arcadia_cloud/cloud.ex create mode 100644 lib/arcadia_cloud/cloud/cloud_project.ex create mode 100644 lib/arcadia_cloud/cloud/cloud_resource.ex create mode 100644 lib/arcadia_cloud/cloud/cloud_resource_event.ex create mode 100644 lib/arcadia_cloud/digital_ocean/client.ex create mode 100644 lib/arcadia_cloud/digital_ocean/tokens.ex create mode 100644 lib/arcadia_cloud/sync/bootstrap.ex create mode 100644 lib/arcadia_cloud/sync/droplets_worker.ex create mode 100644 priv/repo/migrations/20260519120000_create_inventory.exs diff --git a/lib/arcadia_cloud/cloud.ex b/lib/arcadia_cloud/cloud.ex new file mode 100644 index 0000000..585e4de --- /dev/null +++ b/lib/arcadia_cloud/cloud.ex @@ -0,0 +1,184 @@ +defmodule ArcadiaCloud.Cloud do + @moduledoc """ + Context for cloud resource inventory. + + Single upsert chokepoint (`upsert_resource/2`) handles: + - insert new (emit `discovered` event) + - update existing (diff meaningful fields, emit `updated` event only when changed) + - bump last_seen_at + reset stale_strike_count + - restore previously-deleted resources + + Stale handling is in `mark_stale/3` — three-strike deletion across syncs. + """ + + import Ecto.Query, warn: false + + alias ArcadiaCloud.Repo + alias ArcadiaCloud.Cloud.{CloudProject, CloudResource, CloudResourceEvent} + + @meaningful_fields ~w(status name region size_slug tags attrs cloud_project_id + tenant_id deployment_id)a + + # ---- projects ------------------------------------------------------------- + + def get_project_by_provider(provider, provider_id) do + Repo.get_by(CloudProject, provider: provider, provider_id: provider_id) + end + + def ensure_project(attrs) when is_map(attrs) do + provider = attrs[:provider] || attrs["provider"] + provider_id = attrs[:provider_id] || attrs["provider_id"] + + case get_project_by_provider(provider, provider_id) do + nil -> + %CloudProject{} + |> CloudProject.changeset(attrs) + |> Repo.insert() + + project -> + project + |> CloudProject.changeset(attrs) + |> Repo.update() + end + end + + def list_projects, do: Repo.all(CloudProject) + + def skyai_internal_project do + Repo.get_by(CloudProject, purpose: "skyai-infra", name: "skyai-internal") + end + + # ---- resources ------------------------------------------------------------ + + def upsert_resource(attrs, opts \\ []) when is_map(attrs) do + source = Keyword.get(opts, :source, "sync") + provider = attrs[:provider] || attrs["provider"] + provider_id = attrs[:provider_id] || attrs["provider_id"] + now = DateTime.utc_now() |> DateTime.truncate(:second) + + attrs = + attrs + |> Map.put_new(:first_seen_at, now) + |> Map.put(:last_seen_at, now) + |> Map.put(:stale_strike_count, 0) + + Repo.transaction(fn -> + case Repo.get_by(CloudResource, provider: provider, provider_id: provider_id) do + nil -> + {:ok, resource} = + %CloudResource{} + |> CloudResource.changeset(attrs) + |> Repo.insert() + + write_event(resource, "discovered", nil, snapshot(resource), source) + resource + + existing -> + before_snap = snapshot(existing) + + {:ok, updated} = + existing + |> CloudResource.changeset(Map.delete(attrs, :first_seen_at)) + |> Repo.update() + + cond do + existing.deleted_at != nil -> + # Was tombstoned; now seen again. Clear deleted_at and emit restore. + {:ok, restored} = + updated + |> Ecto.Changeset.change(deleted_at: nil) + |> Repo.update() + + write_event(restored, "restored", before_snap, snapshot(restored), source) + restored + + meaningful_change?(before_snap, snapshot(updated)) -> + write_event(updated, "updated", before_snap, snapshot(updated), source) + updated + + true -> + updated + end + end + end) + end + + @doc """ + Three-strike deletion: any resource of `kind` whose `last_seen_at` is older + than `sync_started_at` gets its strike count bumped. At 3 strikes, mark + deleted. Returns count of strikes + deletions. + """ + def mark_stale(kind, sync_started_at, opts \\ []) do + threshold = Keyword.get(opts, :threshold, 3) + + stale_query = + from r in CloudResource, + where: r.kind == ^kind, + where: r.last_seen_at < ^sync_started_at, + where: is_nil(r.deleted_at) + + # bump strikes + {struck, _} = + Repo.update_all(stale_query, inc: [stale_strike_count: 1]) + + # mark deleted those at/past threshold + now = DateTime.utc_now() |> DateTime.truncate(:second) + + delete_query = + from r in CloudResource, + where: r.kind == ^kind, + where: r.stale_strike_count >= ^threshold, + where: is_nil(r.deleted_at) + + deleted = Repo.all(delete_query) + + Enum.each(deleted, fn r -> + r + |> Ecto.Changeset.change(deleted_at: now) + |> Repo.update!() + + write_event(r, "deleted", snapshot(r), nil, "sync_stale") + end) + + %{struck: struck, deleted: length(deleted)} + end + + def list_resources(opts \\ []) do + base = from r in CloudResource, where: is_nil(r.deleted_at), order_by: [desc: r.updated_at] + + base + |> filter_by(:kind, opts[:kind]) + |> filter_by(:tenant_id, opts[:tenant_id]) + |> filter_by(:deployment_id, opts[:deployment_id]) + |> Repo.all() + end + + defp filter_by(query, _field, nil), do: query + + defp filter_by(query, field, value) do + from(r in query, where: field(r, ^field) == ^value) + end + + # ---- internals ------------------------------------------------------------ + + defp snapshot(%CloudResource{} = r) do + Map.take(r, @meaningful_fields ++ [:provider_id, :name, :id]) + end + + defp meaningful_change?(a, b) do + Map.take(a, @meaningful_fields) != Map.take(b, @meaningful_fields) + end + + defp write_event(%CloudResource{id: id}, event, before, after_, source) do + %CloudResourceEvent{} + |> CloudResourceEvent.changeset(%{ + resource_id: id, + event: event, + before: before, + after: after_, + source: source, + occurred_at: DateTime.utc_now() |> DateTime.truncate(:second) + }) + |> Repo.insert!() + end +end diff --git a/lib/arcadia_cloud/cloud/cloud_project.ex b/lib/arcadia_cloud/cloud/cloud_project.ex new file mode 100644 index 0000000..dd3a07f --- /dev/null +++ b/lib/arcadia_cloud/cloud/cloud_project.ex @@ -0,0 +1,29 @@ +defmodule ArcadiaCloud.Cloud.CloudProject do + use Ecto.Schema + import Ecto.Changeset + + @primary_key {:id, :binary_id, autogenerate: true} + @foreign_key_type :binary_id + + schema "cloud_projects" do + field :provider, :string + field :provider_id, :string + field :name, :string + field :tenant_id, :binary_id + field :purpose, :string + field :metadata, :map, default: %{} + + timestamps(type: :utc_datetime) + end + + @required ~w(provider provider_id name purpose)a + @optional ~w(tenant_id metadata)a + + def changeset(project, attrs) do + project + |> cast(attrs, @required ++ @optional) + |> validate_required(@required) + |> validate_inclusion(:purpose, ~w(tenant-workload skyai-infra shared-services)) + |> unique_constraint([:provider, :provider_id]) + end +end diff --git a/lib/arcadia_cloud/cloud/cloud_resource.ex b/lib/arcadia_cloud/cloud/cloud_resource.ex new file mode 100644 index 0000000..39cd2b8 --- /dev/null +++ b/lib/arcadia_cloud/cloud/cloud_resource.ex @@ -0,0 +1,40 @@ +defmodule ArcadiaCloud.Cloud.CloudResource do + use Ecto.Schema + import Ecto.Changeset + + @primary_key {:id, :binary_id, autogenerate: true} + @foreign_key_type :binary_id + + schema "cloud_resources" do + field :provider, :string + field :provider_id, :string + field :kind, :string + field :name, :string + field :region, :string + field :status, :string + field :size_slug, :string + field :tenant_id, :binary_id + field :deployment_id, :binary_id + field :tags, {:array, :string}, default: [] + field :attrs, :map, default: %{} + field :first_seen_at, :utc_datetime + field :last_seen_at, :utc_datetime + field :stale_strike_count, :integer, default: 0 + field :deleted_at, :utc_datetime + + belongs_to :cloud_project, ArcadiaCloud.Cloud.CloudProject + + timestamps(type: :utc_datetime) + end + + @required ~w(provider provider_id kind name status first_seen_at last_seen_at)a + @optional ~w(region size_slug cloud_project_id tenant_id deployment_id tags attrs + stale_strike_count deleted_at)a + + def changeset(resource, attrs) do + resource + |> cast(attrs, @required ++ @optional) + |> validate_required(@required) + |> unique_constraint([:provider, :provider_id]) + end +end diff --git a/lib/arcadia_cloud/cloud/cloud_resource_event.ex b/lib/arcadia_cloud/cloud/cloud_resource_event.ex new file mode 100644 index 0000000..417a8d9 --- /dev/null +++ b/lib/arcadia_cloud/cloud/cloud_resource_event.ex @@ -0,0 +1,27 @@ +defmodule ArcadiaCloud.Cloud.CloudResourceEvent do + use Ecto.Schema + import Ecto.Changeset + + @primary_key {:id, :binary_id, autogenerate: true} + @foreign_key_type :binary_id + + schema "cloud_resource_events" do + field :event, :string + field :before, :map + field :after, :map + field :source, :string + field :occurred_at, :utc_datetime + + belongs_to :resource, ArcadiaCloud.Cloud.CloudResource + end + + @required ~w(resource_id event source occurred_at)a + @optional ~w(before after)a + + def changeset(event, attrs) do + event + |> cast(attrs, @required ++ @optional) + |> validate_required(@required) + |> validate_inclusion(:event, ~w(discovered updated deleted drift_detected tagged restored)) + end +end diff --git a/lib/arcadia_cloud/digital_ocean/client.ex b/lib/arcadia_cloud/digital_ocean/client.ex new file mode 100644 index 0000000..892aa2c --- /dev/null +++ b/lib/arcadia_cloud/digital_ocean/client.ex @@ -0,0 +1,106 @@ +defmodule ArcadiaCloud.DigitalOcean.Client do + @moduledoc """ + Thin Req wrapper over the DigitalOcean v2 API. + + Token resolution: per-purpose, looked up via `ArcadiaCloud.DigitalOcean.Tokens`. + Phase 0/1: env var `DO_API_TOKEN`. Phase 2: from the secrets vault. + + Paginated list endpoints stream all pages by default. + """ + + alias ArcadiaCloud.DigitalOcean.Tokens + + @base "https://api.digitalocean.com/v2" + @page_size 100 + + # ---- public --------------------------------------------------------------- + + def list_droplets(opts \\ []), do: list_paginated("/droplets", "droplets", opts) + def list_projects(opts \\ []), do: list_paginated("/projects", "projects", opts) + + def create_project(name, purpose, description \\ "", opts \\ []) do + body = %{ + name: name, + purpose: purpose, + description: description, + environment: "Development" + } + + request(:post, "/projects", body: body, purpose: opts[:purpose] || "provisioning") + |> case do + {:ok, %{"project" => project}} -> {:ok, project} + other -> other + end + end + + def list_project_resources(project_id, opts \\ []) do + list_paginated("/projects/#{project_id}/resources", "resources", opts) + end + + def assign_to_project(project_id, urns, opts \\ []) when is_list(urns) do + request(:post, "/projects/#{project_id}/resources", + body: %{resources: urns}, + purpose: opts[:purpose] || "provisioning" + ) + end + + # ---- core ----------------------------------------------------------------- + + defp list_paginated(path, root_key, opts) do + purpose = opts[:purpose] || "sync_full" + do_paginate(path, root_key, purpose, [], 1) + end + + defp do_paginate(path, root_key, purpose, acc, page) do + params = [page: page, per_page: @page_size] + + case request(:get, path, params: params, purpose: purpose) do + {:ok, %{} = body} -> + items = Map.get(body, root_key, []) + new_acc = acc ++ items + + if has_next?(body) do + do_paginate(path, root_key, purpose, new_acc, page + 1) + else + {:ok, new_acc} + end + + err -> + err + end + end + + defp has_next?(%{"links" => %{"pages" => %{"next" => _}}}), do: true + defp has_next?(_), do: false + + defp request(method, path, opts) do + purpose = Keyword.fetch!(opts, :purpose) + + with {:ok, token} <- Tokens.fetch(purpose) do + req_opts = + [ + method: method, + url: @base <> path, + headers: [{"authorization", "Bearer " <> token}], + retry: :transient, + max_retries: 3 + ] + |> maybe_put(:params, opts[:params]) + |> maybe_put(:json, opts[:body]) + + case Req.request(req_opts) do + {:ok, %Req.Response{status: status, body: body}} when status in 200..299 -> + {:ok, body} + + {:ok, %Req.Response{status: status, body: body}} -> + {:error, {:http, status, body}} + + {:error, exception} -> + {:error, {:transport, exception}} + end + end + end + + defp maybe_put(opts, _key, nil), do: opts + defp maybe_put(opts, key, value), do: Keyword.put(opts, key, value) +end diff --git a/lib/arcadia_cloud/digital_ocean/tokens.ex b/lib/arcadia_cloud/digital_ocean/tokens.ex new file mode 100644 index 0000000..16d4d31 --- /dev/null +++ b/lib/arcadia_cloud/digital_ocean/tokens.ex @@ -0,0 +1,24 @@ +defmodule ArcadiaCloud.DigitalOcean.Tokens do + @moduledoc """ + DO API token resolver. Per-purpose so worker queues use separate tokens + (rate limit isolation + blast radius — see project_arcadia_cloud memory). + + Phase 0/1 implementation: all purposes fall back to the single + `DO_API_TOKEN` env var (or `:default_token` app env). + Phase 2: read per-purpose bundles from the secrets vault. + """ + + @env_var "DO_API_TOKEN" + + def fetch(purpose) when is_binary(purpose) do + case resolve(purpose) do + nil -> {:error, :no_token_configured} + token -> {:ok, token} + end + end + + defp resolve(_purpose) do + Application.get_env(:arcadia_cloud, :do_api_token) || + System.get_env(@env_var) + end +end diff --git a/lib/arcadia_cloud/sync/bootstrap.ex b/lib/arcadia_cloud/sync/bootstrap.ex new file mode 100644 index 0000000..81e5e4d --- /dev/null +++ b/lib/arcadia_cloud/sync/bootstrap.ex @@ -0,0 +1,51 @@ +defmodule ArcadiaCloud.Sync.Bootstrap do + @moduledoc """ + First-run bootstrap: ensure the `skyai-internal` DO Project exists and + is registered in our `cloud_projects` table. Resources discovered before + any tenant project exists land here by default. + + Idempotent: safe to call on every sync. + """ + + alias ArcadiaCloud.Cloud + alias ArcadiaCloud.DigitalOcean.Client + + @internal_name "skyai-internal" + @internal_purpose "skyai-infra" + @do_purpose "Service or API" + + def ensure_skyai_internal do + case Cloud.skyai_internal_project() do + %{} = project -> + {:ok, project} + + nil -> + with {:ok, do_project} <- find_or_create_do_project() do + {:ok, _local} = + Cloud.ensure_project(%{ + provider: "digitalocean", + provider_id: do_project["id"], + name: @internal_name, + purpose: @internal_purpose, + metadata: %{ + do_purpose: do_project["purpose"], + description: do_project["description"] + } + }) + end + end + end + + defp find_or_create_do_project do + with {:ok, projects} <- Client.list_projects() do + case Enum.find(projects, &(&1["name"] == @internal_name)) do + nil -> + Client.create_project(@internal_name, @do_purpose, + "Sky AI internal infrastructure (auto-created by arcadia-cloud)") + + existing -> + {:ok, existing} + end + end + end +end diff --git a/lib/arcadia_cloud/sync/droplets_worker.ex b/lib/arcadia_cloud/sync/droplets_worker.ex new file mode 100644 index 0000000..13bc5e9 --- /dev/null +++ b/lib/arcadia_cloud/sync/droplets_worker.ex @@ -0,0 +1,78 @@ +defmodule ArcadiaCloud.Sync.DropletsWorker do + @moduledoc """ + Full sync of DigitalOcean droplets. + + Idempotent: re-runs are no-ops absent state changes. Three-strike deletion + for resources that vanish from N consecutive syncs. + + Tenant attribution is derived in two passes — phase 1 (this worker) tags + all discovered resources to the `skyai-internal` project by default. + Phase 1+ will resolve actual tenant projects via DO Projects API. + """ + + use Oban.Worker, queue: :cloud_sync_full, max_attempts: 3 + + alias ArcadiaCloud.Cloud + alias ArcadiaCloud.DigitalOcean.Client + alias ArcadiaCloud.Sync.Bootstrap + + @kind "droplet" + @provider "digitalocean" + + @impl Oban.Worker + def perform(_job) do + sync_started_at = DateTime.utc_now() |> DateTime.truncate(:second) + + with {:ok, _project} <- Bootstrap.ensure_skyai_internal(), + {:ok, droplets} <- Client.list_droplets() do + internal = Cloud.skyai_internal_project() + + Enum.each(droplets, fn d -> + Cloud.upsert_resource(normalize(d, internal, sync_started_at)) + end) + + Cloud.mark_stale(@kind, sync_started_at) + :ok + end + end + + # ---- normalization -------------------------------------------------------- + + defp normalize(d, project, now) do + %{ + provider: @provider, + provider_id: to_string(d["id"]), + kind: @kind, + name: d["name"], + region: get_in(d, ["region", "slug"]), + status: normalize_status(d["status"]), + size_slug: d["size_slug"], + cloud_project_id: project && project.id, + tags: d["tags"] || [], + attrs: %{ + memory_mb: d["memory"], + vcpus: d["vcpus"], + disk_gb: d["disk"], + image: take_image(d["image"]), + networks: d["networks"], + features: d["features"], + do_created_at: d["created_at"] + }, + first_seen_at: now, + last_seen_at: now + } + end + + defp normalize_status("active"), do: "active" + defp normalize_status("off"), do: "off" + defp normalize_status("new"), do: "provisioning" + defp normalize_status("archive"), do: "archived" + defp normalize_status(other) when is_binary(other), do: other + defp normalize_status(_), do: "unknown" + + defp take_image(nil), do: nil + + defp take_image(image) when is_map(image) do + Map.take(image, ["id", "name", "slug", "distribution"]) + end +end diff --git a/lib/arcadia_cloud_web/controllers/inventory_controller.ex b/lib/arcadia_cloud_web/controllers/inventory_controller.ex index c477235..a8d3e98 100644 --- a/lib/arcadia_cloud_web/controllers/inventory_controller.ex +++ b/lib/arcadia_cloud_web/controllers/inventory_controller.ex @@ -1,12 +1,62 @@ defmodule ArcadiaCloudWeb.InventoryController do @moduledoc """ - Cloud resource inventory. Phase 0 stub — returns an empty list. - Phase 1 wires this to `cloud_resources` filtered by tenant scope. + Cloud resource inventory. + + Scope rules (phase 1): + - platform_admin tenants see every non-deleted resource + - other tenants see only resources tagged to their tenant_id + + Filters: `?kind=droplet`, `?deployment_id=...` """ use ArcadiaCloudWeb, :controller - def index(conn, _params) do - json(conn, %{resources: []}) + alias ArcadiaCloud.Cloud + + def index(conn, params) do + identity = conn.assigns.current_identity + + base_opts = + [] + |> maybe_put(:kind, params["kind"]) + |> maybe_put(:deployment_id, params["deployment_id"]) + + opts = + if platform_admin?(identity) do + base_opts + else + Keyword.put(base_opts, :tenant_id, identity.tenant_id) + end + + resources = + Cloud.list_resources(opts) + |> Enum.map(&shape/1) + + json(conn, %{resources: resources, count: length(resources)}) + end + + defp platform_admin?(%{roles: roles}) when is_list(roles), do: "platform_admin" in roles + defp platform_admin?(_), do: false + + defp maybe_put(opts, _key, nil), do: opts + defp maybe_put(opts, _key, ""), do: opts + defp maybe_put(opts, key, value), do: Keyword.put(opts, key, value) + + defp shape(r) do + %{ + id: r.id, + provider: r.provider, + provider_id: r.provider_id, + kind: r.kind, + name: r.name, + region: r.region, + status: r.status, + size_slug: r.size_slug, + tenant_id: r.tenant_id, + deployment_id: r.deployment_id, + tags: r.tags, + first_seen_at: r.first_seen_at, + last_seen_at: r.last_seen_at + } end end diff --git a/priv/repo/migrations/20260519120000_create_inventory.exs b/priv/repo/migrations/20260519120000_create_inventory.exs new file mode 100644 index 0000000..3547cfa --- /dev/null +++ b/priv/repo/migrations/20260519120000_create_inventory.exs @@ -0,0 +1,65 @@ +defmodule ArcadiaCloud.Repo.Migrations.CreateInventory do + use Ecto.Migration + + def change do + create table(:cloud_projects, primary_key: false) do + add :id, :binary_id, primary_key: true + add :provider, :string, null: false + add :provider_id, :string, null: false + add :name, :string, null: false + add :tenant_id, :binary_id + add :purpose, :string, null: false + add :metadata, :map, default: %{}, null: false + + timestamps(type: :utc_datetime) + end + + create unique_index(:cloud_projects, [:provider, :provider_id]) + create index(:cloud_projects, [:tenant_id]) + create index(:cloud_projects, [:purpose]) + + create table(:cloud_resources, primary_key: false) do + add :id, :binary_id, primary_key: true + add :provider, :string, null: false + add :provider_id, :string, null: false + add :kind, :string, null: false + add :name, :string, null: false + add :region, :string + add :status, :string, null: false + add :size_slug, :string + add :cloud_project_id, references(:cloud_projects, type: :binary_id, on_delete: :nilify_all) + add :tenant_id, :binary_id + add :deployment_id, :binary_id + add :tags, {:array, :string}, default: [], null: false + add :attrs, :map, default: %{}, null: false + add :first_seen_at, :utc_datetime, null: false + add :last_seen_at, :utc_datetime, null: false + add :stale_strike_count, :integer, default: 0, null: false + add :deleted_at, :utc_datetime + + timestamps(type: :utc_datetime) + end + + create unique_index(:cloud_resources, [:provider, :provider_id]) + create index(:cloud_resources, [:kind]) + create index(:cloud_resources, [:tenant_id]) + create index(:cloud_resources, [:deployment_id]) + create index(:cloud_resources, [:cloud_project_id]) + create index(:cloud_resources, [:last_seen_at]) + create index(:cloud_resources, [:deleted_at]) + + create table(:cloud_resource_events, primary_key: false) do + add :id, :binary_id, primary_key: true + add :resource_id, references(:cloud_resources, type: :binary_id, on_delete: :delete_all), + null: false + add :event, :string, null: false + add :before, :map + add :after, :map + add :source, :string, null: false + add :occurred_at, :utc_datetime, null: false + end + + create index(:cloud_resource_events, [:resource_id, :occurred_at]) + create index(:cloud_resource_events, [:event]) + end +end