diff --git a/src/environmentd/src/test_util.rs b/src/environmentd/src/test_util.rs index b6f81b542b5a..9a9f79ff2716 100644 --- a/src/environmentd/src/test_util.rs +++ b/src/environmentd/src/test_util.rs @@ -1051,6 +1051,7 @@ pub async fn get_explain_timestamp_determination( /// WARNING: If multiple tests use this, and the tests are run in parallel, then make sure the test /// use different postgres tables. pub async fn create_postgres_source_with_table<'a>( + server: &TestServer, mz_client: &Client, table_name: &str, table_schema: &str, @@ -1059,6 +1060,10 @@ pub async fn create_postgres_source_with_table<'a>( Client, impl FnOnce(&'a Client, &'a Client) -> LocalBoxFuture<'a, ()>, ) { + server + .enable_feature_flags(&["enable_create_table_from_source"]) + .await; + let postgres_url = env::var("POSTGRES_URL") .map_err(|_| anyhow!("POSTGRES_URL environment variable is not set")) .unwrap(); @@ -1138,8 +1143,15 @@ pub async fn create_postgres_source_with_table<'a>( "CREATE SOURCE {source_name} FROM POSTGRES CONNECTION pgconn - (PUBLICATION '{source_name}') - FOR TABLES ({table_name});" + (PUBLICATION '{source_name}')" + )) + .await + .unwrap(); + mz_client + .batch_execute(&format!( + "CREATE TABLE {table_name} + FROM SOURCE {source_name} + (REFERENCE {table_name});" )) .await .unwrap(); diff --git a/src/environmentd/tests/sql.rs b/src/environmentd/tests/sql.rs index 9bb99474a308..7d1e1036651b 100644 --- a/src/environmentd/tests/sql.rs +++ b/src/environmentd/tests/sql.rs @@ -1760,9 +1760,14 @@ async fn test_timeline_read_holds() { let view_name = "v_hold"; let source_name = "source_hold"; - let (pg_client, cleanup_fn) = - test_util::create_postgres_source_with_table(&mz_client, view_name, "(a INT)", source_name) - .await; + let (pg_client, cleanup_fn) = test_util::create_postgres_source_with_table( + &server, + &mz_client, + view_name, + "(a INT)", + source_name, + ) + .await; // Create user table in Materialize. mz_client @@ -1836,6 +1841,7 @@ async fn test_session_linearizability(isolation_level: &str) { let pg_table_name = "v_lin"; let pg_source_name = "source_lin"; let (pg_client, cleanup_fn) = test_util::create_postgres_source_with_table( + &server, &mz_client, pg_table_name, "(a INT)",