defmodule ArcadiaCloud.Cloud do
  @moduledoc """
  Context for cloud resource inventory.

  Single upsert chokepoint (`upsert_resource/2`) handles:

    - insert new (emit `discovered` event)
    - update existing (diff meaningful fields, emit `updated` event only when changed)
    - bump `last_seen_at` + reset `stale_strike_count`
    - restore previously-deleted resources (emit `restored` event)

  Stale handling is in `mark_stale/3` — three-strike deletion across syncs.
  """

  import Ecto.Query, warn: false

  alias ArcadiaCloud.Repo
  alias ArcadiaCloud.Cloud.{CloudProject, CloudResource, CloudResourceEvent}

  # Fields whose change is worth an `updated` event.
  @meaningful_fields ~w(status name region size_slug tags attrs cloud_project_id tenant_id deployment_id)a

  # Fields captured in event before/after snapshots.
  @snapshot_fields @meaningful_fields ++ [:provider_id, :id]

  # ---- projects -------------------------------------------------------------

  @doc """
  Fetches the project identified by `provider` + `provider_id`, or `nil`.
  """
  @spec get_project_by_provider(term(), term()) :: %CloudProject{} | nil
  def get_project_by_provider(provider, provider_id) do
    Repo.get_by(CloudProject, provider: provider, provider_id: provider_id)
  end

  @doc """
  Inserts the project described by `attrs`, or updates the existing one with
  the same provider/provider_id pair.

  `attrs` may be atom- or string-keyed. Returns `{:ok, project}` or
  `{:error, changeset}`.
  """
  @spec ensure_project(map()) :: {:ok, %CloudProject{}} | {:error, Ecto.Changeset.t()}
  def ensure_project(attrs) when is_map(attrs) do
    provider = attrs[:provider] || attrs["provider"]
    provider_id = attrs[:provider_id] || attrs["provider_id"]

    # insert_or_update/1 picks INSERT for a freshly built struct and UPDATE
    # for one loaded from the database.
    (get_project_by_provider(provider, provider_id) || %CloudProject{})
    |> CloudProject.changeset(attrs)
    |> Repo.insert_or_update()
  end

  @doc """
  Returns every project.
  """
  @spec list_projects() :: [%CloudProject{}]
  def list_projects, do: Repo.all(CloudProject)

  @doc """
  Returns the project with purpose `"skyai-infra"` and name `"skyai-internal"`,
  or `nil` when it does not exist.
  """
  @spec skyai_internal_project() :: %CloudProject{} | nil
  def skyai_internal_project do
    Repo.get_by(CloudProject, purpose: "skyai-infra", name: "skyai-internal")
  end

  # ---- resources ------------------------------------------------------------

  @doc """
  Inserts or updates the resource identified by the provider/provider_id pair
  found in `attrs`, inside a single transaction.

  `attrs` may be atom- or string-keyed. Every call sets `last_seen_at` to now
  and resets `stale_strike_count` to 0; `first_seen_at` is only supplied for
  new resources (and only when the caller did not provide one).

  Events written:

    * `"discovered"` — the resource did not exist
    * `"restored"` — the resource existed but was tombstoned (`deleted_at` set)
    * `"updated"` — one of the meaningful fields changed

  ## Options

    * `:source` — recorded on the event, defaults to `"sync"`

  Returns `{:ok, resource}`, or `{:error, changeset}` when the changeset is
  rejected (the transaction is rolled back, so no event is written).
  """
  @spec upsert_resource(map(), keyword()) :: {:ok, %CloudResource{}} | {:error, term()}
  def upsert_resource(attrs, opts \\ []) when is_map(attrs) do
    source = Keyword.get(opts, :source, "sync")
    provider = attrs[:provider] || attrs["provider"]
    provider_id = attrs[:provider_id] || attrs["provider_id"]
    now = DateTime.utc_now() |> DateTime.truncate(:second)
    attrs = put_sync_fields(attrs, now)

    Repo.transaction(fn ->
      case Repo.get_by(CloudResource, provider: provider, provider_id: provider_id) do
        nil -> insert_resource(attrs, source)
        existing -> update_resource(existing, attrs, source)
      end
    end)
  end

  @doc """
  Three-strike deletion: any non-deleted resource of `kind` whose
  `last_seen_at` is older than `sync_started_at` gets its strike count bumped.
  Resources of `kind` at or past the threshold are then tombstoned
  (`deleted_at` set) and a `"deleted"` event with source `"sync_stale"` is
  written for each.

  ## Options

    * `:threshold` — strikes required before deletion, defaults to `3`

  Returns `%{struck: count, deleted: count}`.
  """
  @spec mark_stale(term(), DateTime.t(), keyword()) :: %{
          struck: non_neg_integer(),
          deleted: non_neg_integer()
        }
  def mark_stale(kind, sync_started_at, opts \\ []) do
    threshold = Keyword.get(opts, :threshold, 3)

    stale_query =
      from r in CloudResource,
        where: r.kind == ^kind,
        where: r.last_seen_at < ^sync_started_at,
        where: is_nil(r.deleted_at)

    # bump strikes
    {struck, _} = Repo.update_all(stale_query, inc: [stale_strike_count: 1])

    # mark deleted those at/past threshold
    now = DateTime.utc_now() |> DateTime.truncate(:second)

    delete_query =
      from r in CloudResource,
        where: r.kind == ^kind,
        where: r.stale_strike_count >= ^threshold,
        where: is_nil(r.deleted_at)

    doomed = Repo.all(delete_query)
    Enum.each(doomed, &tombstone(&1, now))

    %{struck: struck, deleted: length(doomed)}
  end

  @doc """
  Lists non-deleted resources, most recently updated first.

  ## Options

    * `:kind`, `:tenant_id`, `:deployment_id` — when given (non-nil), only
      resources whose field equals the value are returned
  """
  @spec list_resources(keyword()) :: [%CloudResource{}]
  def list_resources(opts \\ []) do
    base =
      from r in CloudResource,
        where: is_nil(r.deleted_at),
        order_by: [desc: r.updated_at]

    base
    |> filter_by(:kind, opts[:kind])
    |> filter_by(:tenant_id, opts[:tenant_id])
    |> filter_by(:deployment_id, opts[:deployment_id])
    |> Repo.all()
  end

  # Adds an equality filter on `field` unless the value is nil.
  defp filter_by(query, _field, nil), do: query

  defp filter_by(query, field, value) do
    from(r in query, where: field(r, ^field) == ^value)
  end

  # ---- internals ------------------------------------------------------------

  # Merges the sync bookkeeping fields into attrs using the caller's key style.
  # Ecto.Changeset.cast/4 rejects maps that mix atom and string keys, so
  # string-keyed input must receive string keys here.
  defp put_sync_fields(attrs, now) do
    key = if string_keyed?(attrs), do: &Atom.to_string/1, else: & &1

    attrs
    |> Map.put_new(key.(:first_seen_at), now)
    |> Map.put(key.(:last_seen_at), now)
    |> Map.put(key.(:stale_strike_count), 0)
  end

  defp string_keyed?(attrs), do: attrs |> Map.keys() |> Enum.any?(&is_binary/1)

  # Inserts a new resource and records its "discovered" event.
  # Must run inside a transaction (rolls back on an invalid changeset).
  defp insert_resource(attrs, source) do
    resource =
      %CloudResource{}
      |> CloudResource.changeset(attrs)
      |> Repo.insert()
      |> unwrap_or_rollback()

    write_event(resource, "discovered", nil, snapshot(resource), source)
    resource
  end

  # Applies attrs to an existing resource and records "restored" / "updated"
  # when applicable. Must run inside a transaction.
  defp update_resource(%CloudResource{} = existing, attrs, source) do
    before_snap = snapshot(existing)
    restoring? = not is_nil(existing.deleted_at)

    updated =
      existing
      # first_seen_at is fixed at discovery time; never overwrite it.
      |> CloudResource.changeset(Map.drop(attrs, [:first_seen_at, "first_seen_at"]))
      |> clear_tombstone(restoring?)
      |> Repo.update()
      |> unwrap_or_rollback()

    after_snap = snapshot(updated)

    cond do
      # Was tombstoned; now seen again.
      restoring? ->
        write_event(updated, "restored", before_snap, after_snap, source)

      meaningful_change?(before_snap, after_snap) ->
        write_event(updated, "updated", before_snap, after_snap, source)

      true ->
        :ok
    end

    updated
  end

  defp clear_tombstone(changeset, true), do: Ecto.Changeset.put_change(changeset, :deleted_at, nil)
  defp clear_tombstone(changeset, false), do: changeset

  # Turns a failed write into a transaction rollback so the caller receives
  # {:error, changeset} instead of a MatchError.
  defp unwrap_or_rollback({:ok, record}), do: record
  defp unwrap_or_rollback({:error, changeset}), do: Repo.rollback(changeset)

  # Tombstones one resource and writes its "deleted" event atomically, so a
  # failed event insert cannot leave a tombstone without an audit event.
  defp tombstone(%CloudResource{} = resource, now) do
    {:ok, _} =
      Repo.transaction(fn ->
        resource |> Ecto.Changeset.change(deleted_at: now) |> Repo.update!()
        write_event(resource, "deleted", snapshot(resource), nil, "sync_stale")
      end)

    :ok
  end

  defp snapshot(%CloudResource{} = r), do: Map.take(r, @snapshot_fields)

  defp meaningful_change?(a, b) do
    Map.take(a, @meaningful_fields) != Map.take(b, @meaningful_fields)
  end

  # Persists one audit event for the resource; raises if the insert fails.
  defp write_event(%CloudResource{id: id}, event, before, after_, source) do
    %CloudResourceEvent{}
    |> CloudResourceEvent.changeset(%{
      resource_id: id,
      event: event,
      before: before,
      after: after_,
      source: source,
      occurred_at: DateTime.utc_now() |> DateTime.truncate(:second)
    })
    |> Repo.insert!()
  end
end