From 84576e3bf2d6364b3ec693d7ec42ef7524c7e6cb Mon Sep 17 00:00:00 2001 From: Daniel Harrison <52528+danhhz@users.noreply.github.com> Date: Fri, 6 Sep 2024 08:43:39 -0700 Subject: [PATCH 1/3] ct: support CREATE CONTINUAL TASK in parser --- src/sql-lexer/src/keywords.txt | 2 + src/sql-parser/src/ast/defs/statement.rs | 64 ++++++++++++++++-- src/sql-parser/src/parser.rs | 69 ++++++++++++++++++++ src/sql-parser/tests/testdata/continual-task | 39 +++++++++++ 4 files changed, 170 insertions(+), 4 deletions(-) create mode 100644 src/sql-parser/tests/testdata/continual-task diff --git a/src/sql-lexer/src/keywords.txt b/src/sql-lexer/src/keywords.txt index 9199c08546729..abb29501cfee1 100644 --- a/src/sql-lexer/src/keywords.txt +++ b/src/sql-lexer/src/keywords.txt @@ -95,6 +95,7 @@ Confluent Connection Connections Constraint +Continual Copy Count Counter @@ -423,6 +424,7 @@ System Table Tables Tail +Task Temp Temporary Text diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs index ce0501ca01e2a..3c914f6d5dca5 100644 --- a/src/sql-parser/src/ast/defs/statement.rs +++ b/src/sql-parser/src/ast/defs/statement.rs @@ -29,10 +29,11 @@ use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName}; use crate::ast::{ AstInfo, ColumnDef, ConnectionOption, ConnectionOptionName, CreateConnectionOption, CreateConnectionType, CreateSinkConnection, CreateSourceConnection, CreateSourceOption, - CreateSourceOptionName, DeferredItemName, Expr, Format, FormatSpecifier, Ident, IntervalValue, - KeyConstraint, MaterializedViewOption, Query, SelectItem, SinkEnvelope, SourceEnvelope, - SourceIncludeMetadata, SubscribeOutput, TableAlias, TableConstraint, TableWithJoins, - UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, + CreateSourceOptionName, CteMutRecColumnDef, DeferredItemName, Expr, Format, FormatSpecifier, + Ident, IntervalValue, KeyConstraint, MaterializedViewOption, Query, SelectItem, SinkEnvelope, + SourceEnvelope, SourceIncludeMetadata, SubscribeOutput, TableAlias, TableConstraint, + TableWithJoins, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName, + UnresolvedSchemaName, Value, }; /// A top-level statement (SELECT, INSERT, CREATE, etc.) 
@@ -54,6 +55,7 @@ pub enum Statement { CreateSink(CreateSinkStatement), CreateView(CreateViewStatement), CreateMaterializedView(CreateMaterializedViewStatement), + CreateContinualTask(CreateContinualTaskStatement), CreateTable(CreateTableStatement), CreateTableFromSource(CreateTableFromSourceStatement), CreateIndex(CreateIndexStatement), @@ -127,6 +129,7 @@ impl AstDisplay for Statement { Statement::CreateSink(stmt) => f.write_node(stmt), Statement::CreateView(stmt) => f.write_node(stmt), Statement::CreateMaterializedView(stmt) => f.write_node(stmt), + Statement::CreateContinualTask(stmt) => f.write_node(stmt), Statement::CreateTable(stmt) => f.write_node(stmt), Statement::CreateTableFromSource(stmt) => f.write_node(stmt), Statement::CreateIndex(stmt) => f.write_node(stmt), @@ -203,6 +206,7 @@ pub fn statement_kind_label_value(kind: StatementKind) -> &'static str { StatementKind::CreateSink => "create_sink", StatementKind::CreateView => "create_view", StatementKind::CreateMaterializedView => "create_materialized_view", + StatementKind::CreateContinualTask => "create_continual_task", StatementKind::CreateTable => "create_table", StatementKind::CreateTableFromSource => "create_table_from_source", StatementKind::CreateIndex => "create_index", @@ -1391,6 +1395,58 @@ impl AstDisplay for CreateMaterializedViewStatement { } impl_display_t!(CreateMaterializedViewStatement); +/// `CREATE CONTINUAL TASK` +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct CreateContinualTaskStatement { + pub name: UnresolvedItemName, + pub columns: Vec>, + pub in_cluster: Option, + + // The thing we get input diffs from + pub input: T::ItemName, + + // The txn to execute on each set of diffs + pub stmts: Vec>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum ContinualTaskStmt { + Delete(DeleteStatement), + Insert(InsertStatement), + // TODO(ct): Update/upsert? +} + +impl AstDisplay for CreateContinualTaskStatement { + fn fmt(&self, f: &mut AstFormatter) { + f.write_str("CREATE CONTINUAL TASK "); + f.write_node(&self.name); + f.write_str(" ("); + f.write_node(&display::comma_separated(&self.columns)); + f.write_str(")"); + + if let Some(cluster) = &self.in_cluster { + f.write_str(" IN CLUSTER "); + f.write_node(cluster); + } + + f.write_str(" ON INPUT "); + f.write_node(&self.input); + + f.write_str(" AS ("); + for (idx, stmt) in self.stmts.iter().enumerate() { + if idx > 0 { + f.write_str("; "); + } + match stmt { + ContinualTaskStmt::Delete(stmt) => f.write_node(stmt), + ContinualTaskStmt::Insert(stmt) => f.write_node(stmt), + } + } + f.write_str(")"); + } +} +impl_display_t!(CreateContinualTaskStatement); + /// `ALTER SET CLUSTER` #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct AlterSetClusterStatement { diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index 66c61a33de552..a5e04ae167e2f 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -1889,6 +1889,9 @@ impl<'a> Parser<'a> { { self.parse_create_materialized_view() .map_parser_err(StatementKind::CreateMaterializedView) + } else if self.peek_keywords(&[CONTINUAL, TASK]) { + self.parse_create_continual_task() + .map_parser_err(StatementKind::CreateContinualTask) } else if self.peek_keywords(&[USER]) { parser_err!( self, @@ -3576,6 +3579,72 @@ impl<'a> Parser<'a> { )) } + fn parse_create_continual_task(&mut self) -> Result, ParserError> { + // TODO(ct): If exists. + self.expect_keywords(&[CONTINUAL, TASK])?; + + // TODO(ct): Multiple outputs. 
+ let name = self.parse_item_name()?; + self.expect_token(&Token::LParen)?; + let columns = self.parse_comma_separated(|parser| { + Ok(CteMutRecColumnDef { + name: parser.parse_identifier()?, + data_type: parser.parse_data_type()?, + }) + })?; + self.expect_token(&Token::RParen)?; + let in_cluster = self.parse_optional_in_cluster()?; + + // TODO(ct): Multiple inputs. + self.expect_keywords(&[ON, INPUT])?; + let input_table = self.parse_raw_name()?; + // TODO(ct): Allow renaming the inserts/deletes so that we can use + // something as both an "input" and a "reference". + + self.expect_keyword(AS)?; + + let mut stmts = Vec::new(); + let mut expecting_statement_delimiter = false; + self.expect_token(&Token::LParen)?; + // TODO(ct): Dedup this with parse_statements? + loop { + // ignore empty statements (between successive statement delimiters) + while self.consume_token(&Token::Semicolon) { + expecting_statement_delimiter = false; + } + + if self.consume_token(&Token::RParen) { + break; + } else if expecting_statement_delimiter { + self.expected(self.peek_pos(), "end of statement", self.peek_token())? + } + + let stmt = self.parse_statement().map_err(|err| err.error)?.ast; + match stmt { + Statement::Delete(stmt) => stmts.push(ContinualTaskStmt::Delete(stmt)), + Statement::Insert(stmt) => stmts.push(ContinualTaskStmt::Insert(stmt)), + _ => { + return parser_err!( + self, + self.peek_prev_pos(), + "unsupported query in CREATE CONTINUAL TASK" + ); + } + } + expecting_statement_delimiter = true; + } + + Ok(Statement::CreateContinualTask( + CreateContinualTaskStatement { + name, + columns, + in_cluster, + input: input_table, + stmts, + }, + )) + } + fn parse_materialized_view_option_name( &mut self, ) -> Result { diff --git a/src/sql-parser/tests/testdata/continual-task b/src/sql-parser/tests/testdata/continual-task new file mode 100644 index 0000000000000..858f54654e6ec --- /dev/null +++ b/src/sql-parser/tests/testdata/continual-task @@ -0,0 +1,39 @@ +# Copyright 2020 sqlparser-rs contributors. All rights reserved. +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# This file is derived from the sqlparser-rs project, available at +# https://github.com/andygrove/sqlparser-rs. It was incorporated +# directly into Materialize on December 21, 2019. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License in the LICENSE file at the +# root of this repository, or online at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +parse-statement +CREATE CONTINUAL TASK foo (key int, val int) ON INPUT append_only AS ( + DELETE FROM output WHERE key IN (SELECT key FROM inserts); + INSERT INTO output SELECT key, max(value) FROM inserts GROUP BY key; +); +---- +CREATE CONTINUAL TASK foo (key int4, val int4) ON INPUT append_only AS (DELETE FROM output WHERE key IN (SELECT key FROM inserts); INSERT INTO output SELECT key, max(value) FROM inserts GROUP BY key) +=> +CreateContinualTask(CreateContinualTaskStatement { name: UnresolvedItemName([Ident("foo")]), columns: [CteMutRecColumnDef { name: Ident("key"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] } }, CteMutRecColumnDef { name: Ident("val"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] } }], in_cluster: None, input: Name(UnresolvedItemName([Ident("append_only")])), stmts: [Delete(DeleteStatement { table_name: Name(UnresolvedItemName([Ident("output")])), alias: None, using: [], selection: Some(InSubquery { expr: Identifier([Ident("key")]), subquery: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Identifier([Ident("key")]), alias: None }], from: [TableWithJoins { relation: Table { name: Name(UnresolvedItemName([Ident("inserts")])), alias: None }, joins: [] }], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, negated: false }) }), Insert(InsertStatement { table_name: Name(UnresolvedItemName([Ident("output")])), columns: [], source: Query(Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Identifier([Ident("key")]), alias: None }, Expr { expr: Function(Function { name: Name(UnresolvedItemName([Ident("max")])), args: Args { args: [Identifier([Ident("value")])], order_by: [] }, filter: None, over: None, distinct: false }), alias: None }], from: [TableWithJoins { relation: Table { name: Name(UnresolvedItemName([Ident("inserts")])), alias: None }, joins: [] }], selection: None, group_by: [Identifier([Ident("key")])], having: None, options: [] }), order_by: [], limit: None, offset: None }), returning: [] })] }) + +parse-statement +CREATE CONTINUAL TASK "materialize"."public"."upsert" ("key" [s20 AS "pg_catalog"."int4"], "val" [s20 AS "pg_catalog"."int4"]) IN CLUSTER [s1] ON INPUT [u1 AS "materialize"."public"."append_only"] AS ( + DELETE FROM "upsert" WHERE "key" IN (SELECT "key" FROM [u1 AS "materialize"."public"."append_only"]); + INSERT INTO "upsert" SELECT "key", "pg_catalog"."max"("val") FROM [u1 AS "materialize"."public"."append_only"] GROUP BY "key" +) +---- +CREATE CONTINUAL TASK materialize.public.upsert (key [s20 AS pg_catalog.int4], val [s20 AS pg_catalog.int4]) IN CLUSTER [s1] ON INPUT [u1 AS materialize.public.append_only] AS (DELETE FROM upsert WHERE key IN (SELECT key FROM [u1 AS materialize.public.append_only]); INSERT INTO upsert SELECT key, pg_catalog.max(val) FROM [u1 AS materialize.public.append_only] GROUP BY key) +=> +CreateContinualTask(CreateContinualTaskStatement { name: UnresolvedItemName([Ident("materialize"), Ident("public"), Ident("upsert")]), columns: [CteMutRecColumnDef { name: Ident("key"), data_type: Other { name: Id("s20", UnresolvedItemName([Ident("pg_catalog"), Ident("int4")])), typ_mod: [] } }, CteMutRecColumnDef { name: Ident("val"), data_type: Other { name: Id("s20", UnresolvedItemName([Ident("pg_catalog"), Ident("int4")])), typ_mod: [] } }], in_cluster: Some(Resolved("s1")), input: Id("u1", UnresolvedItemName([Ident("materialize"), 
Ident("public"), Ident("append_only")])), stmts: [Delete(DeleteStatement { table_name: Name(UnresolvedItemName([Ident("upsert")])), alias: None, using: [], selection: Some(InSubquery { expr: Identifier([Ident("key")]), subquery: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Identifier([Ident("key")]), alias: None }], from: [TableWithJoins { relation: Table { name: Id("u1", UnresolvedItemName([Ident("materialize"), Ident("public"), Ident("append_only")])), alias: None }, joins: [] }], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, negated: false }) }), Insert(InsertStatement { table_name: Name(UnresolvedItemName([Ident("upsert")])), columns: [], source: Query(Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Identifier([Ident("key")]), alias: None }, Expr { expr: Function(Function { name: Name(UnresolvedItemName([Ident("pg_catalog"), Ident("max")])), args: Args { args: [Identifier([Ident("val")])], order_by: [] }, filter: None, over: None, distinct: false }), alias: None }], from: [TableWithJoins { relation: Table { name: Id("u1", UnresolvedItemName([Ident("materialize"), Ident("public"), Ident("append_only")])), alias: None }, joins: [] }], selection: None, group_by: [Identifier([Ident("key")])], having: None, options: [] }), order_by: [], limit: None, offset: None }), returning: [] })] }) From 3dd23d4e6b44679290a900ee431647988953fd92 Mon Sep 17 00:00:00 2001 From: Daniel Harrison <52528+danhhz@users.noreply.github.com> Date: Tue, 3 Sep 2024 09:39:35 -0700 Subject: [PATCH 2/3] ct: establish CREATE CONTINUAL TASK plumbing --- src/adapter/src/catalog/state.rs | 7 ++- src/adapter/src/command.rs | 5 ++ src/adapter/src/coord/catalog_serving.rs | 1 + src/adapter/src/coord/command_handler.rs | 1 + src/adapter/src/coord/sequencer.rs | 6 ++ src/adapter/src/coord/sequencer/inner.rs | 1 + .../sequencer/inner/create_continual_task.rs | 29 ++++++++++ src/adapter/src/statement_logging.rs | 1 + src/catalog/src/durable/objects.rs | 5 ++ src/compute-client/src/controller/instance.rs | 3 + src/compute-types/src/sinks.proto | 4 ++ src/compute-types/src/sinks.rs | 25 +++++++++ src/compute/src/render/sinks.rs | 4 ++ src/compute/src/sink.rs | 1 + src/compute/src/sink/continual_task.rs | 52 ++++++++++++++++++ src/environmentd/src/http/sql.rs | 1 + src/pgwire/src/protocol.rs | 1 + src/sql/src/normalize.rs | 22 ++++++-- src/sql/src/plan.rs | 6 ++ src/sql/src/plan/statement.rs | 3 + src/sql/src/plan/statement/ddl.rs | 55 ++++++++++++------- src/sql/src/rbac.rs | 1 + 22 files changed, 205 insertions(+), 29 deletions(-) create mode 100644 src/adapter/src/coord/sequencer/inner/create_continual_task.rs create mode 100644 src/compute/src/sink/continual_task.rs diff --git a/src/adapter/src/catalog/state.rs b/src/adapter/src/catalog/state.rs index ae2047c00eb2c..7bfeabc22a93e 100644 --- a/src/adapter/src/catalog/state.rs +++ b/src/adapter/src/catalog/state.rs @@ -65,9 +65,9 @@ use mz_sql::names::{ ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId, }; use mz_sql::plan::{ - CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan, - CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params, - Plan, PlanContext, + CreateConnectionPlan, CreateContinualTaskPlan, CreateIndexPlan, CreateMaterializedViewPlan, + CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, + CreateViewPlan, Params, Plan, 
PlanContext, }; use mz_sql::rbac; use mz_sql::session::metadata::SessionMetadata; @@ -946,6 +946,7 @@ impl CatalogState { initial_as_of, }) } + Plan::CreateContinualTask(CreateContinualTaskPlan {}) => todo!("WIP"), Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index { create_sql: index.create_sql, on: index.on, diff --git a/src/adapter/src/command.rs b/src/adapter/src/command.rs index 0f5e487824561..42f13fadfb92f 100644 --- a/src/adapter/src/command.rs +++ b/src/adapter/src/command.rs @@ -323,6 +323,8 @@ pub enum ExecuteResponse { CreatedViews, /// The requested materialized view was created. CreatedMaterializedView, + /// The requested continual task was created. + CreatedContinualTask, /// The requested type was created. CreatedType, /// The requested prepared statement was removed. @@ -490,6 +492,7 @@ impl TryInto for ExecuteResponseKind { ExecuteResponseKind::CreatedMaterializedView => { Ok(ExecuteResponse::CreatedMaterializedView) } + ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask), ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType), ExecuteResponseKind::Deallocate => Err(()), ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor), @@ -551,6 +554,7 @@ impl ExecuteResponse { CreatedView { .. } => Some("CREATE VIEW".into()), CreatedViews { .. } => Some("CREATE VIEWS".into()), CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()), + CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()), CreatedType => Some("CREATE TYPE".into()), Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })), DeclaredCursor => Some("DECLARE CURSOR".into()), @@ -639,6 +643,7 @@ impl ExecuteResponse { CreateTable => &[CreatedTable], CreateView => &[CreatedView], CreateMaterializedView => &[CreatedMaterializedView], + CreateContinualTask => &[CreatedContinualTask], CreateIndex => &[CreatedIndex], CreateType => &[CreatedType], PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate], diff --git a/src/adapter/src/coord/catalog_serving.rs b/src/adapter/src/coord/catalog_serving.rs index fddcbe3270261..167c105ffc077 100644 --- a/src/adapter/src/coord/catalog_serving.rs +++ b/src/adapter/src/coord/catalog_serving.rs @@ -74,6 +74,7 @@ pub fn auto_run_on_catalog_server<'a, 's, 'p>( | Plan::CreateRole(_) | Plan::CreateCluster(_) | Plan::CreateClusterReplica(_) + | Plan::CreateContinualTask(_) | Plan::CreateSource(_) | Plan::CreateSources(_) | Plan::CreateSecret(_) diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index b5adeb9cd1d13..095b9c897fcac 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -671,6 +671,7 @@ impl Coordinator { | Statement::CreateDatabase(_) | Statement::CreateIndex(_) | Statement::CreateMaterializedView(_) + | Statement::CreateContinualTask(_) | Statement::CreateRole(_) | Statement::CreateSchema(_) | Statement::CreateSecret(_) diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs index 4480891ce196e..361b7a6e94d0d 100644 --- a/src/adapter/src/coord/sequencer.rs +++ b/src/adapter/src/coord/sequencer.rs @@ -213,6 +213,12 @@ impl Coordinator { self.sequence_create_materialized_view(ctx, plan, resolved_ids) .await; } + Plan::CreateContinualTask(plan) => { + let res = self + .sequence_create_continual_task(ctx.session(), plan, resolved_ids) + .await; + ctx.retire(res); + } Plan::CreateIndex(plan) => { 
self.sequence_create_index(ctx, plan, resolved_ids).await; } diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index c51498ea86c16..e957bd362a09c 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -112,6 +112,7 @@ use crate::util::{viewable_variables, ClientTransmitter, ResultExt}; use crate::{guard_write_critical_section, PeekResponseUnary, ReadHolds}; mod cluster; +mod create_continual_task; mod create_index; mod create_materialized_view; mod create_view; diff --git a/src/adapter/src/coord/sequencer/inner/create_continual_task.rs b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs new file mode 100644 index 0000000000000..f94fa182182aa --- /dev/null +++ b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs @@ -0,0 +1,29 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use mz_ore::instrument; +use mz_sql::names::ResolvedIds; +use mz_sql::plan; + +use crate::command::ExecuteResponse; +use crate::coord::Coordinator; +use crate::error::AdapterError; +use crate::session::Session; + +impl Coordinator { + #[instrument] + pub(crate) async fn sequence_create_continual_task( + &mut self, + session: &Session, + plan: plan::CreateContinualTaskPlan, + resolved_ids: ResolvedIds, + ) -> Result { + todo!("WIP {:?}", (session, plan, resolved_ids)); + } +} diff --git a/src/adapter/src/statement_logging.rs b/src/adapter/src/statement_logging.rs index 77b93efb82db7..cac4c3450a043 100644 --- a/src/adapter/src/statement_logging.rs +++ b/src/adapter/src/statement_logging.rs @@ -207,6 +207,7 @@ impl From<&ExecuteResponse> for StatementEndedExecutionReason { | ExecuteResponse::CreatedView | ExecuteResponse::CreatedViews | ExecuteResponse::CreatedMaterializedView + | ExecuteResponse::CreatedContinualTask | ExecuteResponse::CreatedType | ExecuteResponse::Deallocate { .. 
} | ExecuteResponse::DeclaredCursor diff --git a/src/catalog/src/durable/objects.rs b/src/catalog/src/durable/objects.rs index 3e4723b5e1443..92bc20ba8bb33 100644 --- a/src/catalog/src/durable/objects.rs +++ b/src/catalog/src/durable/objects.rs @@ -1150,6 +1150,11 @@ impl ItemValue { assert_eq!(tokens.next(), Some("VIEW")); CatalogItemType::MaterializedView } + Some("CONTINUAL") => { + assert_eq!(tokens.next(), Some("TASK")); + // TODO(ct): CatalogItemType::ContinualTask + CatalogItemType::MaterializedView + } Some("INDEX") => CatalogItemType::Index, Some("TYPE") => CatalogItemType::Type, Some("FUNCTION") => CatalogItemType::Func, diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index a435b0b1d280d..fa607a0a98c08 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -1289,6 +1289,9 @@ where }; ComputeSinkConnection::Persist(conn) } + ComputeSinkConnection::ContinualTask(conn) => { + todo!("WIP {:?}", conn); + } ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn), ComputeSinkConnection::CopyToS3Oneshot(conn) => { ComputeSinkConnection::CopyToS3Oneshot(conn) diff --git a/src/compute-types/src/sinks.proto b/src/compute-types/src/sinks.proto index 200a0738133be..e55398971c505 100644 --- a/src/compute-types/src/sinks.proto +++ b/src/compute-types/src/sinks.proto @@ -38,6 +38,7 @@ message ProtoComputeSinkConnection { google.protobuf.Empty subscribe = 1; ProtoPersistSinkConnection persist = 2; ProtoCopyToS3OneshotSinkConnection copy_to_s3_oneshot = 3; + ProtoContinualTaskConnection continual_task = 4; } } @@ -46,6 +47,9 @@ message ProtoPersistSinkConnection { mz_storage_types.controller.ProtoCollectionMetadata storage_metadata = 2; } +message ProtoContinualTaskConnection { +} + message ProtoCopyToS3OneshotSinkConnection { mz_storage_types.sinks.ProtoS3UploadInfo upload_info = 1; mz_storage_types.connections.aws.ProtoAwsConnection aws_connection = 2; diff --git a/src/compute-types/src/sinks.rs b/src/compute-types/src/sinks.rs index bccab841efba7..9a3e2335c8e52 100644 --- a/src/compute-types/src/sinks.rs +++ b/src/compute-types/src/sinks.rs @@ -119,6 +119,8 @@ pub enum ComputeSinkConnection { Subscribe(SubscribeSinkConnection), /// TODO(#25239): Add documentation. Persist(PersistSinkConnection), + /// TODO(#25239): Add documentation. + ContinualTask(ContinualTaskConnection), /// A compute sink to do a oneshot copy to s3. CopyToS3Oneshot(CopyToS3OneshotSinkConnection), } @@ -129,6 +131,7 @@ impl ComputeSinkConnection { match self { ComputeSinkConnection::Subscribe(_) => "subscribe", ComputeSinkConnection::Persist(_) => "persist", + ComputeSinkConnection::ContinualTask(_) => "continual_task", ComputeSinkConnection::CopyToS3Oneshot(_) => "copy_to_s3_oneshot", } } @@ -150,6 +153,9 @@ impl RustType for ComputeSinkConnection Kind::Subscribe(()), ComputeSinkConnection::Persist(persist) => Kind::Persist(persist.into_proto()), + ComputeSinkConnection::ContinualTask(continual_task) => { + Kind::ContinualTask(continual_task.into_proto()) + } ComputeSinkConnection::CopyToS3Oneshot(s3) => { Kind::CopyToS3Oneshot(s3.into_proto()) } @@ -165,6 +171,9 @@ impl RustType for ComputeSinkConnection ComputeSinkConnection::Subscribe(SubscribeSinkConnection {}), Kind::Persist(persist) => ComputeSinkConnection::Persist(persist.into_rust()?), + Kind::ContinualTask(continual_task) => { + ComputeSinkConnection::ContinualTask(continual_task.into_rust()?) 
+ } Kind::CopyToS3Oneshot(s3) => ComputeSinkConnection::CopyToS3Oneshot(s3.into_rust()?), }) } @@ -243,3 +252,19 @@ impl RustType for PersistSinkConnection { + _phantom: std::marker::PhantomData, +} + +impl RustType for ContinualTaskConnection { + fn into_proto(&self) -> ProtoContinualTaskConnection { + todo!("WIP"); + } + + fn from_proto(proto: ProtoContinualTaskConnection) -> Result { + todo!("WIP {:?}", proto); + } +} diff --git a/src/compute/src/render/sinks.rs b/src/compute/src/render/sinks.rs index 08b77708bdf27..f6a72eaf20bfe 100644 --- a/src/compute/src/render/sinks.rs +++ b/src/compute/src/render/sinks.rs @@ -122,6 +122,9 @@ where let region_name = match sink.connection { ComputeSinkConnection::Subscribe(_) => format!("SubscribeSink({:?})", sink_id), ComputeSinkConnection::Persist(_) => format!("PersistSink({:?})", sink_id), + ComputeSinkConnection::ContinualTask(_) => { + format!("ContinualTask({:?})", sink_id) + } ComputeSinkConnection::CopyToS3Oneshot(_) => { format!("CopyToS3OneshotSink({:?})", sink_id) } @@ -178,6 +181,7 @@ where match connection { ComputeSinkConnection::Subscribe(connection) => Box::new(connection.clone()), ComputeSinkConnection::Persist(connection) => Box::new(connection.clone()), + ComputeSinkConnection::ContinualTask(connection) => Box::new(connection.clone()), ComputeSinkConnection::CopyToS3Oneshot(connection) => Box::new(connection.clone()), } } diff --git a/src/compute/src/sink.rs b/src/compute/src/sink.rs index 753a42b84b95a..b7c6e61d4fd63 100644 --- a/src/compute/src/sink.rs +++ b/src/compute/src/sink.rs @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +pub(crate) mod continual_task; mod copy_to_s3_oneshot; mod correction; mod persist_sink; diff --git a/src/compute/src/sink/continual_task.rs b/src/compute/src/sink/continual_task.rs new file mode 100644 index 0000000000000..836dfb2332670 --- /dev/null +++ b/src/compute/src/sink/continual_task.rs @@ -0,0 +1,52 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use std::any::Any; +use std::rc::Rc; + +use differential_dataflow::Collection; +use mz_compute_types::sinks::{ComputeSinkDesc, ContinualTaskConnection}; +use mz_repr::{Diff, GlobalId, Row, Timestamp}; +use mz_storage_types::controller::CollectionMetadata; +use mz_storage_types::errors::DataflowError; +use timely::dataflow::Scope; +use timely::progress::Antichain; + +use crate::compute_state::ComputeState; +use crate::render::sinks::SinkRender; +use crate::render::StartSignal; + +impl SinkRender for ContinualTaskConnection +where + G: Scope, +{ + fn render_sink( + &self, + compute_state: &mut ComputeState, + sink: &ComputeSinkDesc, + sink_id: GlobalId, + as_of: Antichain, + start_signal: StartSignal, + oks: Collection, + errs: Collection, + ) -> Option> { + todo!( + "WIP {:?}", + ( + std::any::type_name_of_val(&compute_state), + sink, + sink_id, + as_of, + std::any::type_name_of_val(&start_signal), + std::any::type_name_of_val(&oks), + std::any::type_name_of_val(&errs), + ) + ); + } +} diff --git a/src/environmentd/src/http/sql.rs b/src/environmentd/src/http/sql.rs index 908f36db1c80c..60d21aa686e68 100644 --- a/src/environmentd/src/http/sql.rs +++ b/src/environmentd/src/http/sql.rs @@ -1222,6 +1222,7 @@ async fn execute_stmt( | ExecuteResponse::CreatedView { .. } | ExecuteResponse::CreatedViews { .. } | ExecuteResponse::CreatedMaterializedView { .. } + | ExecuteResponse::CreatedContinualTask { .. } | ExecuteResponse::CreatedType | ExecuteResponse::Comment | ExecuteResponse::Deleted(_) diff --git a/src/pgwire/src/protocol.rs b/src/pgwire/src/protocol.rs index 4f4deab78f7c5..12a4bcc84ae4f 100644 --- a/src/pgwire/src/protocol.rs +++ b/src/pgwire/src/protocol.rs @@ -1721,6 +1721,7 @@ where | ExecuteResponse::CreatedIndex { .. } | ExecuteResponse::CreatedIntrospectionSubscribe | ExecuteResponse::CreatedMaterializedView { .. } + | ExecuteResponse::CreatedContinualTask { .. } | ExecuteResponse::CreatedRole | ExecuteResponse::CreatedSchema { .. } | ExecuteResponse::CreatedSecret { .. 
} diff --git a/src/sql/src/normalize.rs b/src/sql/src/normalize.rs index eae5029fb287d..642825adb2578 100644 --- a/src/sql/src/normalize.rs +++ b/src/sql/src/normalize.rs @@ -21,12 +21,12 @@ use mz_repr::{ColumnName, GlobalId}; use mz_sql_parser::ast::display::AstDisplay; use mz_sql_parser::ast::visit_mut::{self, VisitMut}; use mz_sql_parser::ast::{ - CreateConnectionStatement, CreateIndexStatement, CreateMaterializedViewStatement, - CreateSecretStatement, CreateSinkStatement, CreateSourceStatement, CreateSubsourceStatement, - CreateTableFromSourceStatement, CreateTableStatement, CreateTypeStatement, CreateViewStatement, - CreateWebhookSourceStatement, CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior, - MutRecBlock, Op, Query, Statement, TableFactor, UnresolvedItemName, UnresolvedSchemaName, - Value, ViewDefinition, + CreateConnectionStatement, CreateContinualTaskStatement, CreateIndexStatement, + CreateMaterializedViewStatement, CreateSecretStatement, CreateSinkStatement, + CreateSourceStatement, CreateSubsourceStatement, CreateTableFromSourceStatement, + CreateTableStatement, CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, + CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior, MutRecBlock, Op, Query, Statement, + TableFactor, UnresolvedItemName, UnresolvedSchemaName, Value, ViewDefinition, }; use crate::names::{Aug, FullItemName, PartialItemName, PartialSchemaName, RawDatabaseSpecifier}; @@ -415,6 +415,16 @@ pub fn create_statement( *if_exists = IfExistsBehavior::Error; } + Statement::CreateContinualTask(CreateContinualTaskStatement { + name, + columns, + input, + stmts, + in_cluster, + }) => { + todo!("WIP {:?}", (name, columns, input, stmts, in_cluster)); + } + Statement::CreateIndex(CreateIndexStatement { name: _, in_cluster: _, diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index 636215cad456e..d8763dd9ca178 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -137,6 +137,7 @@ pub enum Plan { CreateTable(CreateTablePlan), CreateView(CreateViewPlan), CreateMaterializedView(CreateMaterializedViewPlan), + CreateContinualTask(CreateContinualTaskPlan), CreateIndex(CreateIndexPlan), CreateType(CreateTypePlan), Comment(CommentPlan), @@ -259,6 +260,7 @@ impl Plan { StatementKind::CreateDatabase => &[PlanKind::CreateDatabase], StatementKind::CreateIndex => &[PlanKind::CreateIndex], StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView], + StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask], StatementKind::CreateRole => &[PlanKind::CreateRole], StatementKind::CreateSchema => &[PlanKind::CreateSchema], StatementKind::CreateSecret => &[PlanKind::CreateSecret], @@ -327,6 +329,7 @@ impl Plan { Plan::CreateTable(_) => "create table", Plan::CreateView(_) => "create view", Plan::CreateMaterializedView(_) => "create materialized view", + Plan::CreateContinualTask(_) => "create continual task", Plan::CreateIndex(_) => "create index", Plan::CreateType(_) => "create type", Plan::Comment(_) => "comment", @@ -703,6 +706,9 @@ pub struct CreateMaterializedViewPlan { pub ambiguous_columns: bool, } +#[derive(Debug, Clone)] +pub struct CreateContinualTaskPlan {} + #[derive(Debug, Clone)] pub struct CreateIndexPlan { pub name: QualifiedItemName, diff --git a/src/sql/src/plan/statement.rs b/src/sql/src/plan/statement.rs index fbad34087db56..029f19ba26035 100644 --- a/src/sql/src/plan/statement.rs +++ b/src/sql/src/plan/statement.rs @@ -158,6 +158,7 @@ pub fn describe( Statement::CreateMaterializedView(stmt) => { 
ddl::describe_create_materialized_view(&scx, stmt)? } + Statement::CreateContinualTask(stmt) => ddl::describe_create_continual_task(&scx, stmt)?, Statement::DropObjects(stmt) => ddl::describe_drop_objects(&scx, stmt)?, Statement::DropOwned(stmt) => ddl::describe_drop_owned(&scx, stmt)?, @@ -343,6 +344,7 @@ pub fn plan( Statement::CreateMaterializedView(stmt) => { ddl::plan_create_materialized_view(scx, stmt, params) } + Statement::CreateContinualTask(stmt) => ddl::plan_create_continual_task(scx, stmt, params), Statement::DropObjects(stmt) => ddl::plan_drop_objects(scx, stmt), Statement::DropOwned(stmt) => ddl::plan_drop_owned(scx, stmt), @@ -1037,6 +1039,7 @@ impl From<&Statement> for StatementClassifica Statement::CreateCluster(_) => DDL, Statement::CreateClusterReplica(_) => DDL, Statement::CreateConnection(_) => DDL, + Statement::CreateContinualTask(_) => DDL, Statement::CreateDatabase(_) => DDL, Statement::CreateIndex(_) => DDL, Statement::CreateRole(_) => DDL, diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index 0f52ad13da749..d89c3379bae83 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -54,26 +54,26 @@ use mz_sql_parser::ast::{ ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement, ConnectionOption, ConnectionOptionName, CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption, CreateConnectionOptionName, CreateConnectionStatement, - CreateConnectionType, CreateDatabaseStatement, CreateIndexStatement, - CreateMaterializedViewStatement, CreateRoleStatement, CreateSchemaStatement, - CreateSecretStatement, CreateSinkConnection, CreateSinkOption, CreateSinkOptionName, - CreateSinkStatement, CreateSourceConnection, CreateSourceOption, CreateSourceOptionName, - CreateSourceStatement, CreateSubsourceOption, CreateSubsourceOptionName, - CreateSubsourceStatement, CreateTableFromSourceStatement, CreateTableStatement, CreateTypeAs, - CreateTypeListOption, CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName, - CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, CsrConfigOption, - CsrConfigOptionName, CsrConnection, CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, - CsvColumns, DeferredItemName, DocOnIdentifier, DocOnSchema, DropObjectsStatement, - DropOwnedStatement, Expr, Format, FormatSpecifier, Ident, IfExistsBehavior, IndexOption, - IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption, - LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, - MySqlConfigOptionName, PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, - RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, - ReplicaOption, ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, - SourceIncludeMetadata, Statement, TableConstraint, TableFromSourceOption, - TableFromSourceOptionName, TableOption, TableOptionName, UnresolvedDatabaseName, - UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, ViewDefinition, - WithOptionValue, + CreateConnectionType, CreateContinualTaskStatement, CreateDatabaseStatement, + CreateIndexStatement, CreateMaterializedViewStatement, CreateRoleStatement, + CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption, + CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption, + CreateSourceOptionName, CreateSourceStatement, 
CreateSubsourceOption, + CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement, + CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName, + CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement, + CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection, + CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName, + DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format, + FormatSpecifier, Ident, IfExistsBehavior, IndexOption, IndexOptionName, KafkaSinkConfigOption, + KeyConstraint, LoadGeneratorOption, LoadGeneratorOptionName, MaterializedViewOption, + MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName, PgConfigOption, + PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue, + RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption, + ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata, + Statement, TableConstraint, TableFromSourceOption, TableFromSourceOptionName, TableOption, + TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName, + UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue, }; use mz_sql_parser::ident; use mz_sql_parser::parser::StatementParseResult; @@ -2434,6 +2434,13 @@ pub fn describe_create_materialized_view( Ok(StatementDesc::new(None)) } +pub fn describe_create_continual_task( + _: &StatementContext, + _: CreateContinualTaskStatement, +) -> Result { + Ok(StatementDesc::new(None)) +} + pub fn plan_create_materialized_view( scx: &StatementContext, mut stmt: CreateMaterializedViewStatement, @@ -2706,6 +2713,14 @@ generate_extracted_config!( (Refresh, RefreshOptionValue, AllowMultiple) ); +pub fn plan_create_continual_task( + scx: &StatementContext, + stmt: CreateContinualTaskStatement, + params: &Params, +) -> Result { + todo!("WIP {:?}", (scx, stmt, params)); +} + pub fn describe_create_sink( _: &StatementContext, _: CreateSinkStatement, diff --git a/src/sql/src/rbac.rs b/src/sql/src/rbac.rs index 372cb8f772522..92bba069ef542 100644 --- a/src/sql/src/rbac.rs +++ b/src/sql/src/rbac.rs @@ -600,6 +600,7 @@ fn generate_rbac_requirements( item_usage: &CREATE_ITEM_USAGE, ..Default::default() }, + Plan::CreateContinualTask(plan::CreateContinualTaskPlan {}) => todo!("WIP"), Plan::CreateIndex(plan::CreateIndexPlan { name, index, From a2feb1c6d746566d08154b31a4e9795c89168b30 Mon Sep 17 00:00:00 2001 From: Daniel Harrison <52528+danhhz@users.noreply.github.com> Date: Fri, 6 Sep 2024 08:55:58 -0700 Subject: [PATCH 3/3] ct: add strawman impl of CREATE CONTINUAL TASK Strawman because: - I personally find it much easier to start with a crappy thing and incrementally improve it than to iterate on a huge branch forever. - Allows for more easily collaborating on the remaining work. - Also to build excitement internally! A continual task presents as something like a `BEFORE TRIGGER`: it watches some _input_ and whenever it changes at time `T`, executes a SQL txn, writing to some _output_ at the same time `T`. It can also read anything in materialize as a _reference_, most notably including the output.
Only reacting to new inputs (and not the full history) makes a CT's rehydration time independent of the size of the inputs (NB this is not true for references), enabling things like writing UPSERT on top of an append-only shard in SQL (ignore the obvious bug with my upsert impl): ```sql CREATE CONTINUAL TASK upsert (key INT, val INT) ON INPUT append_only AS ( DELETE FROM upsert WHERE key IN (SELECT key FROM append_only); INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key; ) ``` Unlike a materialized view, the continual task does not update outputs if references later change. This enables things like auditing: ```sql CREATE CONTINUAL TASK audit_log (count INT8) ON INPUT anomalies AS ( INSERT INTO audit_log SELECT * FROM anomalies; ) ``` As mentioned above, this is in no way the final form of CTs. There's lots of big open questions left on what the feature should look like as presented to users. However, we'll start shipping it by exposing incrementally less limited (and more powerful) surface areas publicly: e.g. perhaps a RETENTION WINDOW on sources. --- src/adapter/src/catalog.rs | 10 + .../src/catalog/builtin_table_updates.rs | 17 +- src/adapter/src/catalog/state.rs | 27 +- .../sequencer/inner/create_continual_task.rs | 183 +++- src/compute-client/Cargo.toml | 2 +- src/compute-client/src/controller/instance.rs | 15 +- src/compute-types/src/sinks.proto | 2 + src/compute-types/src/sinks.rs | 28 +- src/compute/src/render.rs | 33 +- src/compute/src/render/continual_task.rs | 839 ++++++++++++++++++ src/compute/src/render/sinks.rs | 5 + src/compute/src/sink.rs | 1 - src/compute/src/sink/continual_task.rs | 52 -- src/compute/src/sink/copy_to_s3_oneshot.rs | 1 + src/compute/src/sink/persist_sink.rs | 1 + src/compute/src/sink/subscribe.rs | 1 + src/sql-parser/src/parser.rs | 1 + src/sql/src/names.rs | 13 +- src/sql/src/normalize.rs | 30 +- src/sql/src/plan.rs | 9 +- src/sql/src/plan/query.rs | 55 +- src/sql/src/plan/statement/ddl.rs | 193 +++- src/sql/src/rbac.rs | 22 +- src/sql/src/session/vars/definitions.rs | 6 + test/sqllogictest/ct_audit_log.slt | 53 ++ test/sqllogictest/ct_errors.slt | 36 + test/sqllogictest/ct_stream_table_join.slt | 49 + test/sqllogictest/ct_upsert.slt | 41 + 28 files changed, 1627 insertions(+), 98 deletions(-) create mode 100644 src/compute/src/render/continual_task.rs delete mode 100644 src/compute/src/sink/continual_task.rs create mode 100644 test/sqllogictest/ct_audit_log.slt create mode 100644 test/sqllogictest/ct_errors.slt create mode 100644 test/sqllogictest/ct_stream_table_join.slt create mode 100644 test/sqllogictest/ct_upsert.slt diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index 722eae1f079e0..5961b104e9df0 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -360,6 +360,16 @@ impl Catalog { } policies } + + /// TODO(ct): This exists for continual tasks because they are the first + /// self-referential thing in mz. We use this to inject something for the + /// optimizer to resolve the CT's own id to before we've saved it to the + /// catalog. The one usage of this is careful to destroy this copy of + /// catalog immediately after. There are better ways to do this, but it was + /// the easiest path to get the skeleton of the feature merged. 
+ pub fn hack_add_ct(&mut self, id: GlobalId, entry: CatalogEntry) { + self.state.entry_by_id.insert(id, entry); + } } #[derive(Debug)] diff --git a/src/adapter/src/catalog/builtin_table_updates.rs b/src/adapter/src/catalog/builtin_table_updates.rs index 59ca6e2b5a427..659e3b11033ba 100644 --- a/src/adapter/src/catalog/builtin_table_updates.rs +++ b/src/adapter/src/catalog/builtin_table_updates.rs @@ -1245,16 +1245,19 @@ impl CatalogState { }) .into_element() .ast; - let query = match &create_stmt { - Statement::CreateMaterializedView(stmt) => &stmt.query, + let query_string = match &create_stmt { + Statement::CreateMaterializedView(stmt) => { + let mut query_string = stmt.query.to_ast_string_stable(); + // PostgreSQL appends a semicolon in `pg_matviews.definition`, we + // do the same for compatibility's sake. + query_string.push(';'); + query_string + } + // TODO(ct): Remove. + Statement::CreateContinualTask(_) => "TODO(ct)".into(), _ => unreachable!(), }; - let mut query_string = query.to_ast_string_stable(); - // PostgreSQL appends a semicolon in `pg_matviews.definition`, we - // do the same for compatibility's sake. - query_string.push(';'); - let mut updates = Vec::new(); updates.push(BuiltinTableUpdate { diff --git a/src/adapter/src/catalog/state.rs b/src/adapter/src/catalog/state.rs index 7bfeabc22a93e..9e00c89024400 100644 --- a/src/adapter/src/catalog/state.rs +++ b/src/adapter/src/catalog/state.rs @@ -38,6 +38,7 @@ use mz_controller::clusters::{ UnmanagedReplicaLocation, }; use mz_controller_types::{ClusterId, ReplicaId}; +use mz_expr::OptimizedMirRelationExpr; use mz_ore::collections::CollectionExt; use mz_ore::now::NOW_ZERO; use mz_ore::soft_assert_no_log; @@ -946,7 +947,31 @@ impl CatalogState { initial_as_of, }) } - Plan::CreateContinualTask(CreateContinualTaskPlan {}) => todo!("WIP"), + Plan::CreateContinualTask(CreateContinualTaskPlan { + desc, + continual_task, + .. + }) => { + // TODO(ct): Figure out how to make this survive restarts. The + // expr we saved still had the LocalId placeholders for the + // output, but we don't have access to the real Id here. + let optimized_expr = OptimizedMirRelationExpr::declare_optimized( + mz_expr::MirRelationExpr::constant(Vec::new(), desc.typ().clone()), + ); + // TODO(ct): CatalogItem::ContinualTask + CatalogItem::MaterializedView(MaterializedView { + create_sql: continual_task.create_sql, + raw_expr: Arc::new(continual_task.expr.clone()), + optimized_expr: Arc::new(optimized_expr), + desc, + resolved_ids, + cluster_id: continual_task.cluster_id, + non_null_assertions: continual_task.non_null_assertions, + custom_logical_compaction_window: continual_task.compaction_window, + refresh_schedule: continual_task.refresh_schedule, + initial_as_of: continual_task.as_of.map(Antichain::from_elem), + }) + } Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index { create_sql: index.create_sql, on: index.on, diff --git a/src/adapter/src/coord/sequencer/inner/create_continual_task.rs b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs index f94fa182182aa..edf249274d4da 100644 --- a/src/adapter/src/coord/sequencer/inner/create_continual_task.rs +++ b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs @@ -7,15 +7,34 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+use std::sync::Arc; + +use mz_catalog::memory::objects::{ + CatalogEntry, CatalogItem, MaterializedView, Table, TableDataSource, +}; +use mz_compute_types::sinks::{ + ComputeSinkConnection, ContinualTaskConnection, PersistSinkConnection, +}; +use mz_expr::visit::Visit; +use mz_expr::{Id, LocalId}; use mz_ore::instrument; +use mz_repr::adt::mz_acl_item::PrivilegeMap; +use mz_repr::optimize::OverrideFrom; use mz_sql::names::ResolvedIds; -use mz_sql::plan; +use mz_sql::plan::{self, HirRelationExpr}; +use mz_sql::session::metadata::SessionMetadata; +use mz_storage_client::controller::{CollectionDescription, DataSource, DataSourceOther}; +use crate::catalog; use crate::command::ExecuteResponse; use crate::coord::Coordinator; use crate::error::AdapterError; +use crate::optimize::dataflows::dataflow_import_id_bundle; +use crate::optimize::{self, Optimize}; use crate::session::Session; +use crate::util::ResultExt; +// TODO(ct): Big oof. Dedup a bunch of this with MVs. impl Coordinator { #[instrument] pub(crate) async fn sequence_create_continual_task( @@ -24,6 +43,166 @@ impl Coordinator { plan: plan::CreateContinualTaskPlan, resolved_ids: ResolvedIds, ) -> Result { - todo!("WIP {:?}", (session, plan, resolved_ids)); + let plan::CreateContinualTaskPlan { + name, + desc, + input_id, + continual_task: + plan::MaterializedView { + create_sql, + cluster_id, + mut expr, + column_names, + non_null_assertions, + compaction_window: _, + refresh_schedule, + as_of: _, + }, + } = plan; + + // Collect optimizer parameters. + let compute_instance = self + .instance_snapshot(cluster_id) + .expect("compute instance does not exist"); + + let debug_name = self.catalog().resolve_full_name(&name, None).to_string(); + let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config()) + .override_from(&self.catalog.get_cluster(cluster_id).config.features()); + + let view_id = self.allocate_transient_id(); + let catalog_mut = self.catalog_mut(); + let sink_id = catalog_mut.allocate_user_id().await?; + + // Put a placeholder in the catalog so the optimizer can find something + // for the sink_id. + let fake_entry = CatalogEntry { + item: CatalogItem::Table(Table { + create_sql: Some(create_sql.clone()), + desc: desc.clone(), + conn_id: None, + resolved_ids: resolved_ids.clone(), + custom_logical_compaction_window: None, + is_retained_metrics_object: false, + data_source: TableDataSource::TableWrites { + defaults: Vec::new(), + }, + }), + referenced_by: Vec::new(), + used_by: Vec::new(), + id: sink_id, + oid: 0, + name: name.clone(), + owner_id: *session.current_role_id(), + privileges: PrivilegeMap::new(), + }; + catalog_mut.hack_add_ct(sink_id, fake_entry); + + // Build an optimizer for this CONTINUAL TASK. + let mut optimizer = optimize::materialized_view::Optimizer::new( + Arc::new(catalog_mut.clone()), + compute_instance, + sink_id, + view_id, + column_names, + non_null_assertions, + refresh_schedule.clone(), + debug_name, + optimizer_config, + self.optimizer_metrics(), + ); + + // Replace our placeholder fake ctes with the real output id, now that + // we have it. + expr.visit_mut_post(&mut |expr| match expr { + HirRelationExpr::Get { id, .. 
} if *id == Id::Local(LocalId::new(0)) => { + *id = Id::Global(sink_id); + } + _ => {} + })?; + + // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local and global) + let raw_expr = expr.clone(); + let local_mir_plan = optimizer.catch_unwind_optimize(expr)?; + let global_mir_plan = optimizer.catch_unwind_optimize(local_mir_plan.clone())?; + // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global) + let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?; + let (mut df_desc, _df_meta) = global_lir_plan.unapply(); + + // Timestamp selection + let mut id_bundle = dataflow_import_id_bundle(&df_desc, cluster_id.clone()); + // Can't acquire a read hold on ourselves because we don't exist yet. + // + // It is not necessary to take a read hold on the CT output in the + // coordinator, since the current scheme takes read holds in the + // coordinator only to ensure inputs don't get compacted until the + // compute controller has installed its own read holds, which happens + // below with the `ship_dataflow` call. + id_bundle.storage_ids.remove(&sink_id); + let read_holds = self.acquire_read_holds(&id_bundle); + let as_of = read_holds.least_valid_read(); + df_desc.set_as_of(as_of.clone()); + + // TODO(ct): HACKs + for sink in df_desc.sink_exports.values_mut() { + match &mut sink.connection { + ComputeSinkConnection::Persist(PersistSinkConnection { + storage_metadata, .. + }) => { + sink.connection = + ComputeSinkConnection::ContinualTask(ContinualTaskConnection { + input_id, + storage_metadata: *storage_metadata, + }) + } + _ => unreachable!("MV should produce persist sink connection"), + } + } + + let ops = vec![catalog::Op::CreateItem { + id: sink_id, + name: name.clone(), + item: CatalogItem::MaterializedView(MaterializedView { + // TODO(ct): This doesn't give the `DELETE FROM` / `INSERT INTO` + // names the `[u1 AS "materialize"."public"."append_only"]` + // style expansion. Bug? 
+ create_sql, + raw_expr: Arc::new(raw_expr), + optimized_expr: Arc::new(local_mir_plan.expr()), + desc: desc.clone(), + resolved_ids, + cluster_id, + non_null_assertions: Vec::new(), + custom_logical_compaction_window: None, + refresh_schedule, + initial_as_of: Some(as_of.clone()), + }), + owner_id: *session.current_role_id(), + }]; + + let () = self + .catalog_transact_with_side_effects(Some(session), ops, |coord| async { + coord + .controller + .storage + .create_collections( + coord.catalog.state().storage_metadata(), + None, + vec![( + sink_id, + CollectionDescription { + desc, + data_source: DataSource::Other(DataSourceOther::Compute), + since: Some(as_of), + status_collection_id: None, + }, + )], + ) + .await + .unwrap_or_terminate("cannot fail to append"); + + coord.ship_dataflow(df_desc, cluster_id.clone(), None).await; + }) + .await?; + Ok(ExecuteResponse::CreatedContinualTask) } } diff --git a/src/compute-client/Cargo.toml b/src/compute-client/Cargo.toml index f9b208ae0ce2a..65c78bbd0a1b1 100644 --- a/src/compute-client/Cargo.toml +++ b/src/compute-client/Cargo.toml @@ -28,7 +28,7 @@ mz-dyncfg = { path = "../dyncfg" } mz-dyncfgs = { path = "../dyncfgs" } mz-expr = { path = "../expr" } mz-orchestrator = { path = "../orchestrator" } -mz-ore = { path = "../ore", features = ["tracing_"] } +mz-ore = { path = "../ore", features = ["tracing_","chrono"] } mz-persist = { path = "../persist" } mz-persist-client = { path = "../persist-client" } mz-persist-types = { path = "../persist-types" } diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index fa607a0a98c08..fb84afa3be0f3 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -22,7 +22,9 @@ use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig}; use mz_compute_types::dataflows::{BuildDesc, DataflowDescription}; use mz_compute_types::plan::flat_plan::FlatPlan; use mz_compute_types::plan::LirId; -use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, PersistSinkConnection}; +use mz_compute_types::sinks::{ + ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, PersistSinkConnection, +}; use mz_compute_types::sources::SourceInstanceDesc; use mz_controller_types::dyncfgs::WALLCLOCK_LAG_REFRESH_INTERVAL; use mz_dyncfg::ConfigSet; @@ -1290,7 +1292,16 @@ where ComputeSinkConnection::Persist(conn) } ComputeSinkConnection::ContinualTask(conn) => { - todo!("WIP {:?}", conn); + let metadata = self + .storage_collections + .collection_metadata(id) + .map_err(|_| DataflowCreationError::CollectionMissing(id))? 
+ .clone(); + let conn = ContinualTaskConnection { + input_id: conn.input_id, + storage_metadata: metadata, + }; + ComputeSinkConnection::ContinualTask(conn) } ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn), ComputeSinkConnection::CopyToS3Oneshot(conn) => { diff --git a/src/compute-types/src/sinks.proto b/src/compute-types/src/sinks.proto index e55398971c505..34cc9a2e357f3 100644 --- a/src/compute-types/src/sinks.proto +++ b/src/compute-types/src/sinks.proto @@ -48,6 +48,8 @@ message ProtoPersistSinkConnection { } message ProtoContinualTaskConnection { + mz_repr.global_id.ProtoGlobalId input_id = 1; + mz_storage_types.controller.ProtoCollectionMetadata storage_metadata = 2; } message ProtoCopyToS3OneshotSinkConnection { diff --git a/src/compute-types/src/sinks.rs b/src/compute-types/src/sinks.rs index 9a3e2335c8e52..f713098ba37e6 100644 --- a/src/compute-types/src/sinks.rs +++ b/src/compute-types/src/sinks.rs @@ -120,6 +120,10 @@ pub enum ComputeSinkConnection { /// TODO(#25239): Add documentation. Persist(PersistSinkConnection), /// TODO(#25239): Add documentation. + /// + /// TODO(ct): This also writes to persist, but with different behavior + /// (conflict resolution, only at input times, etc). It might be time to + /// rename PersistSink to something that reflects the differences. ContinualTask(ContinualTaskConnection), /// A compute sink to do a oneshot copy to s3. CopyToS3Oneshot(CopyToS3OneshotSinkConnection), @@ -253,18 +257,34 @@ impl RustType for PersistSinkConnection { - _phantom: std::marker::PhantomData, + /// TODO(#25239): Add documentation. + // + // TODO(ct): This can be removed once we render the "input" sources without + // the hack. + pub input_id: GlobalId, + /// TODO(ct): Add documentation. + pub storage_metadata: S, } impl RustType for ContinualTaskConnection { fn into_proto(&self) -> ProtoContinualTaskConnection { - todo!("WIP"); + ProtoContinualTaskConnection { + input_id: Some(self.input_id.into_proto()), + storage_metadata: Some(self.storage_metadata.into_proto()), + } } fn from_proto(proto: ProtoContinualTaskConnection) -> Result { - todo!("WIP {:?}", proto); + Ok(ContinualTaskConnection { + input_id: proto + .input_id + .into_rust_if_some("ProtoContinualTaskConnection::input_id")?, + storage_metadata: proto + .storage_metadata + .into_rust_if_some("ProtoContinualTaskConnection::output_metadata")?, + }) } } diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 7a0e0db48cb64..25f760e8755c8 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -150,9 +150,11 @@ use crate::logging::compute::LogDataflowErrors; use crate::render::context::{ ArrangementFlavor, Context, MzArrangement, MzArrangementImport, ShutdownToken, }; +use crate::render::continual_task::ContinualTaskCtx; use crate::typedefs::{ErrSpine, KeyBatcher}; pub mod context; +pub(crate) mod continual_task; mod errors; mod flat_map; mod join; @@ -202,6 +204,10 @@ pub fn build_compute_dataflow( let build_name = format!("BuildRegion: {}", &dataflow.debug_name); timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| { + // TODO(ct): This should be a config of the source instead, but at least try + // to contain the hacks. + let mut ct_ctx = ContinualTaskCtx::new(&dataflow); + // The scope.clone() occurs to allow import in the region. // We build a region here to establish a pattern of a scope inside the dataflow, // so that other similar uses (e.g. 
with iterative scopes) do not require weird @@ -217,9 +223,16 @@ pub fn build_compute_dataflow( .expect("Linear operators should always be valid") }); + let ct_inserts_transformer = ct_ctx.get_ct_inserts_transformer(*source_id); + let snapshot_mode = if ct_inserts_transformer.is_none() { + SnapshotMode::Include + } else { + SnapshotMode::Exclude + }; + // Note: For correctness, we require that sources only emit times advanced by // `dataflow.as_of`. `persist_source` is documented to provide this guarantee. - let (mut ok_stream, err_stream, token) = persist_source::persist_source( + let (ok_stream, err_stream, token) = persist_source::persist_source( inner, *source_id, Arc::clone(&compute_state.persist_clients), @@ -227,7 +240,7 @@ pub fn build_compute_dataflow( &compute_state.worker_config, source.storage_metadata.clone(), dataflow.as_of.clone(), - SnapshotMode::Include, + snapshot_mode, dataflow.until.clone(), mfp.as_mut(), compute_state.dataflow_max_inflight_bytes(), @@ -235,6 +248,20 @@ pub fn build_compute_dataflow( |error| panic!("compute_import: {error}"), ); + let (mut ok_stream, err_stream) = match ct_inserts_transformer { + None => (ok_stream, err_stream), + Some(inserts_transformer_fn) => { + let (oks, errs, ct_times) = inserts_transformer_fn( + ok_stream.as_collection(), + err_stream.as_collection(), + ); + // TODO(ct): Ideally this would be encapsulated by ContinualTaskCtx, but + // the types are tricky. + ct_ctx.ct_times.push(ct_times.leave_region().leave_region()); + (oks.inner, errs.inner) + } + }; + // If `mfp` is non-identity, we need to apply what remains. // For the moment, assert that it is either trivial or `None`. assert!(mfp.map(|x| x.is_identity()).unwrap_or(true)); @@ -333,6 +360,7 @@ pub fn build_compute_dataflow( sink_id, &sink, start_signal.clone(), + ct_ctx.input_times(), ); } }); @@ -396,6 +424,7 @@ pub fn build_compute_dataflow( sink_id, &sink, start_signal.clone(), + ct_ctx.input_times(), ); } }); diff --git a/src/compute/src/render/continual_task.rs b/src/compute/src/render/continual_task.rs new file mode 100644 index 0000000000000..253f7ea0d1561 --- /dev/null +++ b/src/compute/src/render/continual_task.rs @@ -0,0 +1,839 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! A continual task presents as something like a `BEFORE TRIGGER`: it watches +//! some _input_ and whenever it changes at time `T`, executes a SQL txn, +//! writing to some _output_ at the same time `T`. It can also read anything in +//! materialize as a _reference_, most notably including the output. +//! +//! Only reacting to new inputs (and not the full history) makes a CT's +//! rehydration time independent of the size of the inputs (NB this is not true +//! for references), enabling things like writing UPSERT on top of an +//! append-only shard in SQL (ignore the obvious bug with my upsert impl): +//! +//! ```sql +//! CREATE CONTINUAL TASK upsert (key INT, val INT) ON INPUT append_only AS ( +//! DELETE FROM upsert WHERE key IN (SELECT key FROM append_only); +//! INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key; +//! ) +//! ``` +//! +//! Unlike a materialized view, the continual task does not update outputs if +//! references later change. 
This enables things like auditing:
+//!
+//! ```sql
+//! CREATE CONTINUAL TASK audit_log (count INT8) ON INPUT anomalies AS (
+//!     INSERT INTO audit_log SELECT * FROM anomalies;
+//! )
+//! ```
+//!
+//! Rough implementation overview:
+//! - A CT is created and starts at some `start_ts`, and is optionally later
+//!   dropped and stopped at some `end_ts`.
+//! - A CT takes one or more _input_s. These must be persist shards (i.e. TABLE,
+//!   SOURCE, MV, but not VIEW).
+//! - A CT has one or more _output_s. The outputs are (initially) owned by the
+//!   task and cannot be written to by other parts of the system.
+//! - The task is run for each time one of the inputs changes starting at
+//!   `start_ts`.
+//!   - It is given the changes in its inputs at time `T` as diffs.
+//!   - These are presented as two SQL relations with just the inserts/deletes.
+//!   - NB: A full collection for the input can always be recovered by also
+//!     using the input as a "reference" (see below) and applying the diffs.
+//! - The task logic is expressed as a SQL transaction that does all reads at
+//!   time `T-1` and commits all writes at `T`.
+//!   - Intuition is that the logic runs before the input is written, like a
+//!     `CREATE TRIGGER ... BEFORE`.
+//! - This logic can _reference_ any nameable object in the system, not just the
+//!   inputs.
+//!   - However, the logic/transaction can mutate only the outputs.
+//! - Summary of differences between inputs and references:
+//!   - The task receives snapshot + changes for references (like regular
+//!     dataflow inputs today) but only changes for inputs.
+//!   - The task only produces output in response to changes in the inputs but
+//!     not in response to changes in the references.
+//!   - Inputs are reclocked by subtracting 1 from their timestamps, references
+//!     are not.
+//! - Instead of re-evaluating the task logic from scratch for each input time,
+//!   we maintain the collection representing desired writes to the output(s) as
+//!   a dataflow.
+//! - The task dataflow is tied to a `CLUSTER` and runs on each `REPLICA`.
+//!   - HA strategy: multi-replica clusters race to commit and the losers throw
+//!     away the result.
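The subtle piece of the model above is the reclocking: diffs that commit to an input at time `T` are handed to the task logic as if they happened at `T-1`, and whatever the logic produces is shifted forward again so its writes commit back at `T`. A minimal, dependency-free sketch of that round trip (plain tuples stand in for differential updates; the function names echo the `step_backward`/`step_forward` operators defined later in this file, but this is only an illustration, not the real timely operators):

```rust
// Illustrative only: (data, time, diff) triples instead of real collections.
type Update = (&'static str, u64, i64);

// Inputs are reclocked by subtracting 1, so the task logic "sees" the diffs of
// time `T` while reading references as of `T - 1`.
fn step_backward(updates: &[Update]) -> Vec<Update> {
    updates
        .iter()
        .map(|&(data, ts, diff)| {
            (data, ts.checked_sub(1).expect("times are past the as_of"), diff)
        })
        .collect()
}

// The results are stepped forward again, so the task's writes commit at the
// original time `T`.
fn step_forward(updates: &[Update]) -> Vec<Update> {
    updates.iter().map(|&(data, ts, diff)| (data, ts + 1, diff)).collect()
}

fn main() {
    // A diff that commits to the input at time 5 ...
    let input = vec![("new input row", 5, 1)];
    // ... is presented to the task logic at time 4 ...
    let seen_by_task = step_backward(&input);
    assert_eq!(seen_by_task, vec![("new input row", 4, 1)]);
    // ... and the resulting writes land back at time 5.
    assert_eq!(step_forward(&seen_by_task), input);
}
```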
+ +use std::any::Any; +use std::cell::RefCell; +use std::collections::BTreeSet; +use std::rc::Rc; +use std::sync::Arc; + +use differential_dataflow::consolidation::consolidate_updates; +use differential_dataflow::difference::Semigroup; +use differential_dataflow::lattice::Lattice; +use differential_dataflow::{AsCollection, Collection, Hashable}; +use futures::{Future, FutureExt, StreamExt}; +use mz_compute_types::dataflows::DataflowDescription; +use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection}; +use mz_ore::cast::CastFrom; +use mz_ore::collections::HashMap; +use mz_persist_client::error::UpperMismatch; +use mz_persist_client::write::WriteHandle; +use mz_persist_client::Diagnostics; +use mz_persist_types::codec_impls::UnitSchema; +use mz_repr::{Diff, GlobalId, Row, Timestamp}; +use mz_storage_types::controller::CollectionMetadata; +use mz_storage_types::errors::DataflowError; +use mz_storage_types::sources::SourceData; +use mz_timely_util::builder_async::{Button, Event, OperatorBuilder as AsyncOperatorBuilder}; +use timely::dataflow::channels::pact::{Exchange, Pipeline}; +use timely::dataflow::operators::{Filter, FrontierNotificator, Map, Operator}; +use timely::dataflow::{ProbeHandle, Scope}; +use timely::progress::frontier::AntichainRef; +use timely::progress::{Antichain, Timestamp as _}; +use timely::{Data, PartialOrder}; +use tracing::debug; + +use crate::compute_state::ComputeState; +use crate::render::sinks::SinkRender; +use crate::render::StartSignal; + +pub(crate) struct ContinualTaskCtx> { + name: Option, + dataflow_as_of: Option>, + ct_inputs: BTreeSet, + pub ct_times: Vec>, +} + +impl> ContinualTaskCtx { + pub fn new(dataflow: &DataflowDescription) -> Self { + let mut name = None; + let mut ct_inputs = BTreeSet::new(); + for (sink_id, sink) in &dataflow.sink_exports { + match &sink.connection { + ComputeSinkConnection::ContinualTask(ContinualTaskConnection { + input_id, .. + }) => { + ct_inputs.insert(*input_id); + // There's only one CT sink per dataflow at this point. + assert_eq!(name, None); + name = Some(sink_id.to_string()); + } + _ => continue, + } + } + let mut ret = ContinualTaskCtx { + name, + dataflow_as_of: None, + ct_inputs, + ct_times: Vec::new(), + }; + // Only clone the as_of if we're in a CT dataflow. + if ret.is_ct_dataflow() { + ret.dataflow_as_of = dataflow.as_of.clone(); + // Sanity check that we have a name if we're in a CT dataflow. + assert!(ret.name.is_some()); + } + ret + } + + pub fn is_ct_dataflow(&self) -> bool { + !self.ct_inputs.is_empty() + } + + pub fn get_ct_inserts_transformer>( + &self, + source_id: GlobalId, + ) -> Option< + impl FnOnce( + Collection, + Collection, + ) -> ( + Collection, + Collection, + Collection, + ), + > { + if !self.ct_inputs.contains(&source_id) { + return None; + } + + // Make a collection s.t, for each time T in the input, the output + // contains the inserts at T. + let inserts_source_fn = + move |oks: Collection, errs: Collection| { + let name = source_id.to_string(); + // Keep only the inserts. + // + // TODO(ct): At some point this will become a user option to instead + // keep only deletes. + let oks = oks.inner.filter(|(_, _, diff)| *diff > 0); + // Grab the original times for use in the sink operator. + let times = oks.map(|(_row, ts, diff)| ((), ts, diff)); + // SUBTLE: See the big module rustdoc for what's going on here. + let oks = step_backward(&name, oks.as_collection()); + let errs = step_backward(&name, errs); + // Then retract everything at the next timestamp. 
+ let oks = oks.inner.flat_map(|(row, ts, diff)| { + let mut negation = diff.clone(); + differential_dataflow::Diff::negate(&mut negation); + [(row.clone(), ts.step_forward(), negation), (row, ts, diff)] + }); + (oks.as_collection(), errs, times.as_collection()) + }; + Some(inserts_source_fn) + } + + pub fn input_times(&self) -> Option> { + let (Some(name), Some(first)) = (self.name.as_ref(), self.ct_times.first()) else { + return None; + }; + let ct_times = differential_dataflow::collection::concatenate( + &mut first.scope(), + self.ct_times.iter().cloned(), + ); + // Reduce this down to one update per-time-per-worker before exchanging + // it, so we don't waste work on unnecessarily high data volumes. + let ct_times = times_reduce(name, ct_times); + Some(ct_times) + } +} + +impl SinkRender for ContinualTaskConnection +where + G: Scope, +{ + fn render_sink( + &self, + compute_state: &mut ComputeState, + _sink: &ComputeSinkDesc, + sink_id: GlobalId, + as_of: Antichain, + start_signal: StartSignal, + oks: Collection, + errs: Collection, + append_times: Option>, + ) -> Option> { + let name = sink_id.to_string(); + + // SUBTLE: See the big module rustdoc for what's going on here. + let oks = step_forward(&name, oks); + let errs = step_forward(&name, errs); + + let to_append = oks + .map(|x| SourceData(Ok(x))) + .concat(&errs.map(|x| SourceData(Err(x)))); + let append_times = append_times.expect("should be provided by ContinualTaskCtx"); + + let write_handle = { + let clients = Arc::clone(&compute_state.persist_clients); + let metadata = self.storage_metadata.clone(); + let handle_purpose = format!("ct_sink({})", name); + async move { + let client = clients + .open(metadata.persist_location) + .await + .expect("valid location"); + client + .open_writer( + metadata.data_shard, + metadata.relation_desc.into(), + UnitSchema.into(), + Diagnostics { + shard_name: sink_id.to_string(), + handle_purpose, + }, + ) + .await + .expect("codecs should match") + } + }; + + let collection = compute_state.expect_collection_mut(sink_id); + let mut probe = ProbeHandle::default(); + let to_append = to_append.probe_with(&mut probe); + collection.compute_probe = Some(probe); + let sink_write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum()))); + collection.sink_write_frontier = Some(Rc::clone(&sink_write_frontier)); + + // TODO(ct): Obey `compute_state.read_only_rx` + // + // Seemingly, the read-only env needs to tail the output shard and keep + // historical updates around until it sees that the output frontier + // advances beyond their times. + let sink_button = continual_task_sink( + &name, + to_append, + append_times, + as_of, + write_handle, + start_signal, + sink_write_frontier, + ); + Some(Rc::new(sink_button.press_on_drop())) + } +} + +fn continual_task_sink>( + name: &str, + to_append: Collection, + append_times: Collection, + as_of: Antichain, + write_handle: impl Future> + Send + 'static, + start_signal: StartSignal, + output_frontier: Rc>>, +) -> Button { + let scope = to_append.scope(); + let mut op = AsyncOperatorBuilder::new(format!("ct_sink({})", name), scope.clone()); + + // TODO(ct): This all works perfectly well data parallel (assuming we + // broadcast the append_times). We just need to hook it up to the + // multi-worker persist-sink, but that requires some refactoring. This would + // also remove the need for this to be an async timely operator. 
+ let active_worker = name.hashed(); + let to_append_input = + op.new_input_for_many(&to_append.inner, Exchange::new(move |_| active_worker), []); + let append_times_input = op.new_input_for_many( + &append_times.inner, + Exchange::new(move |_| active_worker), + [], + ); + + let active_worker = usize::cast_from(active_worker) % scope.peers() == scope.index(); + let button = op.build(move |_capabilities| async move { + if !active_worker { + output_frontier.borrow_mut().clear(); + return; + } + + // SUBTLE: The start_signal below may not be unblocked by the compute + // controller until it thinks the inputs are "ready" (i.e. readable at + // the as_of), but if the CT is self-referential, one of the inputs will + // be the output (which starts at `T::minimum()`, not the as_of). To + // break this cycle, before we even get the start signal, go ahead and + // advance the output's (exclusive) upper to the first time that this CT + // might write: `as_of+1`. Because we don't want this to happen on + // restarts, only do it if the upper is `T::minimum()`. + let mut write_handle = write_handle.await; + { + let new_upper = as_of.into_iter().map(|x| x.step_forward()).collect(); + let res = write_handle + .compare_and_append_batch( + &mut [], + Antichain::from_elem(Timestamp::minimum()), + new_upper, + ) + .await + .expect("usage was valid"); + match res { + // We advanced the upper. + Ok(()) => {} + // Someone else advanced the upper. + Err(UpperMismatch { .. }) => {} + } + } + + let () = start_signal.await; + + #[derive(Debug)] + enum OpEvent { + ToAppend(Event>), + AppendTimes(Event>), + } + + impl OpEvent { + fn apply(self, state: &mut SinkState) { + debug!("ct_sink event {:?}", self); + match self { + OpEvent::ToAppend(Event::Data(_cap, x)) => state + .to_append + .extend(x.into_iter().map(|(k, t, d)| ((k, ()), t, d))), + OpEvent::ToAppend(Event::Progress(x)) => state.to_append_progress = x, + OpEvent::AppendTimes(Event::Data(_cap, x)) => state + .append_times + .extend(x.into_iter().map(|((), t, _d)| t)), + OpEvent::AppendTimes(Event::Progress(x)) => state.append_times_progress = x, + } + } + } + + let to_insert_input = to_append_input.map(OpEvent::ToAppend); + let append_times_input = append_times_input.map(OpEvent::AppendTimes); + let mut op_inputs = futures::stream::select(to_insert_input, append_times_input); + + let mut state = SinkState::new(); + loop { + if PartialOrder::less_than(&*output_frontier.borrow(), &state.output_progress) { + output_frontier.borrow_mut().clear(); + output_frontier + .borrow_mut() + .extend(state.output_progress.iter().cloned()); + } + let Some(event) = op_inputs.next().await else { + // Inputs exhausted, shutting down. + output_frontier.borrow_mut().clear(); + return; + }; + event.apply(&mut state); + // Also drain any other events that may be ready. 
+ while let Some(Some(event)) = op_inputs.next().now_or_never() { + event.apply(&mut state); + } + debug!("ct_sink about to process {:?}", state); + let Some((new_upper, to_append)) = state.process() else { + continue; + }; + debug!("ct_sink got write {:?}: {:?}", new_upper, to_append); + + let mut expected_upper = write_handle.shared_upper(); + while PartialOrder::less_than(&expected_upper, &new_upper) { + let res = write_handle + .compare_and_append(&to_append, expected_upper.clone(), new_upper.clone()) + .await + .expect("usage was valid"); + match res { + Ok(()) => { + state.output_progress = new_upper; + break; + } + Err(err) => { + expected_upper = err.current; + continue; + } + } + } + } + }); + + button +} + +#[derive(Debug)] +struct SinkState { + /// The known times at which we're going to write data to the output. This + /// is guaranteed to include all times < append_times_progress, except that + /// ones < output_progress may have been truncated. + append_times: BTreeSet, + append_times_progress: Antichain, + + /// The data we've collected to append to the output. This is often + /// compacted to advancing times and is expected to be ~empty in the steady + /// state. + to_append: Vec<((K, V), T, D)>, + to_append_progress: Antichain, + + /// A lower bound on the upper of the output. + output_progress: Antichain, +} + +impl SinkState { + fn new() -> Self { + SinkState { + append_times: BTreeSet::new(), + append_times_progress: Antichain::from_elem(Timestamp::minimum()), + to_append: Vec::new(), + to_append_progress: Antichain::from_elem(Timestamp::minimum()), + output_progress: Antichain::from_elem(Timestamp::minimum()), + } + } + + /// Returns data to write to the output, if any, and the new upper to use. + /// + /// TODO(ct): Remove the Vec allocation here. + fn process(&mut self) -> Option<(Antichain, Vec<((&K, &V), &Timestamp, &D)>)> { + // We can only append at times >= the output_progress, so pop off + // anything unnecessary. + while let Some(x) = self.append_times.first() { + if self.output_progress.less_equal(x) { + break; + } + self.append_times.pop_first(); + } + + // Find the smallest append_time before append_time_progress. This is + // the next time we might need to write data at. Note that we can only + // act on append_times once the progress has passed them, because they + // could come out of order. + let write_ts = match self.append_times.first() { + Some(x) if !self.append_times_progress.less_equal(x) => x, + Some(_) | None => { + // The CT sink's contract is that it only writes data at times + // we received an input diff. There are none in + // `[output_progress, append_times_progress)`, so we can go + // ahead and advance the upper of the output, if it's not + // already. + // + // We could instead ensure liveness by basing this off of + // to_append, but for any CTs reading the output (expected to be + // a common case) we'd end up looping each timestamp through + // persist one-by-one. + if PartialOrder::less_than(&self.output_progress, &self.append_times_progress) { + return Some((self.append_times_progress.clone(), Vec::new())); + } + // Otherwise, nothing to do! + return None; + } + }; + + if self.to_append_progress.less_equal(write_ts) { + // Don't have all the necessary data yet. + if self.output_progress.less_than(write_ts) { + // We can advance the output upper up to the write_ts. For + // self-referential CTs this might be necessary to ensure + // dataflow progress. 
+ return Some((Antichain::from_elem(write_ts.clone()), Vec::new())); + } + return None; + } + + // Time to write some data! Produce the collection as of write_ts by + // advancing timestamps, consolidating, and filtering out anything at + // future timestamps. + let as_of = &[write_ts.clone()]; + for (_, t, _) in &mut self.to_append { + t.advance_by(AntichainRef::new(as_of)) + } + // TODO(ct): Metrics for vec len and cap. + consolidate_updates(&mut self.to_append); + // TODO(ct): Resize the vec down if cap >> len? Or perhaps `Correction` + // might already be very close to what we need. + + let append_data = self + .to_append + .iter() + .filter_map(|((k, v), t, d)| (t <= write_ts).then_some(((k, v), t, d))) + .collect(); + Some((Antichain::from_elem(write_ts.step_forward()), append_data)) + } +} + +// TODO(ct): Write this as a non-async operator. +// +// Unfortunately, we can _almost_ use the stock `delay` operator, but not quite. +// This must advance both data and the output frontier forward, while delay only +// advances the data. +fn step_backward(name: &str, input: Collection) -> Collection +where + G: Scope, + D: Data, + R: Semigroup + 'static, +{ + let name = format!("ct_step_backward({})", name); + let mut builder = AsyncOperatorBuilder::new(name, input.scope()); + let (mut output, output_stream) = builder.new_output(); + let mut input = builder.new_input_for(&input.inner, Pipeline, &output); + builder.build(move |caps| async move { + let [mut cap]: [_; 1] = caps.try_into().expect("one capability per output"); + loop { + let Some(event) = input.next().await else { + return; + }; + match event { + Event::Data(_data_cap, mut data) => { + for (_, ts, _) in &mut data { + *ts = ts + .step_back() + .expect("should only receive data at times past the as_of"); + } + output.give_container(&cap, &mut data); + } + Event::Progress(new_progress) => { + let new_progress = new_progress.into_option().and_then(|x| x.step_back()); + let Some(new_progress) = new_progress else { + continue; + }; + if cap.time() < &new_progress { + cap.downgrade(&new_progress); + } + } + } + } + }); + + output_stream.as_collection() +} + +// TODO(ct): Write this as a non-async operator. +fn step_forward(name: &str, input: Collection) -> Collection +where + G: Scope, + D: Data, + R: Semigroup + 'static, +{ + let name = format!("ct_step_forward({})", name); + let mut builder = AsyncOperatorBuilder::new(name.clone(), input.scope()); + let mut input = builder.new_disconnected_input(&input.inner, Pipeline); + let (mut output, output_stream) = builder.new_output(); + builder.build(move |caps| async move { + let [mut cap]: [_; 1] = caps.try_into().expect("one capability per output"); + loop { + let Some(event) = input.next().await else { + return; + }; + match event { + Event::Data(_data_cap, mut data) => { + for (_, ts, _) in &mut data { + *ts = ts.step_forward(); + } + output.give_container(&cap, &mut data); + } + Event::Progress(progress) => { + if let Some(progress) = progress.into_option() { + cap.downgrade(&progress.step_forward()); + } + } + } + } + }); + + output_stream.as_collection() +} + +// This is essentially a specialized impl of consolidate, with a HashMap instead +// of the Trace. 
+fn times_reduce(name: &str, input: Collection) -> Collection +where + G: Scope, + R: Semigroup + 'static + std::fmt::Debug, +{ + let name = format!("ct_times_reduce({})", name); + input + .inner + .unary_frontier(Pipeline, &name, |_caps, _info| { + let mut notificator = FrontierNotificator::new(); + let mut stash = HashMap::<_, R>::new(); + let mut buf = Vec::new(); + move |input, output| { + while let Some((cap, data)) = input.next() { + data.swap(&mut buf); + for ((), ts, diff) in buf.drain(..) { + notificator.notify_at(cap.delayed(&ts)); + if let Some(sum) = stash.get_mut(&ts) { + sum.plus_equals(&diff); + } else { + stash.insert(ts, diff); + } + } + } + notificator.for_each(&[input.frontier()], |cap, _not| { + if let Some(diff) = stash.remove(cap.time()) { + output.session(&cap).give(((), cap.time().clone(), diff)); + } + }); + } + }) + .as_collection() +} + +#[cfg(test)] +mod tests { + use differential_dataflow::AsCollection; + use mz_repr::Timestamp; + use timely::dataflow::operators::capture::Extract; + use timely::dataflow::operators::{Capture, Input, ToStream}; + use timely::dataflow::ProbeHandle; + use timely::progress::Antichain; + use timely::Config; + + #[mz_ore::test] + fn step_forward() { + timely::execute(Config::thread(), |worker| { + let (mut input, probe, output) = worker.dataflow(|scope| { + let (handle, input) = scope.new_input(); + let mut probe = ProbeHandle::::new(); + let output = super::step_forward("test", input.as_collection()) + .probe_with(&mut probe) + .inner + .capture(); + (handle, probe, output) + }); + + let mut expected = Vec::new(); + for i in 0u64..10 { + let in_ts = Timestamp::new(i); + let out_ts = in_ts.step_forward(); + input.send((i, in_ts, 1)); + input.advance_to(in_ts.step_forward()); + + // We should get the data out advanced by `step_forward` and + // also, crucially, the output frontier should do the same (i.e. + // this is why we can't simply use `Collection::delay`). + worker.step_while(|| probe.less_than(&out_ts.step_forward())); + expected.push((i, out_ts, 1)); + } + // Closing the input should allow the output to advance and the + // dataflow to shut down. + input.close(); + while worker.step() {} + + let actual = output + .extract() + .into_iter() + .flat_map(|x| x.1) + .collect::>(); + assert_eq!(actual, expected); + }) + .unwrap(); + } + + #[mz_ore::test] + fn step_backward() { + timely::execute(Config::thread(), |worker| { + let (mut input, probe, output) = worker.dataflow(|scope| { + let (handle, input) = scope.new_input(); + let mut probe = ProbeHandle::::new(); + let output = super::step_backward("test", input.as_collection()) + .probe_with(&mut probe) + .inner + .capture(); + (handle, probe, output) + }); + + let mut expected = Vec::new(); + for i in 0u64..10 { + // Notice that these are declared backward: out_ts < in_ts + let out_ts = Timestamp::new(i); + let in_ts = out_ts.step_forward(); + input.send((i, in_ts, 1)); + input.advance_to(in_ts.step_forward()); + + // We should get the data out regressed by `step_backward`. + worker.step_while(|| probe.less_than(&out_ts.step_forward())); + expected.push((i, out_ts, 1)); + } + // Closing the input should allow the output to advance and the + // dataflow to shut down. 
+ input.close(); + while worker.step() {} + + let actual = output + .extract() + .into_iter() + .flat_map(|x| x.1) + .collect::>(); + assert_eq!(actual, expected); + }) + .unwrap(); + } + + #[mz_ore::test] + fn times_reduce() { + let output = timely::execute_directly(|worker| { + worker.dataflow(|scope| { + let input = [ + ((), Timestamp::new(3), 1), + ((), Timestamp::new(2), 1), + ((), Timestamp::new(1), 1), + ((), Timestamp::new(2), 1), + ((), Timestamp::new(3), 1), + ((), Timestamp::new(3), 1), + ] + .to_stream(scope) + .as_collection(); + super::times_reduce("test", input).inner.capture() + }) + }); + let expected = vec![ + ((), Timestamp::new(1), 1), + ((), Timestamp::new(2), 2), + ((), Timestamp::new(3), 3), + ]; + let actual = output + .extract() + .into_iter() + .flat_map(|x| x.1) + .collect::>(); + assert_eq!(actual, expected); + } + + #[mz_ore::test] + fn ct_sink_state() { + #[track_caller] + fn assert_noop(state: &mut super::SinkState<&'static str, (), Timestamp, i64>) { + if let Some(ret) = state.process() { + panic!("should be nothing to write: {:?}", ret); + } + } + + #[track_caller] + fn assert_write( + state: &mut super::SinkState<&'static str, (), Timestamp, i64>, + expected_upper: u64, + expected_append: &[&str], + ) { + let (new_upper, to_append) = state.process().expect("should be something to write"); + assert_eq!( + new_upper, + Antichain::from_elem(Timestamp::new(expected_upper)) + ); + let to_append = to_append + .into_iter() + .map(|((k, ()), _ts, _diff)| *k) + .collect::>(); + assert_eq!(to_append, expected_append); + } + + let mut s = super::SinkState::new(); + + // Nothing to do at the initial state. + assert_noop(&mut s); + + // Getting data to append is not enough to do anything yet. + s.to_append.push((("a", ()), 1.into(), 1)); + s.to_append.push((("b", ()), 1.into(), 1)); + assert_noop(&mut s); + + // Knowing that this is the only data we'll get for that timestamp is + // still not enough. + s.to_append_progress = Antichain::from_elem(2.into()); + assert_noop(&mut s); + + // Even knowing that we got input at that time is not quite enough yet + // (we could be getting these out of order). + s.append_times.insert(1.into()); + assert_noop(&mut s); + + // Indeed, it did come out of order. Also note that this checks the == + // case for time vs progress. + s.append_times.insert(0.into()); + assert_noop(&mut s); + + // Okay, now we know that we've seen all the times we got input up to 3. + // This is enough to allow the empty write of `[0,1)`. + s.append_times_progress = Antichain::from_elem(3.into()); + assert_write(&mut s, 1, &[]); + + // That succeeded, now we can write the data at 1. + s.output_progress = Antichain::from_elem(1.into()); + assert_write(&mut s, 2, &["a", "b"]); + + // That succeeded, now we know about some empty time. + s.output_progress = Antichain::from_elem(2.into()); + assert_write(&mut s, 3, &[]); + + // That succeeded, now nothing to do. + s.output_progress = Antichain::from_elem(3.into()); + assert_noop(&mut s); + + // Find out about a new time to write at. Even without the data, we can + // do an empty write up to that time. + s.append_times.insert(5.into()); + s.append_times_progress = Antichain::from_elem(6.into()); + assert_write(&mut s, 5, &[]); + + // That succeeded, now nothing to do again. + s.output_progress = Antichain::from_elem(5.into()); + + // Retract one of the things currently in the collection and add a new + // thing, to verify the consolidate. 
+ s.to_append.push((("a", ()), 5.into(), -1)); + s.to_append.push((("c", ()), 5.into(), 1)); + s.to_append_progress = Antichain::from_elem(6.into()); + assert_write(&mut s, 6, &["b", "c"]); + } +} diff --git a/src/compute/src/render/sinks.rs b/src/compute/src/render/sinks.rs index f6a72eaf20bfe..aecf6d43eadee 100644 --- a/src/compute/src/render/sinks.rs +++ b/src/compute/src/render/sinks.rs @@ -47,6 +47,7 @@ where sink_id: GlobalId, sink: &ComputeSinkDesc, start_signal: StartSignal, + ct_times: Option>, ) { soft_assert_or_log!( sink.non_null_assertions.is_strictly_sorted(), @@ -143,6 +144,7 @@ where start_signal, ok_collection.enter_region(inner), err_collection.enter_region(inner), + ct_times.map(|x| x.enter_region(inner)), ); if let Some(sink_token) = sink_token { @@ -169,6 +171,9 @@ where start_signal: StartSignal, sinked_collection: Collection, err_collection: Collection, + // TODO(ct): Figure out a better way to smuggle this in, potentially by + // removing the `SinkRender` trait entirely. + ct_times: Option>, ) -> Option>; } diff --git a/src/compute/src/sink.rs b/src/compute/src/sink.rs index b7c6e61d4fd63..753a42b84b95a 100644 --- a/src/compute/src/sink.rs +++ b/src/compute/src/sink.rs @@ -7,7 +7,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -pub(crate) mod continual_task; mod copy_to_s3_oneshot; mod correction; mod persist_sink; diff --git a/src/compute/src/sink/continual_task.rs b/src/compute/src/sink/continual_task.rs deleted file mode 100644 index 836dfb2332670..0000000000000 --- a/src/compute/src/sink/continual_task.rs +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright Materialize, Inc. and contributors. All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -use std::any::Any; -use std::rc::Rc; - -use differential_dataflow::Collection; -use mz_compute_types::sinks::{ComputeSinkDesc, ContinualTaskConnection}; -use mz_repr::{Diff, GlobalId, Row, Timestamp}; -use mz_storage_types::controller::CollectionMetadata; -use mz_storage_types::errors::DataflowError; -use timely::dataflow::Scope; -use timely::progress::Antichain; - -use crate::compute_state::ComputeState; -use crate::render::sinks::SinkRender; -use crate::render::StartSignal; - -impl SinkRender for ContinualTaskConnection -where - G: Scope, -{ - fn render_sink( - &self, - compute_state: &mut ComputeState, - sink: &ComputeSinkDesc, - sink_id: GlobalId, - as_of: Antichain, - start_signal: StartSignal, - oks: Collection, - errs: Collection, - ) -> Option> { - todo!( - "WIP {:?}", - ( - std::any::type_name_of_val(&compute_state), - sink, - sink_id, - as_of, - std::any::type_name_of_val(&start_signal), - std::any::type_name_of_val(&oks), - std::any::type_name_of_val(&errs), - ) - ); - } -} diff --git a/src/compute/src/sink/copy_to_s3_oneshot.rs b/src/compute/src/sink/copy_to_s3_oneshot.rs index 9c0a9cebe5eaa..3809f826d8c5c 100644 --- a/src/compute/src/sink/copy_to_s3_oneshot.rs +++ b/src/compute/src/sink/copy_to_s3_oneshot.rs @@ -45,6 +45,7 @@ where _start_signal: StartSignal, sinked_collection: Collection, err_collection: Collection, + _ct_times: Option>, ) -> Option> { // An encapsulation of the copy to response protocol. // Used to send rows and errors if this fails. 
diff --git a/src/compute/src/sink/persist_sink.rs b/src/compute/src/sink/persist_sink.rs index a2ee9846e4349..acb29945b982a 100644 --- a/src/compute/src/sink/persist_sink.rs +++ b/src/compute/src/sink/persist_sink.rs @@ -60,6 +60,7 @@ where start_signal: StartSignal, mut ok_collection: Collection, mut err_collection: Collection, + _ct_times: Option>, ) -> Option> { // Attach a probe reporting the compute frontier. // The `apply_refresh` operator can round up frontiers, making it impossible to accurately diff --git a/src/compute/src/sink/subscribe.rs b/src/compute/src/sink/subscribe.rs index 9768a09ba976b..11a6ac9d17c49 100644 --- a/src/compute/src/sink/subscribe.rs +++ b/src/compute/src/sink/subscribe.rs @@ -42,6 +42,7 @@ where _start_signal: StartSignal, sinked_collection: Collection, err_collection: Collection, + _ct_times: Option>, ) -> Option> { // An encapsulation of the Subscribe response protocol. // Used to send rows and progress messages, diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index a5e04ae167e2f..979f5f56d2a0c 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -3587,6 +3587,7 @@ impl<'a> Parser<'a> { let name = self.parse_item_name()?; self.expect_token(&Token::LParen)?; let columns = self.parse_comma_separated(|parser| { + // TODO(ct): NOT NULL, etc. Ok(CteMutRecColumnDef { name: parser.parse_identifier()?, data_type: parser.parse_data_type()?, diff --git a/src/sql/src/names.rs b/src/sql/src/names.rs index 75800afe84e13..576cf40aeb6ea 100644 --- a/src/sql/src/names.rs +++ b/src/sql/src/names.rs @@ -22,7 +22,7 @@ use mz_ore::str::StrExt; use mz_repr::role_id::RoleId; use mz_repr::ColumnName; use mz_repr::GlobalId; -use mz_sql_parser::ast::Expr; +use mz_sql_parser::ast::{CreateContinualTaskStatement, Expr}; use mz_sql_parser::ident; use proptest_derive::Arbitrary; use serde::{Deserialize, Serialize}; @@ -1569,6 +1569,17 @@ impl<'a> Fold for NameResolver<'a> { result } + fn fold_create_continual_task_statement( + &mut self, + stmt: CreateContinualTaskStatement, + ) -> CreateContinualTaskStatement { + // TODO(ct): Insert a fake CTE so that using the name of the continual + // task in the inserts and deletes resolves. 
+ let cte_name = normalize::ident(stmt.name.0.last().expect("TODO(ct)").clone()); + self.ctes.insert(cte_name, LocalId::new(0)); + mz_sql_parser::ast::fold::fold_create_continual_task_statement(self, stmt) + } + fn fold_cte_id(&mut self, _id: ::CteId) -> ::CteId { panic!("this should have been handled when walking the CTE"); } diff --git a/src/sql/src/normalize.rs b/src/sql/src/normalize.rs index 642825adb2578..09212b825fbb4 100644 --- a/src/sql/src/normalize.rs +++ b/src/sql/src/normalize.rs @@ -21,12 +21,13 @@ use mz_repr::{ColumnName, GlobalId}; use mz_sql_parser::ast::display::AstDisplay; use mz_sql_parser::ast::visit_mut::{self, VisitMut}; use mz_sql_parser::ast::{ - CreateConnectionStatement, CreateContinualTaskStatement, CreateIndexStatement, - CreateMaterializedViewStatement, CreateSecretStatement, CreateSinkStatement, - CreateSourceStatement, CreateSubsourceStatement, CreateTableFromSourceStatement, - CreateTableStatement, CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, - CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior, MutRecBlock, Op, Query, Statement, - TableFactor, UnresolvedItemName, UnresolvedSchemaName, Value, ViewDefinition, + ContinualTaskStmt, CreateConnectionStatement, CreateContinualTaskStatement, + CreateIndexStatement, CreateMaterializedViewStatement, CreateSecretStatement, + CreateSinkStatement, CreateSourceStatement, CreateSubsourceStatement, + CreateTableFromSourceStatement, CreateTableStatement, CreateTypeStatement, CreateViewStatement, + CreateWebhookSourceStatement, CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior, + MutRecBlock, Op, Query, Statement, TableFactor, UnresolvedItemName, UnresolvedSchemaName, + Value, ViewDefinition, }; use crate::names::{Aug, FullItemName, PartialItemName, PartialSchemaName, RawDatabaseSpecifier}; @@ -417,12 +418,23 @@ pub fn create_statement( Statement::CreateContinualTask(CreateContinualTaskStatement { name, - columns, + columns: _, input, stmts, - in_cluster, + in_cluster: _, }) => { - todo!("WIP {:?}", (name, columns, input, stmts, in_cluster)); + *name = allocate_name(name)?; + let mut normalizer = QueryNormalizer::new(); + normalizer.visit_item_name_mut(input); + for stmt in stmts { + match stmt { + ContinualTaskStmt::Delete(stmt) => normalizer.visit_delete_statement_mut(stmt), + ContinualTaskStmt::Insert(stmt) => normalizer.visit_insert_statement_mut(stmt), + } + } + if let Some(err) = normalizer.err { + return Err(err); + } } Statement::CreateIndex(CreateIndexStatement { diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index d8763dd9ca178..897069e61a332 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -707,7 +707,14 @@ pub struct CreateMaterializedViewPlan { } #[derive(Debug, Clone)] -pub struct CreateContinualTaskPlan {} +pub struct CreateContinualTaskPlan { + pub name: QualifiedItemName, + pub desc: RelationDesc, + // TODO(ct): Multiple inputs. + pub input_id: GlobalId, + pub continual_task: MaterializedView, + // TODO(ct): replace, drop_ids, if_not_exists +} #[derive(Debug, Clone)] pub struct CreateIndexPlan { diff --git a/src/sql/src/plan/query.rs b/src/sql/src/plan/query.rs index 1dad7b41c41ea..e577eb593dec4 100644 --- a/src/sql/src/plan/query.rs +++ b/src/sql/src/plan/query.rs @@ -165,6 +165,57 @@ pub fn plan_root_query( }) } +/// TODO(ct): Dedup this with [plan_root_query]. 
+#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")] +pub fn plan_ct_query( + qcx: &mut QueryContext, + mut query: Query, +) -> Result, PlanError> { + transform_ast::transform(qcx.scx, &mut query)?; + let PlannedQuery { + mut expr, + scope, + order_by, + limit, + offset, + project, + group_size_hints, + } = plan_query(qcx, &query)?; + + let mut finishing = RowSetFinishing { + limit, + offset, + project, + order_by, + }; + + // Attempt to push the finishing's ordering past its projection. This allows + // data to be projected down on the workers rather than the coordinator. It + // also improves the optimizer's demand analysis, as the optimizer can only + // reason about demand information in `expr` (i.e., it can't see + // `finishing.project`). + try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by); + + expr.finish_maintained(&mut finishing, group_size_hints); + + let typ = qcx.relation_type(&expr); + let typ = RelationType::new( + finishing + .project + .iter() + .map(|i| typ.column_types[*i].clone()) + .collect(), + ); + let desc = RelationDesc::new(typ, scope.column_names()); + + Ok(PlannedRootQuery { + expr, + desc, + finishing, + scope, + }) +} + /// Attempts to push a projection through an order by. /// /// The returned bool indicates whether the pushdown was successful or not. @@ -6083,8 +6134,8 @@ impl QueryLifetime { /// Description of a CTE sufficient for query planning. #[derive(Debug, Clone)] pub struct CteDesc { - name: String, - desc: RelationDesc, + pub name: String, + pub desc: RelationDesc, } /// The state required when planning a `Query`. diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index d89c3379bae83..2f92a306fb6c8 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -22,7 +22,7 @@ use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_ use mz_controller_types::{ is_cluster_size_v2, ClusterId, ReplicaId, DEFAULT_REPLICA_LOGGING_INTERVAL, }; -use mz_expr::{CollectionPlan, UnmaterializableFunc}; +use mz_expr::{CollectionPlan, LocalId, UnmaterializableFunc}; use mz_interchange::avro::{AvroSchemaGenerator, DocTarget}; use mz_ore::cast::{CastFrom, TryCastFrom}; use mz_ore::collections::{CollectionExt, HashSet}; @@ -124,7 +124,9 @@ use crate::names::{ }; use crate::normalize::{self, ident}; use crate::plan::error::PlanError; -use crate::plan::query::{plan_expr, scalar_type_from_catalog, ExprContext, QueryLifetime}; +use crate::plan::query::{ + plan_expr, scalar_type_from_catalog, scalar_type_from_sql, CteDesc, ExprContext, QueryLifetime, +}; use crate::plan::scope::Scope; use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS}; use crate::plan::statement::{scl, StatementContext, StatementDesc}; @@ -139,15 +141,16 @@ use crate::plan::{ AlterSystemResetPlan, AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan, CreateClusterPlan, CreateClusterReplicaPlan, - CreateClusterUnmanagedPlan, CreateClusterVariant, CreateConnectionPlan, CreateDatabasePlan, - CreateIndexPlan, CreateMaterializedViewPlan, CreateRolePlan, CreateSchemaPlan, - CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, - CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, FullItemName, Index, Ingestion, - MaterializedView, Params, Plan, PlanClusterOption, 
PlanNotice, QueryContext, ReplicaConfig, - Secret, Sink, Source, Table, TableDataSource, Type, VariableValue, View, WebhookBodyFormat, - WebhookHeaderFilters, WebhookHeaders, WebhookValidation, + CreateClusterUnmanagedPlan, CreateClusterVariant, CreateConnectionPlan, + CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan, + CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, + CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc, DropObjectsPlan, + DropOwnedPlan, FullItemName, Index, Ingestion, MaterializedView, Params, Plan, + PlanClusterOption, PlanNotice, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, + TableDataSource, Type, VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, + WebhookHeaders, WebhookValidation, }; -use crate::session::vars; +use crate::session::vars::{self, ENABLE_CREATE_CONTINUAL_TASK}; use crate::session::vars::{ ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_KAFKA_SINK_HEADERS, ENABLE_KAFKA_SINK_PARTITION_BY, ENABLE_REFRESH_EVERY_MVS, @@ -2715,10 +2718,176 @@ generate_extracted_config!( pub fn plan_create_continual_task( scx: &StatementContext, - stmt: CreateContinualTaskStatement, + mut stmt: CreateContinualTaskStatement, params: &Params, ) -> Result { - todo!("WIP {:?}", (scx, stmt, params)); + scx.require_feature_flag(&ENABLE_CREATE_CONTINUAL_TASK)?; + let cluster_id = match &stmt.in_cluster { + None => scx.catalog.resolve_cluster(None)?.id(), + Some(in_cluster) => in_cluster.id, + }; + stmt.in_cluster = Some(ResolvedClusterName { + id: cluster_id, + print_name: None, + }); + + let create_sql = + normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?; + + let partial_name = normalize::unresolved_item_name(stmt.name)?; + let name = scx.allocate_qualified_name(partial_name.clone())?; + let desc = { + let mut desc_columns = Vec::with_capacity(stmt.columns.capacity()); + for col in stmt.columns.iter() { + desc_columns.push(( + normalize::column_name(col.name.clone()), + ColumnType { + scalar_type: scalar_type_from_sql(scx, &col.data_type)?, + nullable: true, + }, + )); + } + RelationDesc::from_names_and_types(desc_columns) + }; + let input = scx.get_item_by_resolved_name(&stmt.input)?; + + let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView); + qcx.ctes.insert( + LocalId::new(0), + CteDesc { + name: name.item.clone(), + desc: desc.clone(), + }, + ); + + let mut exprs = Vec::new(); + for stmt in &stmt.stmts { + let query = continual_task_query(&name, stmt).ok_or_else(|| sql_err!("TODO(ct)"))?; + let query::PlannedRootQuery { + mut expr, + desc: desc_query, + finishing, + scope: _, + } = query::plan_ct_query(&mut qcx, query)?; + // We get back a trivial finishing, see comment in `plan_view`. + assert!(finishing.is_trivial(expr.arity())); + // TODO(ct): Is this right? + expr.bind_parameters(params)?; + // TODO(ct): Make this error message more closely match the various ones + // given for INSERT/DELETE. + if desc_query.iter_types().ne(desc.iter_types()) { + sql_bail!( + "CONTINUAL TASK query columns did not match: {:?} vs {:?}", + desc_query.iter().collect::>(), + desc.iter().collect::>() + ); + } + match stmt { + ast::ContinualTaskStmt::Insert(_) => exprs.push(expr), + ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()), + } + } + // TODO(ct): Collect things by output and assert that there is only one (or + // support multiple outputs). 
+ let expr = exprs + .into_iter() + .reduce(|acc, expr| acc.union(expr)) + .ok_or_else(|| sql_err!("TODO(ct)"))?; + + let column_names: Vec = desc.iter_names().cloned().collect(); + if let Some(dup) = column_names.iter().duplicates().next() { + sql_bail!("column {} specified more than once", dup.as_str().quoted()); + } + + // Check for an object in the catalog with this same name + let full_name = scx.catalog.resolve_full_name(&name); + let partial_name = PartialItemName::from(full_name.clone()); + // For PostgreSQL compatibility, we need to prevent creating this when there + // is an existing object *or* type of the same name. + if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) { + return Err(PlanError::ItemAlreadyExists { + name: full_name.to_string(), + item_type: item.item_type(), + }); + } + + Ok(Plan::CreateContinualTask(CreateContinualTaskPlan { + name, + desc, + input_id: input.id(), + continual_task: MaterializedView { + create_sql, + expr, + column_names, + cluster_id, + non_null_assertions: Vec::new(), + compaction_window: None, + refresh_schedule: None, + as_of: None, + }, + })) +} + +fn continual_task_query<'a>( + ct_name: &QualifiedItemName, + stmt: &'a ast::ContinualTaskStmt, +) -> Option> { + match stmt { + ast::ContinualTaskStmt::Insert(ast::InsertStatement { + table_name: _, + columns, + source, + returning, + }) => { + if !columns.is_empty() || !returning.is_empty() { + return None; + } + match source { + ast::InsertSource::Query(query) => Some(query.clone()), + ast::InsertSource::DefaultValues => None, + } + } + ast::ContinualTaskStmt::Delete(ast::DeleteStatement { + table_name: _, + alias, + using, + selection, + }) => { + if !using.is_empty() { + return None; + } + // Construct a `SELECT *` with the `DELETE` selection as a `WHERE`. + let from = ast::TableWithJoins { + relation: ast::TableFactor::Table { + // TODO(ct): Huge hack. + name: ResolvedItemName::Cte { + id: LocalId::new(0), + name: ct_name.item.clone(), + }, + alias: alias.clone(), + }, + joins: Vec::new(), + }; + let select = ast::Select { + from: vec![from], + selection: selection.clone(), + distinct: None, + projection: vec![ast::SelectItem::Wildcard], + group_by: Vec::new(), + having: None, + options: Vec::new(), + }; + let query = ast::Query { + ctes: ast::CteBlock::Simple(Vec::new()), + body: ast::SetExpr::Select(Box::new(select)), + order_by: Vec::new(), + limit: None, + offset: None, + }; + // Then negate it to turn it into retractions (after planning it). 
+ Some(query) + } + } } pub fn describe_create_sink( diff --git a/src/sql/src/rbac.rs b/src/sql/src/rbac.rs index 92bba069ef542..4c008aa40549e 100644 --- a/src/sql/src/rbac.rs +++ b/src/sql/src/rbac.rs @@ -600,7 +600,27 @@ fn generate_rbac_requirements( item_usage: &CREATE_ITEM_USAGE, ..Default::default() }, - Plan::CreateContinualTask(plan::CreateContinualTaskPlan {}) => todo!("WIP"), + Plan::CreateContinualTask(plan::CreateContinualTaskPlan { + name, + desc: _, + input_id: _, + continual_task, + }) => RbacRequirements { + privileges: vec![ + ( + SystemObjectId::Object(name.qualifiers.clone().into()), + AclMode::CREATE, + role_id, + ), + ( + SystemObjectId::Object(continual_task.cluster_id.into()), + AclMode::CREATE, + role_id, + ), + ], + item_usage: &CREATE_ITEM_USAGE, + ..Default::default() + }, Plan::CreateIndex(plan::CreateIndexPlan { name, index, diff --git a/src/sql/src/session/vars/definitions.rs b/src/sql/src/session/vars/definitions.rs index 6759680ce94e2..0f3ee002e5577 100644 --- a/src/sql/src/session/vars/definitions.rs +++ b/src/sql/src/session/vars/definitions.rs @@ -2122,6 +2122,12 @@ feature_flags!( default: true, enable_for_item_parsing: false, }, + { + name: enable_create_continual_task, + desc: "CREATE CONTINUAL TASK", + default: false, + enable_for_item_parsing: true, + }, ); impl From<&super::SystemVars> for OptimizerFeatures { diff --git a/test/sqllogictest/ct_audit_log.slt b/test/sqllogictest/ct_audit_log.slt new file mode 100644 index 0000000000000..80f7dc1b0b330 --- /dev/null +++ b/test/sqllogictest/ct_audit_log.slt @@ -0,0 +1,53 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +mode cockroach + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_create_continual_task = true +---- +COMPLETE 0 + +statement ok +CREATE TABLE mv_input (key INT) + +statement ok +INSERT INTO mv_input VALUES (1); + +statement ok +CREATE MATERIALIZED VIEW anomalies AS SELECT sum(key)::INT FROM mv_input; + +query I +SELECT * FROM anomalies +---- +1 + +statement ok +CREATE CONTINUAL TASK audit_log (count INT) ON INPUT anomalies AS ( + INSERT INTO audit_log SELECT * FROM anomalies; +) + +query I +SELECT * FROM audit_log +---- +1 + +statement ok +INSERT INTO mv_input VALUES (2), (3) + +query I +SELECT * FROM anomalies +---- +6 + +query I +SELECT * FROM audit_log +---- +1 +6 diff --git a/test/sqllogictest/ct_errors.slt b/test/sqllogictest/ct_errors.slt new file mode 100644 index 0000000000000..5a1ab3cfeda66 --- /dev/null +++ b/test/sqllogictest/ct_errors.slt @@ -0,0 +1,36 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. 
+ +mode cockroach + +statement ok +CREATE TABLE foo (key INT) + +# Feature flag is off by default +statement error CREATE CONTINUAL TASK is not supported +CREATE CONTINUAL TASK nope (key INT) ON INPUT foo AS ( + INSERT INTO nope SELECT * FROM foo; +) + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_create_continual_task = true +---- +COMPLETE 0 + +# INSERT columns do not match +statement error CONTINUAL TASK query columns did not match +CREATE CONTINUAL TASK nope (key STRING) ON INPUT foo AS ( + INSERT INTO nope SELECT * FROM foo; +) + +# TODO(ct): Make this error (or work!) instead of panic +# statement error something +# CREATE CONTINUAL TASK nope (key INT) ON INPUT foo AS ( +# INSERT INTO nope SELECT null::INT +# ) diff --git a/test/sqllogictest/ct_stream_table_join.slt b/test/sqllogictest/ct_stream_table_join.slt new file mode 100644 index 0000000000000..2078b4b2893f9 --- /dev/null +++ b/test/sqllogictest/ct_stream_table_join.slt @@ -0,0 +1,49 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +mode cockroach + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_create_continual_task = true +---- +COMPLETE 0 + +statement ok +CREATE TABLE big (key INT) + +statement ok +CREATE TABLE small (key INT, val STRING) + +statement ok +INSERT INTO small VALUES (1, 'v0') + +statement ok +CREATE CONTINUAL TASK stj (key INT, val STRING) ON INPUT big AS ( + INSERT INTO stj SELECT b.key, s.val FROM big b JOIN small s ON b.key = s.key; +) + +statement ok +INSERT INTO big VALUES (1) + +query IT +SELECT * FROM stj +---- +1 v0 + +statement ok +UPDATE small SET val = 'v1' + +statement ok +INSERT INTO big VALUES (1) + +query IT +SELECT * FROM stj +---- +1 v0 +1 v1 diff --git a/test/sqllogictest/ct_upsert.slt b/test/sqllogictest/ct_upsert.slt new file mode 100644 index 0000000000000..a3fad1a33db8f --- /dev/null +++ b/test/sqllogictest/ct_upsert.slt @@ -0,0 +1,41 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +mode cockroach + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_create_continual_task = true +---- +COMPLETE 0 + +statement ok +CREATE TABLE append_only (key INT, val INT) + +statement ok +CREATE CONTINUAL TASK upsert (key INT, val INT) ON INPUT append_only AS ( + DELETE FROM upsert WHERE key IN (SELECT key FROM append_only); + INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key; +) + +statement ok +INSERT INTO append_only VALUES (1, 2), (1, 1) + +query II +SELECT * FROM upsert +---- +1 2 + +statement ok +INSERT INTO append_only VALUES (1, 3), (2, 4) + +query IT +SELECT * FROM upsert +---- +1 3 +2 4
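Taken together with the planning code above, the `ct_upsert.slt` scenario boils down to: each batch of append-only diffs retracts the touched keys (the negated DELETE plan) and inserts the per-key max over the batch (the INSERT plan), and the union of the two is what gets written at that timestamp. A minimal sketch of that net effect in plain Rust (the helper name and the map standing in for the output shard are illustrative, not the actual implementation):

```rust
use std::collections::BTreeMap;

// Hypothetical helper: replays one batch of append-only diffs against the
// upsert output, mirroring the DELETE-then-INSERT txn in ct_upsert.slt.
fn apply_ct_upsert(upsert: &mut BTreeMap<i32, i32>, batch: &[(i32, i32)]) {
    // DELETE FROM upsert WHERE key IN (SELECT key FROM append_only):
    // retract the current value for every key that appears in this batch.
    for (key, _) in batch {
        upsert.remove(key);
    }
    // INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key:
    // insert the per-key max over the diffs in this batch.
    for &(key, val) in batch {
        let entry = upsert.entry(key).or_insert(val);
        if val > *entry {
            *entry = val;
        }
    }
}

fn main() {
    let mut upsert = BTreeMap::new();
    // First batch from the test: (1, 2) and (1, 1) net to key 1 -> 2.
    apply_ct_upsert(&mut upsert, &[(1, 2), (1, 1)]);
    assert_eq!(upsert, BTreeMap::from([(1, 2)]));
    // Second batch: key 1 is retracted and re-inserted as 3, key 2 appears as 4.
    apply_ct_upsert(&mut upsert, &[(1, 3), (2, 4)]);
    assert_eq!(upsert, BTreeMap::from([(1, 3), (2, 4)]));
}
```

As the module rustdoc concedes about its upsert example, this intentionally carries the same quirk as the SQL: only the diffs in the current batch are consulted, so a later batch with a smaller value for an existing key still replaces it.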