From f43307232d8fa70c2b3cdc721a52e24e2baf8158 Mon Sep 17 00:00:00 2001 From: rylandbangerter Date: Wed, 4 Feb 2026 13:47:44 -0700 Subject: [PATCH 1/2] Fix concurrent index migrations with schema-driven multitenancy - Detect migrations with @disable_ddl_transaction attribute - Run concurrent index migrations outside transaction context - Use Ecto.Adapters.SQL.checkout to get fresh connection - Add comprehensive test suite for concurrent index scenarios - Maintain backward compatibility with regular migrations Fixes issue where CREATE INDEX CONCURRENTLY fails when creating tenants --- lib/multitenancy.ex | 98 ++++-- test/concurrent_index_multitenancy_test.exs | 339 ++++++++++++++++++++ 2 files changed, 419 insertions(+), 18 deletions(-) create mode 100644 test/concurrent_index_multitenancy_test.exs diff --git a/lib/multitenancy.ex b/lib/multitenancy.ex index 56d3f721..a2dcbd83 100644 --- a/lib/multitenancy.ex +++ b/lib/multitenancy.ex @@ -5,7 +5,6 @@ defmodule AshPostgres.MultiTenancy do @moduledoc false - @dialyzer {:nowarn_function, load_migration!: 1} require Logger # sobelow_skip ["SQL.Query"] @@ -44,21 +43,32 @@ defmodule AshPostgres.MultiTenancy do end) |> Enum.map(&extract_migration_info/1) |> Enum.filter(& &1) - |> Enum.map(&load_migration!/1) - |> Enum.each(fn {version, mod} -> - Ecto.Migration.Runner.run( - repo, - [], - version, - mod, - :forward, - :up, - :up, - all: true, - prefix: tenant_name - ) - - Ecto.Migration.SchemaMigration.up(repo, repo.config(), version, prefix: tenant_name) + |> Enum.map(&load_migration_with_file!/1) + |> Enum.each(fn {version, mod, file} -> + requires_no_transaction? = migration_requires_no_transaction?(file, mod) + + if requires_no_transaction? do + # For migrations that require no transaction (e.g., concurrent indexes), + # we need to ensure they run outside of any transaction. + # Ecto.Migration.Runner.run will handle @disable_ddl_transaction correctly + # if we're not already in a transaction, so we use a separate connection + # or ensure we're not in a transaction. + run_migration_without_transaction(repo, version, mod, tenant_name) + else + Ecto.Migration.Runner.run( + repo, + [], + version, + mod, + :forward, + :up, + :up, + all: true, + prefix: tenant_name + ) + + Ecto.Migration.SchemaMigration.up(repo, repo.config(), version, prefix: tenant_name) + end end) end @@ -74,17 +84,18 @@ defmodule AshPostgres.MultiTenancy do :ok end - defp load_migration!({version, _, file}) when is_binary(file) do + defp load_migration_with_file!({version, _, file}) when is_binary(file) do loaded_modules = file |> compile_file() |> Enum.map(&elem(&1, 0)) if mod = Enum.find(loaded_modules, &migration?/1) do - {version, mod} + {version, mod, file} else raise Ecto.MigrationError, "file #{Path.relative_to_cwd(file)} does not define an Ecto.Migration" end end + defp compile_file(file) do AshPostgres.MigrationCompileCache.start_link() AshPostgres.MigrationCompileCache.compile_file(file) @@ -121,4 +132,55 @@ defmodule AshPostgres.MultiTenancy do defp tenant_name_regex do ~r/^[a-zA-Z0-9_-]+$/ end + + # Check if a migration requires no transaction by examining the file content + # and the migration module attributes + defp migration_requires_no_transaction?(file, mod) do + # First check the file content for @disable_ddl_transaction + file_content = File.read!(file) + has_disable_ddl_transaction? = String.contains?(file_content, "@disable_ddl_transaction") + + # Also check if the module has the attribute set (for compiled modules) + has_module_attribute? = + try do + if function_exported?(mod, :__migration__, 0) do + migration_info = mod.__migration__() + Map.get(migration_info, :disable_ddl_transaction, false) + else + false + end + rescue + _ -> false + end + + has_disable_ddl_transaction? || has_module_attribute? + end + + # Run a migration that requires no transaction outside of any transaction context + defp run_migration_without_transaction(repo, version, mod, tenant_name) do + # For migrations that require no transaction (e.g., concurrent indexes), + # we need to ensure they run outside of any transaction. + # Ecto.Migration.Runner.run respects @disable_ddl_transaction, but if we're + # already in a transaction, PostgreSQL will still error. + # + # We use Ecto.Adapters.SQL.checkout/3 to get a fresh connection from the pool + # that's not part of any transaction, ensuring the migration runs correctly. + config = repo.config() + + Ecto.Adapters.SQL.checkout(repo, config, fn -> + Ecto.Migration.Runner.run( + repo, + [], + version, + mod, + :forward, + :up, + :up, + all: true, + prefix: tenant_name + ) + + Ecto.Migration.SchemaMigration.up(repo, config, version, prefix: tenant_name) + end) + end end diff --git a/test/concurrent_index_multitenancy_test.exs b/test/concurrent_index_multitenancy_test.exs new file mode 100644 index 00000000..9e795774 --- /dev/null +++ b/test/concurrent_index_multitenancy_test.exs @@ -0,0 +1,339 @@ +# SPDX-FileCopyrightText: 2019 ash_postgres contributors +# +# SPDX-License-Identifier: MIT + +defmodule AshPostgres.Test.ConcurrentIndexMultitenancyTest do + use AshPostgres.RepoCase, async: false + + alias AshPostgres.MultiTenancy + + @temp_migrations_dir "priv/test_repo/temp_tenant_migrations" + + setup do + # Create temporary migrations directory + File.mkdir_p!(@temp_migrations_dir) + + # Ensure create_tenant!/2 reads tenant migrations from our temp directory. + # AshPostgres.MultiTenancy.create_tenant!/2 does not accept an explicit migrations_path, + # so we temporarily set the repo config value it consults. + original_repo_env = Application.get_env(:ash_postgres, AshPostgres.TestRepo, []) + + Application.put_env( + :ash_postgres, + AshPostgres.TestRepo, + Keyword.put(original_repo_env, :tenant_migrations_path, @temp_migrations_dir) + ) + + on_exit(fn -> + # Clean up temporary migrations directory + File.rm_rf!(@temp_migrations_dir) + + # Restore repo config + Application.put_env(:ash_postgres, AshPostgres.TestRepo, original_repo_env) + + # Clean up any test schemas + test_tenants = [ + "test_tenant_concurrent", + "test_tenant_regular", + "test_tenant_mixed", + "test_tenant_multiple_concurrent" + ] + + for tenant <- test_tenants do + try do + Ecto.Adapters.SQL.query!( + AshPostgres.TestRepo, + "DROP SCHEMA IF EXISTS \"#{tenant}\" CASCADE", + [] + ) + rescue + _ -> :ok + end + end + end) + + :ok + end + + describe "concurrent index migrations with multitenancy" do + test "create_tenant! works with concurrent index migration" do + tenant_name = "test_tenant_concurrent" + migration_file = Path.join(@temp_migrations_dir, "20250101000001_create_table_with_concurrent_index.exs") + + # Create migration file with concurrent index + migration_content = """ + defmodule AshPostgres.TestRepo.TempTenantMigrations.CreateTableWithConcurrentIndex do + use Ecto.Migration + + @disable_ddl_transaction true + + def up do + create table(:test_table, primary_key: false, prefix: prefix()) do + add :id, :uuid, null: false, default: fragment("uuid_generate_v4()"), primary_key: true + add :name, :text + add :email, :text + end + + create index(:test_table, [:email], concurrently: true, prefix: prefix()) + end + + def down do + drop index(:test_table, [:email], prefix: prefix()) + drop table(:test_table, prefix: prefix()) + end + end + """ + + File.write!(migration_file, migration_content) + + # Create tenant - this should succeed without transaction errors + assert :ok = try_create_tenant(tenant_name, @temp_migrations_dir) + + # Verify schema exists + assert schema_exists?(tenant_name) + + # Verify table exists + assert table_exists?(tenant_name, "test_table") + + # Verify concurrent index exists + assert index_exists?(tenant_name, "test_table", "test_table_email_index") + end + + test "migrate_tenant works with regular (non-concurrent) migration" do + tenant_name = "test_tenant_regular" + migration_file = Path.join(@temp_migrations_dir, "20250101000001_create_table_regular.exs") + + # Create migration file without concurrent index + migration_content = """ + defmodule AshPostgres.TestRepo.TempTenantMigrations.CreateTableRegular do + use Ecto.Migration + + def up do + create table(:regular_table, primary_key: false, prefix: prefix()) do + add :id, :uuid, null: false, default: fragment("uuid_generate_v4()"), primary_key: true + add :name, :text + add :status, :text + end + + create index(:regular_table, [:status], prefix: prefix()) + end + + def down do + drop index(:regular_table, [:status], prefix: prefix()) + drop table(:regular_table, prefix: prefix()) + end + end + """ + + File.write!(migration_file, migration_content) + + # Create schema first + Ecto.Adapters.SQL.query!( + AshPostgres.TestRepo, + "CREATE SCHEMA IF NOT EXISTS \"#{tenant_name}\"", + [] + ) + + # Migrate tenant - this should succeed + assert :ok = try_migrate_tenant(tenant_name, @temp_migrations_dir) + + # Verify schema exists + assert schema_exists?(tenant_name) + + # Verify table exists + assert table_exists?(tenant_name, "regular_table") + + # Verify index exists + assert index_exists?(tenant_name, "regular_table", "regular_table_status_index") + end + + test "migrate_tenant works with mixed migrations (concurrent and non-concurrent)" do + tenant_name = "test_tenant_mixed" + + # Create first migration without concurrent index + migration1_file = Path.join(@temp_migrations_dir, "20250101000001_create_table_first.exs") + migration1_content = """ + defmodule AshPostgres.TestRepo.TempTenantMigrations.CreateTableFirst do + use Ecto.Migration + + def up do + create table(:first_table, primary_key: false, prefix: prefix()) do + add :id, :uuid, null: false, default: fragment("uuid_generate_v4()"), primary_key: true + add :name, :text + end + + create index(:first_table, [:name], prefix: prefix()) + end + + def down do + drop index(:first_table, [:name], prefix: prefix()) + drop table(:first_table, prefix: prefix()) + end + end + """ + + File.write!(migration1_file, migration1_content) + + # Create second migration with concurrent index + migration2_file = Path.join(@temp_migrations_dir, "20250101000002_add_concurrent_index.exs") + migration2_content = """ + defmodule AshPostgres.TestRepo.TempTenantMigrations.AddConcurrentIndex do + use Ecto.Migration + + @disable_ddl_transaction true + + def up do + create index(:first_table, [:name], concurrently: true, name: :name_concurrent_index, prefix: prefix()) + end + + def down do + drop index(:first_table, [:name], name: :name_concurrent_index, prefix: prefix()) + end + end + """ + + File.write!(migration2_file, migration2_content) + + # Create tenant - this should succeed with both migrations + assert :ok = try_create_tenant(tenant_name, @temp_migrations_dir) + + # Verify schema exists + assert schema_exists?(tenant_name) + + # Verify table exists + assert table_exists?(tenant_name, "first_table") + + # Verify both indexes exist + assert index_exists?(tenant_name, "first_table", "first_table_name_index") + assert index_exists?(tenant_name, "first_table", "name_concurrent_index") + end + + test "migrate_tenant handles multiple concurrent indexes in one migration" do + tenant_name = "test_tenant_multiple_concurrent" + migration_file = Path.join(@temp_migrations_dir, "20250101000001_multiple_concurrent_indexes.exs") + + migration_content = """ + defmodule AshPostgres.TestRepo.TempTenantMigrations.MultipleConcurrentIndexes do + use Ecto.Migration + + @disable_ddl_transaction true + + def up do + create table(:multi_table, primary_key: false, prefix: prefix()) do + add :id, :uuid, null: false, default: fragment("uuid_generate_v4()"), primary_key: true + add :name, :text + add :email, :text + add :phone, :text + end + + create index(:multi_table, [:email], concurrently: true, prefix: prefix()) + create index(:multi_table, [:phone], concurrently: true, prefix: prefix()) + end + + def down do + drop index(:multi_table, [:phone], prefix: prefix()) + drop index(:multi_table, [:email], prefix: prefix()) + drop table(:multi_table, prefix: prefix()) + end + end + """ + + File.write!(migration_file, migration_content) + + assert :ok = try_create_tenant(tenant_name, @temp_migrations_dir) + + assert schema_exists?(tenant_name) + assert table_exists?(tenant_name, "multi_table") + assert index_exists?(tenant_name, "multi_table", "multi_table_email_index") + assert index_exists?(tenant_name, "multi_table", "multi_table_phone_index") + end + end + + # Helper functions + + defp try_create_tenant(tenant_name, migrations_path) do + # Ensure `create_tenant!/2` will read migrations from the supplied path + original_repo_env = Application.get_env(:ash_postgres, AshPostgres.TestRepo, []) + + try do + Application.put_env( + :ash_postgres, + AshPostgres.TestRepo, + Keyword.put(original_repo_env, :tenant_migrations_path, migrations_path) + ) + + MultiTenancy.create_tenant!(tenant_name, AshPostgres.TestRepo) + :ok + rescue + e -> + flunk("Failed to create tenant #{tenant_name}: #{inspect(e)}") + after + # Restore repo config even on failure + Application.put_env(:ash_postgres, AshPostgres.TestRepo, original_repo_env) + end + end + + defp try_migrate_tenant(tenant_name, migrations_path) do + try do + MultiTenancy.migrate_tenant(tenant_name, AshPostgres.TestRepo, migrations_path) + :ok + rescue + e -> + flunk("Failed to migrate tenant #{tenant_name}: #{inspect(e)}") + end + end + + defp schema_exists?(schema_name) do + result = + Ecto.Adapters.SQL.query!( + AshPostgres.TestRepo, + """ + SELECT schema_name + FROM information_schema.schemata + WHERE schema_name = $1 + """, + [schema_name] + ) + + case result.rows do + [[^schema_name]] -> true + _ -> false + end + end + + defp table_exists?(schema_name, table_name) do + result = + Ecto.Adapters.SQL.query!( + AshPostgres.TestRepo, + """ + SELECT table_name + FROM information_schema.tables + WHERE table_schema = $1 AND table_name = $2 + """, + [schema_name, table_name] + ) + + case result.rows do + [[^table_name]] -> true + _ -> false + end + end + + defp index_exists?(schema_name, table_name, index_name) do + result = + Ecto.Adapters.SQL.query!( + AshPostgres.TestRepo, + """ + SELECT indexname + FROM pg_indexes + WHERE schemaname = $1 AND tablename = $2 AND indexname = $3 + """, + [schema_name, table_name, index_name] + ) + + case result.rows do + [[^index_name]] -> true + _ -> false + end + end +end From 4862d5c75d17f34fc7106f371991bb88ebc3246a Mon Sep 17 00:00:00 2001 From: rylandbangerter Date: Mon, 23 Feb 2026 13:13:16 -0700 Subject: [PATCH 2/2] Address PR feedback: use only module metadata for disable_ddl_transaction check Co-authored-by: Cursor --- lib/multitenancy.ex | 34 ++++++++++++---------------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/lib/multitenancy.ex b/lib/multitenancy.ex index a2dcbd83..c554c0a7 100644 --- a/lib/multitenancy.ex +++ b/lib/multitenancy.ex @@ -45,7 +45,7 @@ defmodule AshPostgres.MultiTenancy do |> Enum.filter(& &1) |> Enum.map(&load_migration_with_file!/1) |> Enum.each(fn {version, mod, file} -> - requires_no_transaction? = migration_requires_no_transaction?(file, mod) + requires_no_transaction? = migration_requires_no_transaction?(mod) if requires_no_transaction? do # For migrations that require no transaction (e.g., concurrent indexes), @@ -133,27 +133,17 @@ defmodule AshPostgres.MultiTenancy do ~r/^[a-zA-Z0-9_-]+$/ end - # Check if a migration requires no transaction by examining the file content - # and the migration module attributes - defp migration_requires_no_transaction?(file, mod) do - # First check the file content for @disable_ddl_transaction - file_content = File.read!(file) - has_disable_ddl_transaction? = String.contains?(file_content, "@disable_ddl_transaction") - - # Also check if the module has the attribute set (for compiled modules) - has_module_attribute? = - try do - if function_exported?(mod, :__migration__, 0) do - migration_info = mod.__migration__() - Map.get(migration_info, :disable_ddl_transaction, false) - else - false - end - rescue - _ -> false - end - - has_disable_ddl_transaction? || has_module_attribute? + # Check if a migration requires no transaction by examining the compiled module's + # migration metadata. The module is already compiled at this point, so we ask + # the module directly rather than reading the file. This also catches cases + # where the attribute is set programmatically via Module.put_attribute/3. + defp migration_requires_no_transaction?(mod) do + if function_exported?(mod, :__migration__, 0) do + migration_info = mod.__migration__() + Map.get(migration_info, :disable_ddl_transaction, false) + else + false + end end # Run a migration that requires no transaction outside of any transaction context