diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index 722eae1f079e0..5961b104e9df0 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -360,6 +360,16 @@ impl Catalog { } policies } + + /// TODO(ct): This exists for continual tasks because they are the first + /// self-referential thing in mz. We use this to inject something for the + /// optimizer to resolve the CT's own id to before we've saved it to the + /// catalog. The one usage of this is careful to destroy this copy of + /// catalog immediately after. There are better ways to do this, but it was + /// the easiest path to get the skeleton of the feature merged. + pub fn hack_add_ct(&mut self, id: GlobalId, entry: CatalogEntry) { + self.state.entry_by_id.insert(id, entry); + } } #[derive(Debug)] diff --git a/src/adapter/src/catalog/builtin_table_updates.rs b/src/adapter/src/catalog/builtin_table_updates.rs index 59ca6e2b5a427..659e3b11033ba 100644 --- a/src/adapter/src/catalog/builtin_table_updates.rs +++ b/src/adapter/src/catalog/builtin_table_updates.rs @@ -1245,16 +1245,19 @@ impl CatalogState { }) .into_element() .ast; - let query = match &create_stmt { - Statement::CreateMaterializedView(stmt) => &stmt.query, + let query_string = match &create_stmt { + Statement::CreateMaterializedView(stmt) => { + let mut query_string = stmt.query.to_ast_string_stable(); + // PostgreSQL appends a semicolon in `pg_matviews.definition`, we + // do the same for compatibility's sake. + query_string.push(';'); + query_string + } + // TODO(ct): Remove. + Statement::CreateContinualTask(_) => "TODO(ct)".into(), _ => unreachable!(), }; - let mut query_string = query.to_ast_string_stable(); - // PostgreSQL appends a semicolon in `pg_matviews.definition`, we - // do the same for compatibility's sake. 
- query_string.push(';'); - let mut updates = Vec::new(); updates.push(BuiltinTableUpdate { diff --git a/src/adapter/src/catalog/state.rs b/src/adapter/src/catalog/state.rs index ae2047c00eb2c..9e00c89024400 100644 --- a/src/adapter/src/catalog/state.rs +++ b/src/adapter/src/catalog/state.rs @@ -38,6 +38,7 @@ use mz_controller::clusters::{ UnmanagedReplicaLocation, }; use mz_controller_types::{ClusterId, ReplicaId}; +use mz_expr::OptimizedMirRelationExpr; use mz_ore::collections::CollectionExt; use mz_ore::now::NOW_ZERO; use mz_ore::soft_assert_no_log; @@ -65,9 +66,9 @@ use mz_sql::names::{ ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId, }; use mz_sql::plan::{ - CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan, - CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params, - Plan, PlanContext, + CreateConnectionPlan, CreateContinualTaskPlan, CreateIndexPlan, CreateMaterializedViewPlan, + CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, + CreateViewPlan, Params, Plan, PlanContext, }; use mz_sql::rbac; use mz_sql::session::metadata::SessionMetadata; @@ -946,6 +947,31 @@ impl CatalogState { initial_as_of, }) } + Plan::CreateContinualTask(CreateContinualTaskPlan { + desc, + continual_task, + .. + }) => { + // TODO(ct): Figure out how to make this survive restarts. The + // expr we saved still had the LocalId placeholders for the + // output, but we don't have access to the real Id here. 
+ let optimized_expr = OptimizedMirRelationExpr::declare_optimized( + mz_expr::MirRelationExpr::constant(Vec::new(), desc.typ().clone()), + ); + // TODO(ct): CatalogItem::ContinualTask + CatalogItem::MaterializedView(MaterializedView { + create_sql: continual_task.create_sql, + raw_expr: Arc::new(continual_task.expr.clone()), + optimized_expr: Arc::new(optimized_expr), + desc, + resolved_ids, + cluster_id: continual_task.cluster_id, + non_null_assertions: continual_task.non_null_assertions, + custom_logical_compaction_window: continual_task.compaction_window, + refresh_schedule: continual_task.refresh_schedule, + initial_as_of: continual_task.as_of.map(Antichain::from_elem), + }) + } Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index { create_sql: index.create_sql, on: index.on, diff --git a/src/adapter/src/command.rs b/src/adapter/src/command.rs index 0f5e487824561..42f13fadfb92f 100644 --- a/src/adapter/src/command.rs +++ b/src/adapter/src/command.rs @@ -323,6 +323,8 @@ pub enum ExecuteResponse { CreatedViews, /// The requested materialized view was created. CreatedMaterializedView, + /// The requested continual task was created. + CreatedContinualTask, /// The requested type was created. CreatedType, /// The requested prepared statement was removed. @@ -490,6 +492,7 @@ impl TryInto for ExecuteResponseKind { ExecuteResponseKind::CreatedMaterializedView => { Ok(ExecuteResponse::CreatedMaterializedView) } + ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask), ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType), ExecuteResponseKind::Deallocate => Err(()), ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor), @@ -551,6 +554,7 @@ impl ExecuteResponse { CreatedView { .. } => Some("CREATE VIEW".into()), CreatedViews { .. } => Some("CREATE VIEWS".into()), CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()), + CreatedContinualTask { .. 
} => Some("CREATE CONTINUAL TASK".into()), CreatedType => Some("CREATE TYPE".into()), Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })), DeclaredCursor => Some("DECLARE CURSOR".into()), @@ -639,6 +643,7 @@ impl ExecuteResponse { CreateTable => &[CreatedTable], CreateView => &[CreatedView], CreateMaterializedView => &[CreatedMaterializedView], + CreateContinualTask => &[CreatedContinualTask], CreateIndex => &[CreatedIndex], CreateType => &[CreatedType], PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate], diff --git a/src/adapter/src/coord/catalog_serving.rs b/src/adapter/src/coord/catalog_serving.rs index fddcbe3270261..167c105ffc077 100644 --- a/src/adapter/src/coord/catalog_serving.rs +++ b/src/adapter/src/coord/catalog_serving.rs @@ -74,6 +74,7 @@ pub fn auto_run_on_catalog_server<'a, 's, 'p>( | Plan::CreateRole(_) | Plan::CreateCluster(_) | Plan::CreateClusterReplica(_) + | Plan::CreateContinualTask(_) | Plan::CreateSource(_) | Plan::CreateSources(_) | Plan::CreateSecret(_) diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index b5adeb9cd1d13..095b9c897fcac 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -671,6 +671,7 @@ impl Coordinator { | Statement::CreateDatabase(_) | Statement::CreateIndex(_) | Statement::CreateMaterializedView(_) + | Statement::CreateContinualTask(_) | Statement::CreateRole(_) | Statement::CreateSchema(_) | Statement::CreateSecret(_) diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs index 4480891ce196e..361b7a6e94d0d 100644 --- a/src/adapter/src/coord/sequencer.rs +++ b/src/adapter/src/coord/sequencer.rs @@ -213,6 +213,12 @@ impl Coordinator { self.sequence_create_materialized_view(ctx, plan, resolved_ids) .await; } + Plan::CreateContinualTask(plan) => { + let res = self + .sequence_create_continual_task(ctx.session(), plan, resolved_ids) + .await; + 
ctx.retire(res); + } Plan::CreateIndex(plan) => { self.sequence_create_index(ctx, plan, resolved_ids).await; } diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index c51498ea86c16..e957bd362a09c 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -112,6 +112,7 @@ use crate::util::{viewable_variables, ClientTransmitter, ResultExt}; use crate::{guard_write_critical_section, PeekResponseUnary, ReadHolds}; mod cluster; +mod create_continual_task; mod create_index; mod create_materialized_view; mod create_view; diff --git a/src/adapter/src/coord/sequencer/inner/create_continual_task.rs b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs new file mode 100644 index 0000000000000..edf249274d4da --- /dev/null +++ b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs @@ -0,0 +1,208 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use std::sync::Arc; + +use mz_catalog::memory::objects::{ + CatalogEntry, CatalogItem, MaterializedView, Table, TableDataSource, +}; +use mz_compute_types::sinks::{ + ComputeSinkConnection, ContinualTaskConnection, PersistSinkConnection, +}; +use mz_expr::visit::Visit; +use mz_expr::{Id, LocalId}; +use mz_ore::instrument; +use mz_repr::adt::mz_acl_item::PrivilegeMap; +use mz_repr::optimize::OverrideFrom; +use mz_sql::names::ResolvedIds; +use mz_sql::plan::{self, HirRelationExpr}; +use mz_sql::session::metadata::SessionMetadata; +use mz_storage_client::controller::{CollectionDescription, DataSource, DataSourceOther}; + +use crate::catalog; +use crate::command::ExecuteResponse; +use crate::coord::Coordinator; +use crate::error::AdapterError; +use crate::optimize::dataflows::dataflow_import_id_bundle; +use crate::optimize::{self, Optimize}; +use crate::session::Session; +use crate::util::ResultExt; + +// TODO(ct): Big oof. Dedup a bunch of this with MVs. +impl Coordinator { + #[instrument] + pub(crate) async fn sequence_create_continual_task( + &mut self, + session: &Session, + plan: plan::CreateContinualTaskPlan, + resolved_ids: ResolvedIds, + ) -> Result { + let plan::CreateContinualTaskPlan { + name, + desc, + input_id, + continual_task: + plan::MaterializedView { + create_sql, + cluster_id, + mut expr, + column_names, + non_null_assertions, + compaction_window: _, + refresh_schedule, + as_of: _, + }, + } = plan; + + // Collect optimizer parameters. 
+ let compute_instance = self + .instance_snapshot(cluster_id) + .expect("compute instance does not exist"); + + let debug_name = self.catalog().resolve_full_name(&name, None).to_string(); + let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config()) + .override_from(&self.catalog.get_cluster(cluster_id).config.features()); + + let view_id = self.allocate_transient_id(); + let catalog_mut = self.catalog_mut(); + let sink_id = catalog_mut.allocate_user_id().await?; + + // Put a placeholder in the catalog so the optimizer can find something + // for the sink_id. + let fake_entry = CatalogEntry { + item: CatalogItem::Table(Table { + create_sql: Some(create_sql.clone()), + desc: desc.clone(), + conn_id: None, + resolved_ids: resolved_ids.clone(), + custom_logical_compaction_window: None, + is_retained_metrics_object: false, + data_source: TableDataSource::TableWrites { + defaults: Vec::new(), + }, + }), + referenced_by: Vec::new(), + used_by: Vec::new(), + id: sink_id, + oid: 0, + name: name.clone(), + owner_id: *session.current_role_id(), + privileges: PrivilegeMap::new(), + }; + catalog_mut.hack_add_ct(sink_id, fake_entry); + + // Build an optimizer for this CONTINUAL TASK. + let mut optimizer = optimize::materialized_view::Optimizer::new( + Arc::new(catalog_mut.clone()), + compute_instance, + sink_id, + view_id, + column_names, + non_null_assertions, + refresh_schedule.clone(), + debug_name, + optimizer_config, + self.optimizer_metrics(), + ); + + // Replace our placeholder fake ctes with the real output id, now that + // we have it. + expr.visit_mut_post(&mut |expr| match expr { + HirRelationExpr::Get { id, .. 
} if *id == Id::Local(LocalId::new(0)) => { + *id = Id::Global(sink_id); + } + _ => {} + })?; + + // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local and global) + let raw_expr = expr.clone(); + let local_mir_plan = optimizer.catch_unwind_optimize(expr)?; + let global_mir_plan = optimizer.catch_unwind_optimize(local_mir_plan.clone())?; + // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global) + let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?; + let (mut df_desc, _df_meta) = global_lir_plan.unapply(); + + // Timestamp selection + let mut id_bundle = dataflow_import_id_bundle(&df_desc, cluster_id.clone()); + // Can't acquire a read hold on ourselves because we don't exist yet. + // + // It is not necessary to take a read hold on the CT output in the + // coordinator, since the current scheme takes read holds in the + // coordinator only to ensure inputs don't get compacted until the + // compute controller has installed its own read holds, which happens + // below with the `ship_dataflow` call. + id_bundle.storage_ids.remove(&sink_id); + let read_holds = self.acquire_read_holds(&id_bundle); + let as_of = read_holds.least_valid_read(); + df_desc.set_as_of(as_of.clone()); + + // TODO(ct): HACKs + for sink in df_desc.sink_exports.values_mut() { + match &mut sink.connection { + ComputeSinkConnection::Persist(PersistSinkConnection { + storage_metadata, .. + }) => { + sink.connection = + ComputeSinkConnection::ContinualTask(ContinualTaskConnection { + input_id, + storage_metadata: *storage_metadata, + }) + } + _ => unreachable!("MV should produce persist sink connection"), + } + } + + let ops = vec![catalog::Op::CreateItem { + id: sink_id, + name: name.clone(), + item: CatalogItem::MaterializedView(MaterializedView { + // TODO(ct): This doesn't give the `DELETE FROM` / `INSERT INTO` + // names the `[u1 AS "materialize"."public"."append_only"]` + // style expansion. Bug? 
+ create_sql, + raw_expr: Arc::new(raw_expr), + optimized_expr: Arc::new(local_mir_plan.expr()), + desc: desc.clone(), + resolved_ids, + cluster_id, + non_null_assertions: Vec::new(), + custom_logical_compaction_window: None, + refresh_schedule, + initial_as_of: Some(as_of.clone()), + }), + owner_id: *session.current_role_id(), + }]; + + let () = self + .catalog_transact_with_side_effects(Some(session), ops, |coord| async { + coord + .controller + .storage + .create_collections( + coord.catalog.state().storage_metadata(), + None, + vec![( + sink_id, + CollectionDescription { + desc, + data_source: DataSource::Other(DataSourceOther::Compute), + since: Some(as_of), + status_collection_id: None, + }, + )], + ) + .await + .unwrap_or_terminate("cannot fail to append"); + + coord.ship_dataflow(df_desc, cluster_id.clone(), None).await; + }) + .await?; + Ok(ExecuteResponse::CreatedContinualTask) + } +} diff --git a/src/adapter/src/statement_logging.rs b/src/adapter/src/statement_logging.rs index 77b93efb82db7..cac4c3450a043 100644 --- a/src/adapter/src/statement_logging.rs +++ b/src/adapter/src/statement_logging.rs @@ -207,6 +207,7 @@ impl From<&ExecuteResponse> for StatementEndedExecutionReason { | ExecuteResponse::CreatedView | ExecuteResponse::CreatedViews | ExecuteResponse::CreatedMaterializedView + | ExecuteResponse::CreatedContinualTask | ExecuteResponse::CreatedType | ExecuteResponse::Deallocate { .. 
} | ExecuteResponse::DeclaredCursor diff --git a/src/catalog/src/durable/objects.rs b/src/catalog/src/durable/objects.rs index 3e4723b5e1443..92bc20ba8bb33 100644 --- a/src/catalog/src/durable/objects.rs +++ b/src/catalog/src/durable/objects.rs @@ -1150,6 +1150,11 @@ impl ItemValue { assert_eq!(tokens.next(), Some("VIEW")); CatalogItemType::MaterializedView } + Some("CONTINUAL") => { + assert_eq!(tokens.next(), Some("TASK")); + // TODO(ct): CatalogItemType::ContinualTask + CatalogItemType::MaterializedView + } Some("INDEX") => CatalogItemType::Index, Some("TYPE") => CatalogItemType::Type, Some("FUNCTION") => CatalogItemType::Func, diff --git a/src/compute-client/Cargo.toml b/src/compute-client/Cargo.toml index f9b208ae0ce2a..65c78bbd0a1b1 100644 --- a/src/compute-client/Cargo.toml +++ b/src/compute-client/Cargo.toml @@ -28,7 +28,7 @@ mz-dyncfg = { path = "../dyncfg" } mz-dyncfgs = { path = "../dyncfgs" } mz-expr = { path = "../expr" } mz-orchestrator = { path = "../orchestrator" } -mz-ore = { path = "../ore", features = ["tracing_"] } +mz-ore = { path = "../ore", features = ["tracing_","chrono"] } mz-persist = { path = "../persist" } mz-persist-client = { path = "../persist-client" } mz-persist-types = { path = "../persist-types" } diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index a435b0b1d280d..fb84afa3be0f3 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -22,7 +22,9 @@ use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig}; use mz_compute_types::dataflows::{BuildDesc, DataflowDescription}; use mz_compute_types::plan::flat_plan::FlatPlan; use mz_compute_types::plan::LirId; -use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, PersistSinkConnection}; +use mz_compute_types::sinks::{ + ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, PersistSinkConnection, +}; use 
mz_compute_types::sources::SourceInstanceDesc; use mz_controller_types::dyncfgs::WALLCLOCK_LAG_REFRESH_INTERVAL; use mz_dyncfg::ConfigSet; @@ -1289,6 +1291,18 @@ where }; ComputeSinkConnection::Persist(conn) } + ComputeSinkConnection::ContinualTask(conn) => { + let metadata = self + .storage_collections + .collection_metadata(id) + .map_err(|_| DataflowCreationError::CollectionMissing(id))? + .clone(); + let conn = ContinualTaskConnection { + input_id: conn.input_id, + storage_metadata: metadata, + }; + ComputeSinkConnection::ContinualTask(conn) + } ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn), ComputeSinkConnection::CopyToS3Oneshot(conn) => { ComputeSinkConnection::CopyToS3Oneshot(conn) diff --git a/src/compute-types/src/sinks.proto b/src/compute-types/src/sinks.proto index 200a0738133be..34cc9a2e357f3 100644 --- a/src/compute-types/src/sinks.proto +++ b/src/compute-types/src/sinks.proto @@ -38,6 +38,7 @@ message ProtoComputeSinkConnection { google.protobuf.Empty subscribe = 1; ProtoPersistSinkConnection persist = 2; ProtoCopyToS3OneshotSinkConnection copy_to_s3_oneshot = 3; + ProtoContinualTaskConnection continual_task = 4; } } @@ -46,6 +47,11 @@ message ProtoPersistSinkConnection { mz_storage_types.controller.ProtoCollectionMetadata storage_metadata = 2; } +message ProtoContinualTaskConnection { + mz_repr.global_id.ProtoGlobalId input_id = 1; + mz_storage_types.controller.ProtoCollectionMetadata storage_metadata = 2; +} + message ProtoCopyToS3OneshotSinkConnection { mz_storage_types.sinks.ProtoS3UploadInfo upload_info = 1; mz_storage_types.connections.aws.ProtoAwsConnection aws_connection = 2; diff --git a/src/compute-types/src/sinks.rs b/src/compute-types/src/sinks.rs index bccab841efba7..f713098ba37e6 100644 --- a/src/compute-types/src/sinks.rs +++ b/src/compute-types/src/sinks.rs @@ -119,6 +119,12 @@ pub enum ComputeSinkConnection { Subscribe(SubscribeSinkConnection), /// TODO(#25239): Add documentation. 
Persist(PersistSinkConnection), + /// TODO(#25239): Add documentation. + /// + /// TODO(ct): This also writes to persist, but with different behavior + /// (conflict resolution, only at input times, etc). It might be time to + /// rename PersistSink to something that reflects the differences. + ContinualTask(ContinualTaskConnection), /// A compute sink to do a oneshot copy to s3. CopyToS3Oneshot(CopyToS3OneshotSinkConnection), } @@ -129,6 +135,7 @@ impl ComputeSinkConnection { match self { ComputeSinkConnection::Subscribe(_) => "subscribe", ComputeSinkConnection::Persist(_) => "persist", + ComputeSinkConnection::ContinualTask(_) => "continual_task", ComputeSinkConnection::CopyToS3Oneshot(_) => "copy_to_s3_oneshot", } } @@ -150,6 +157,9 @@ impl RustType for ComputeSinkConnection Kind::Subscribe(()), ComputeSinkConnection::Persist(persist) => Kind::Persist(persist.into_proto()), + ComputeSinkConnection::ContinualTask(continual_task) => { + Kind::ContinualTask(continual_task.into_proto()) + } ComputeSinkConnection::CopyToS3Oneshot(s3) => { Kind::CopyToS3Oneshot(s3.into_proto()) } @@ -165,6 +175,9 @@ impl RustType for ComputeSinkConnection ComputeSinkConnection::Subscribe(SubscribeSinkConnection {}), Kind::Persist(persist) => ComputeSinkConnection::Persist(persist.into_rust()?), + Kind::ContinualTask(continual_task) => { + ComputeSinkConnection::ContinualTask(continual_task.into_rust()?) + } Kind::CopyToS3Oneshot(s3) => ComputeSinkConnection::CopyToS3Oneshot(s3.into_rust()?), }) } @@ -243,3 +256,35 @@ impl RustType for PersistSinkConnection { + /// TODO(#25239): Add documentation. + // + // TODO(ct): This can be removed once we render the "input" sources without + // the hack. + pub input_id: GlobalId, + /// TODO(ct): Add documentation. 
+ pub storage_metadata: S, +} + +impl RustType for ContinualTaskConnection { + fn into_proto(&self) -> ProtoContinualTaskConnection { + ProtoContinualTaskConnection { + input_id: Some(self.input_id.into_proto()), + storage_metadata: Some(self.storage_metadata.into_proto()), + } + } + + fn from_proto(proto: ProtoContinualTaskConnection) -> Result { + Ok(ContinualTaskConnection { + input_id: proto + .input_id + .into_rust_if_some("ProtoContinualTaskConnection::input_id")?, + storage_metadata: proto + .storage_metadata + .into_rust_if_some("ProtoContinualTaskConnection::storage_metadata")?, + }) + } +} diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 7a0e0db48cb64..25f760e8755c8 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -150,9 +150,11 @@ use crate::logging::compute::LogDataflowErrors; use crate::render::context::{ ArrangementFlavor, Context, MzArrangement, MzArrangementImport, ShutdownToken, }; +use crate::render::continual_task::ContinualTaskCtx; use crate::typedefs::{ErrSpine, KeyBatcher}; pub mod context; +pub(crate) mod continual_task; mod errors; mod flat_map; mod join; @@ -202,6 +204,10 @@ pub fn build_compute_dataflow( let build_name = format!("BuildRegion: {}", &dataflow.debug_name); timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| { + // TODO(ct): This should be a config of the source instead, but at least try + // to contain the hacks. + let mut ct_ctx = ContinualTaskCtx::new(&dataflow); + // The scope.clone() occurs to allow import in the region. // We build a region here to establish a pattern of a scope inside the dataflow, // so that other similar uses (e.g. 
with iterative scopes) do not require weird @@ -217,9 +223,16 @@ pub fn build_compute_dataflow( .expect("Linear operators should always be valid") }); + let ct_inserts_transformer = ct_ctx.get_ct_inserts_transformer(*source_id); + let snapshot_mode = if ct_inserts_transformer.is_none() { + SnapshotMode::Include + } else { + SnapshotMode::Exclude + }; + // Note: For correctness, we require that sources only emit times advanced by // `dataflow.as_of`. `persist_source` is documented to provide this guarantee. - let (mut ok_stream, err_stream, token) = persist_source::persist_source( + let (ok_stream, err_stream, token) = persist_source::persist_source( inner, *source_id, Arc::clone(&compute_state.persist_clients), @@ -227,7 +240,7 @@ pub fn build_compute_dataflow( &compute_state.worker_config, source.storage_metadata.clone(), dataflow.as_of.clone(), - SnapshotMode::Include, + snapshot_mode, dataflow.until.clone(), mfp.as_mut(), compute_state.dataflow_max_inflight_bytes(), @@ -235,6 +248,20 @@ pub fn build_compute_dataflow( |error| panic!("compute_import: {error}"), ); + let (mut ok_stream, err_stream) = match ct_inserts_transformer { + None => (ok_stream, err_stream), + Some(inserts_transformer_fn) => { + let (oks, errs, ct_times) = inserts_transformer_fn( + ok_stream.as_collection(), + err_stream.as_collection(), + ); + // TODO(ct): Ideally this would be encapsulated by ContinualTaskCtx, but + // the types are tricky. + ct_ctx.ct_times.push(ct_times.leave_region().leave_region()); + (oks.inner, errs.inner) + } + }; + // If `mfp` is non-identity, we need to apply what remains. // For the moment, assert that it is either trivial or `None`. 
assert!(mfp.map(|x| x.is_identity()).unwrap_or(true)); @@ -333,6 +360,7 @@ pub fn build_compute_dataflow( sink_id, &sink, start_signal.clone(), + ct_ctx.input_times(), ); } }); @@ -396,6 +424,7 @@ pub fn build_compute_dataflow( sink_id, &sink, start_signal.clone(), + ct_ctx.input_times(), ); } }); diff --git a/src/compute/src/render/continual_task.rs b/src/compute/src/render/continual_task.rs new file mode 100644 index 0000000000000..253f7ea0d1561 --- /dev/null +++ b/src/compute/src/render/continual_task.rs @@ -0,0 +1,839 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! A continual task presents as something like a `BEFORE TRIGGER`: it watches +//! some _input_ and whenever it changes at time `T`, executes a SQL txn, +//! writing to some _output_ at the same time `T`. It can also read anything in +//! materialize as a _reference_, most notably including the output. +//! +//! Only reacting to new inputs (and not the full history) makes a CT's +//! rehydration time independent of the size of the inputs (NB this is not true +//! for references), enabling things like writing UPSERT on top of an +//! append-only shard in SQL (ignore the obvious bug with my upsert impl): +//! +//! ```sql +//! CREATE CONTINUAL TASK upsert (key INT, val INT) ON INPUT append_only AS ( +//! DELETE FROM upsert WHERE key IN (SELECT key FROM append_only); +//! INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key; +//! ) +//! ``` +//! +//! Unlike a materialized view, the continual task does not update outputs if +//! references later change. This enables things like auditing: +//! +//! ```sql +//! 
CREATE CONTINUAL TASK audit_log (count INT8) ON INPUT anomalies AS ( +//! INSERT INTO audit_log SELECT * FROM anomalies; +//! ) +//! ``` +//! +//! Rough implementation overview: +//! - A CT is created and starts at some `start_ts`, and is optionally later +//! dropped and stopped at some `end_ts`. +//! - A CT takes one or more _input_s. These must be persist shards (i.e. TABLE, +//! SOURCE, MV, but not VIEW). +//! - A CT has one or more _output_s. The outputs are (initially) owned by the +//! task and cannot be written to by other parts of the system. +//! - The task is run for each time at which one of the inputs changes, starting at +//! `start_ts`. +//! - It is given the changes in its inputs at time `T` as diffs. +//! - These are presented as two SQL relations with just the inserts/deletes. +//! - NB: A full collection for the input can always be recovered by also +//! using the input as a "reference" (see below) and applying the diffs. +//! - The task logic is expressed as a SQL transaction that does all reads at +//! time `T-1` and commits all writes at `T`. +//! - Intuition is that the logic runs before the input is written, like a +//! `CREATE TRIGGER ... BEFORE`. +//! - This logic can _reference_ any nameable object in the system, not just the +//! inputs. +//! - However, the logic/transaction can mutate only the outputs. +//! - Summary of differences between inputs and references: +//! - The task receives snapshot + changes for references (like regular +//! dataflow inputs today) but only changes for inputs. +//! - The task only produces output in response to changes in the inputs but +//! not in response to changes in the references. +//! - Inputs are reclocked by subtracting 1 from their timestamps; references +//! are not. +//! - Instead of re-evaluating the task logic from scratch for each input time, +//! we maintain the collection representing desired writes to the output(s) as +//! a dataflow. +//! 
- The task dataflow is tied to a `CLUSTER` and runs on each `REPLICA`. +//! - HA strategy: multi-replica clusters race to commit and the losers throw +//! away the result. + +use std::any::Any; +use std::cell::RefCell; +use std::collections::BTreeSet; +use std::rc::Rc; +use std::sync::Arc; + +use differential_dataflow::consolidation::consolidate_updates; +use differential_dataflow::difference::Semigroup; +use differential_dataflow::lattice::Lattice; +use differential_dataflow::{AsCollection, Collection, Hashable}; +use futures::{Future, FutureExt, StreamExt}; +use mz_compute_types::dataflows::DataflowDescription; +use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection}; +use mz_ore::cast::CastFrom; +use mz_ore::collections::HashMap; +use mz_persist_client::error::UpperMismatch; +use mz_persist_client::write::WriteHandle; +use mz_persist_client::Diagnostics; +use mz_persist_types::codec_impls::UnitSchema; +use mz_repr::{Diff, GlobalId, Row, Timestamp}; +use mz_storage_types::controller::CollectionMetadata; +use mz_storage_types::errors::DataflowError; +use mz_storage_types::sources::SourceData; +use mz_timely_util::builder_async::{Button, Event, OperatorBuilder as AsyncOperatorBuilder}; +use timely::dataflow::channels::pact::{Exchange, Pipeline}; +use timely::dataflow::operators::{Filter, FrontierNotificator, Map, Operator}; +use timely::dataflow::{ProbeHandle, Scope}; +use timely::progress::frontier::AntichainRef; +use timely::progress::{Antichain, Timestamp as _}; +use timely::{Data, PartialOrder}; +use tracing::debug; + +use crate::compute_state::ComputeState; +use crate::render::sinks::SinkRender; +use crate::render::StartSignal; + +pub(crate) struct ContinualTaskCtx> { + name: Option, + dataflow_as_of: Option>, + ct_inputs: BTreeSet, + pub ct_times: Vec>, +} + +impl> ContinualTaskCtx { + pub fn new(dataflow: &DataflowDescription) -> Self { + let mut name = None; + let mut ct_inputs = BTreeSet::new(); + for (sink_id, sink) 
in &dataflow.sink_exports { + match &sink.connection { + ComputeSinkConnection::ContinualTask(ContinualTaskConnection { + input_id, .. + }) => { + ct_inputs.insert(*input_id); + // There's only one CT sink per dataflow at this point. + assert_eq!(name, None); + name = Some(sink_id.to_string()); + } + _ => continue, + } + } + let mut ret = ContinualTaskCtx { + name, + dataflow_as_of: None, + ct_inputs, + ct_times: Vec::new(), + }; + // Only clone the as_of if we're in a CT dataflow. + if ret.is_ct_dataflow() { + ret.dataflow_as_of = dataflow.as_of.clone(); + // Sanity check that we have a name if we're in a CT dataflow. + assert!(ret.name.is_some()); + } + ret + } + + pub fn is_ct_dataflow(&self) -> bool { + !self.ct_inputs.is_empty() + } + + pub fn get_ct_inserts_transformer>( + &self, + source_id: GlobalId, + ) -> Option< + impl FnOnce( + Collection, + Collection, + ) -> ( + Collection, + Collection, + Collection, + ), + > { + if !self.ct_inputs.contains(&source_id) { + return None; + } + + // Make a collection s.t, for each time T in the input, the output + // contains the inserts at T. + let inserts_source_fn = + move |oks: Collection, errs: Collection| { + let name = source_id.to_string(); + // Keep only the inserts. + // + // TODO(ct): At some point this will become a user option to instead + // keep only deletes. + let oks = oks.inner.filter(|(_, _, diff)| *diff > 0); + // Grab the original times for use in the sink operator. + let times = oks.map(|(_row, ts, diff)| ((), ts, diff)); + // SUBTLE: See the big module rustdoc for what's going on here. + let oks = step_backward(&name, oks.as_collection()); + let errs = step_backward(&name, errs); + // Then retract everything at the next timestamp. 
+ let oks = oks.inner.flat_map(|(row, ts, diff)| { + let mut negation = diff.clone(); + differential_dataflow::Diff::negate(&mut negation); + [(row.clone(), ts.step_forward(), negation), (row, ts, diff)] + }); + (oks.as_collection(), errs, times.as_collection()) + }; + Some(inserts_source_fn) + } + + pub fn input_times(&self) -> Option> { + let (Some(name), Some(first)) = (self.name.as_ref(), self.ct_times.first()) else { + return None; + }; + let ct_times = differential_dataflow::collection::concatenate( + &mut first.scope(), + self.ct_times.iter().cloned(), + ); + // Reduce this down to one update per-time-per-worker before exchanging + // it, so we don't waste work on unnecessarily high data volumes. + let ct_times = times_reduce(name, ct_times); + Some(ct_times) + } +} + +impl SinkRender for ContinualTaskConnection +where + G: Scope, +{ + fn render_sink( + &self, + compute_state: &mut ComputeState, + _sink: &ComputeSinkDesc, + sink_id: GlobalId, + as_of: Antichain, + start_signal: StartSignal, + oks: Collection, + errs: Collection, + append_times: Option>, + ) -> Option> { + let name = sink_id.to_string(); + + // SUBTLE: See the big module rustdoc for what's going on here. 
+ let oks = step_forward(&name, oks); + let errs = step_forward(&name, errs); + + let to_append = oks + .map(|x| SourceData(Ok(x))) + .concat(&errs.map(|x| SourceData(Err(x)))); + let append_times = append_times.expect("should be provided by ContinualTaskCtx"); + + let write_handle = { + let clients = Arc::clone(&compute_state.persist_clients); + let metadata = self.storage_metadata.clone(); + let handle_purpose = format!("ct_sink({})", name); + async move { + let client = clients + .open(metadata.persist_location) + .await + .expect("valid location"); + client + .open_writer( + metadata.data_shard, + metadata.relation_desc.into(), + UnitSchema.into(), + Diagnostics { + shard_name: sink_id.to_string(), + handle_purpose, + }, + ) + .await + .expect("codecs should match") + } + }; + + let collection = compute_state.expect_collection_mut(sink_id); + let mut probe = ProbeHandle::default(); + let to_append = to_append.probe_with(&mut probe); + collection.compute_probe = Some(probe); + let sink_write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum()))); + collection.sink_write_frontier = Some(Rc::clone(&sink_write_frontier)); + + // TODO(ct): Obey `compute_state.read_only_rx` + // + // Seemingly, the read-only env needs to tail the output shard and keep + // historical updates around until it sees that the output frontier + // advances beyond their times. 
+ let sink_button = continual_task_sink( + &name, + to_append, + append_times, + as_of, + write_handle, + start_signal, + sink_write_frontier, + ); + Some(Rc::new(sink_button.press_on_drop())) + } +} + +fn continual_task_sink>( + name: &str, + to_append: Collection, + append_times: Collection, + as_of: Antichain, + write_handle: impl Future> + Send + 'static, + start_signal: StartSignal, + output_frontier: Rc>>, +) -> Button { + let scope = to_append.scope(); + let mut op = AsyncOperatorBuilder::new(format!("ct_sink({})", name), scope.clone()); + + // TODO(ct): This all works perfectly well data parallel (assuming we + // broadcast the append_times). We just need to hook it up to the + // multi-worker persist-sink, but that requires some refactoring. This would + // also remove the need for this to be an async timely operator. + let active_worker = name.hashed(); + let to_append_input = + op.new_input_for_many(&to_append.inner, Exchange::new(move |_| active_worker), []); + let append_times_input = op.new_input_for_many( + &append_times.inner, + Exchange::new(move |_| active_worker), + [], + ); + + let active_worker = usize::cast_from(active_worker) % scope.peers() == scope.index(); + let button = op.build(move |_capabilities| async move { + if !active_worker { + output_frontier.borrow_mut().clear(); + return; + } + + // SUBTLE: The start_signal below may not be unblocked by the compute + // controller until it thinks the inputs are "ready" (i.e. readable at + // the as_of), but if the CT is self-referential, one of the inputs will + // be the output (which starts at `T::minimum()`, not the as_of). To + // break this cycle, before we even get the start signal, go ahead and + // advance the output's (exclusive) upper to the first time that this CT + // might write: `as_of+1`. Because we don't want this to happen on + // restarts, only do it if the upper is `T::minimum()`. 
+ let mut write_handle = write_handle.await; + { + let new_upper = as_of.into_iter().map(|x| x.step_forward()).collect(); + let res = write_handle + .compare_and_append_batch( + &mut [], + Antichain::from_elem(Timestamp::minimum()), + new_upper, + ) + .await + .expect("usage was valid"); + match res { + // We advanced the upper. + Ok(()) => {} + // Someone else advanced the upper. + Err(UpperMismatch { .. }) => {} + } + } + + let () = start_signal.await; + + #[derive(Debug)] + enum OpEvent { + ToAppend(Event>), + AppendTimes(Event>), + } + + impl OpEvent { + fn apply(self, state: &mut SinkState) { + debug!("ct_sink event {:?}", self); + match self { + OpEvent::ToAppend(Event::Data(_cap, x)) => state + .to_append + .extend(x.into_iter().map(|(k, t, d)| ((k, ()), t, d))), + OpEvent::ToAppend(Event::Progress(x)) => state.to_append_progress = x, + OpEvent::AppendTimes(Event::Data(_cap, x)) => state + .append_times + .extend(x.into_iter().map(|((), t, _d)| t)), + OpEvent::AppendTimes(Event::Progress(x)) => state.append_times_progress = x, + } + } + } + + let to_insert_input = to_append_input.map(OpEvent::ToAppend); + let append_times_input = append_times_input.map(OpEvent::AppendTimes); + let mut op_inputs = futures::stream::select(to_insert_input, append_times_input); + + let mut state = SinkState::new(); + loop { + if PartialOrder::less_than(&*output_frontier.borrow(), &state.output_progress) { + output_frontier.borrow_mut().clear(); + output_frontier + .borrow_mut() + .extend(state.output_progress.iter().cloned()); + } + let Some(event) = op_inputs.next().await else { + // Inputs exhausted, shutting down. + output_frontier.borrow_mut().clear(); + return; + }; + event.apply(&mut state); + // Also drain any other events that may be ready. 
+ while let Some(Some(event)) = op_inputs.next().now_or_never() { + event.apply(&mut state); + } + debug!("ct_sink about to process {:?}", state); + let Some((new_upper, to_append)) = state.process() else { + continue; + }; + debug!("ct_sink got write {:?}: {:?}", new_upper, to_append); + + let mut expected_upper = write_handle.shared_upper(); + while PartialOrder::less_than(&expected_upper, &new_upper) { + let res = write_handle + .compare_and_append(&to_append, expected_upper.clone(), new_upper.clone()) + .await + .expect("usage was valid"); + match res { + Ok(()) => { + state.output_progress = new_upper; + break; + } + Err(err) => { + expected_upper = err.current; + continue; + } + } + } + } + }); + + button +} + +#[derive(Debug)] +struct SinkState { + /// The known times at which we're going to write data to the output. This + /// is guaranteed to include all times < append_times_progress, except that + /// ones < output_progress may have been truncated. + append_times: BTreeSet, + append_times_progress: Antichain, + + /// The data we've collected to append to the output. This is often + /// compacted to advancing times and is expected to be ~empty in the steady + /// state. + to_append: Vec<((K, V), T, D)>, + to_append_progress: Antichain, + + /// A lower bound on the upper of the output. + output_progress: Antichain, +} + +impl SinkState { + fn new() -> Self { + SinkState { + append_times: BTreeSet::new(), + append_times_progress: Antichain::from_elem(Timestamp::minimum()), + to_append: Vec::new(), + to_append_progress: Antichain::from_elem(Timestamp::minimum()), + output_progress: Antichain::from_elem(Timestamp::minimum()), + } + } + + /// Returns data to write to the output, if any, and the new upper to use. + /// + /// TODO(ct): Remove the Vec allocation here. + fn process(&mut self) -> Option<(Antichain, Vec<((&K, &V), &Timestamp, &D)>)> { + // We can only append at times >= the output_progress, so pop off + // anything unnecessary. 
+ while let Some(x) = self.append_times.first() { + if self.output_progress.less_equal(x) { + break; + } + self.append_times.pop_first(); + } + + // Find the smallest append_time before append_time_progress. This is + // the next time we might need to write data at. Note that we can only + // act on append_times once the progress has passed them, because they + // could come out of order. + let write_ts = match self.append_times.first() { + Some(x) if !self.append_times_progress.less_equal(x) => x, + Some(_) | None => { + // The CT sink's contract is that it only writes data at times + // we received an input diff. There are none in + // `[output_progress, append_times_progress)`, so we can go + // ahead and advance the upper of the output, if it's not + // already. + // + // We could instead ensure liveness by basing this off of + // to_append, but for any CTs reading the output (expected to be + // a common case) we'd end up looping each timestamp through + // persist one-by-one. + if PartialOrder::less_than(&self.output_progress, &self.append_times_progress) { + return Some((self.append_times_progress.clone(), Vec::new())); + } + // Otherwise, nothing to do! + return None; + } + }; + + if self.to_append_progress.less_equal(write_ts) { + // Don't have all the necessary data yet. + if self.output_progress.less_than(write_ts) { + // We can advance the output upper up to the write_ts. For + // self-referential CTs this might be necessary to ensure + // dataflow progress. + return Some((Antichain::from_elem(write_ts.clone()), Vec::new())); + } + return None; + } + + // Time to write some data! Produce the collection as of write_ts by + // advancing timestamps, consolidating, and filtering out anything at + // future timestamps. + let as_of = &[write_ts.clone()]; + for (_, t, _) in &mut self.to_append { + t.advance_by(AntichainRef::new(as_of)) + } + // TODO(ct): Metrics for vec len and cap. 
+ consolidate_updates(&mut self.to_append); + // TODO(ct): Resize the vec down if cap >> len? Or perhaps `Correction` + // might already be very close to what we need. + + let append_data = self + .to_append + .iter() + .filter_map(|((k, v), t, d)| (t <= write_ts).then_some(((k, v), t, d))) + .collect(); + Some((Antichain::from_elem(write_ts.step_forward()), append_data)) + } +} + +// TODO(ct): Write this as a non-async operator. +// +// Unfortunately, we can _almost_ use the stock `delay` operator, but not quite. +// This must advance both data and the output frontier forward, while delay only +// advances the data. +fn step_backward(name: &str, input: Collection) -> Collection +where + G: Scope, + D: Data, + R: Semigroup + 'static, +{ + let name = format!("ct_step_backward({})", name); + let mut builder = AsyncOperatorBuilder::new(name, input.scope()); + let (mut output, output_stream) = builder.new_output(); + let mut input = builder.new_input_for(&input.inner, Pipeline, &output); + builder.build(move |caps| async move { + let [mut cap]: [_; 1] = caps.try_into().expect("one capability per output"); + loop { + let Some(event) = input.next().await else { + return; + }; + match event { + Event::Data(_data_cap, mut data) => { + for (_, ts, _) in &mut data { + *ts = ts + .step_back() + .expect("should only receive data at times past the as_of"); + } + output.give_container(&cap, &mut data); + } + Event::Progress(new_progress) => { + let new_progress = new_progress.into_option().and_then(|x| x.step_back()); + let Some(new_progress) = new_progress else { + continue; + }; + if cap.time() < &new_progress { + cap.downgrade(&new_progress); + } + } + } + } + }); + + output_stream.as_collection() +} + +// TODO(ct): Write this as a non-async operator. 
+fn step_forward(name: &str, input: Collection) -> Collection +where + G: Scope, + D: Data, + R: Semigroup + 'static, +{ + let name = format!("ct_step_forward({})", name); + let mut builder = AsyncOperatorBuilder::new(name.clone(), input.scope()); + let mut input = builder.new_disconnected_input(&input.inner, Pipeline); + let (mut output, output_stream) = builder.new_output(); + builder.build(move |caps| async move { + let [mut cap]: [_; 1] = caps.try_into().expect("one capability per output"); + loop { + let Some(event) = input.next().await else { + return; + }; + match event { + Event::Data(_data_cap, mut data) => { + for (_, ts, _) in &mut data { + *ts = ts.step_forward(); + } + output.give_container(&cap, &mut data); + } + Event::Progress(progress) => { + if let Some(progress) = progress.into_option() { + cap.downgrade(&progress.step_forward()); + } + } + } + } + }); + + output_stream.as_collection() +} + +// This is essentially a specialized impl of consolidate, with a HashMap instead +// of the Trace. +fn times_reduce(name: &str, input: Collection) -> Collection +where + G: Scope, + R: Semigroup + 'static + std::fmt::Debug, +{ + let name = format!("ct_times_reduce({})", name); + input + .inner + .unary_frontier(Pipeline, &name, |_caps, _info| { + let mut notificator = FrontierNotificator::new(); + let mut stash = HashMap::<_, R>::new(); + let mut buf = Vec::new(); + move |input, output| { + while let Some((cap, data)) = input.next() { + data.swap(&mut buf); + for ((), ts, diff) in buf.drain(..) 
{ + notificator.notify_at(cap.delayed(&ts)); + if let Some(sum) = stash.get_mut(&ts) { + sum.plus_equals(&diff); + } else { + stash.insert(ts, diff); + } + } + } + notificator.for_each(&[input.frontier()], |cap, _not| { + if let Some(diff) = stash.remove(cap.time()) { + output.session(&cap).give(((), cap.time().clone(), diff)); + } + }); + } + }) + .as_collection() +} + +#[cfg(test)] +mod tests { + use differential_dataflow::AsCollection; + use mz_repr::Timestamp; + use timely::dataflow::operators::capture::Extract; + use timely::dataflow::operators::{Capture, Input, ToStream}; + use timely::dataflow::ProbeHandle; + use timely::progress::Antichain; + use timely::Config; + + #[mz_ore::test] + fn step_forward() { + timely::execute(Config::thread(), |worker| { + let (mut input, probe, output) = worker.dataflow(|scope| { + let (handle, input) = scope.new_input(); + let mut probe = ProbeHandle::::new(); + let output = super::step_forward("test", input.as_collection()) + .probe_with(&mut probe) + .inner + .capture(); + (handle, probe, output) + }); + + let mut expected = Vec::new(); + for i in 0u64..10 { + let in_ts = Timestamp::new(i); + let out_ts = in_ts.step_forward(); + input.send((i, in_ts, 1)); + input.advance_to(in_ts.step_forward()); + + // We should get the data out advanced by `step_forward` and + // also, crucially, the output frontier should do the same (i.e. + // this is why we can't simply use `Collection::delay`). + worker.step_while(|| probe.less_than(&out_ts.step_forward())); + expected.push((i, out_ts, 1)); + } + // Closing the input should allow the output to advance and the + // dataflow to shut down. 
+ input.close(); + while worker.step() {} + + let actual = output + .extract() + .into_iter() + .flat_map(|x| x.1) + .collect::>(); + assert_eq!(actual, expected); + }) + .unwrap(); + } + + #[mz_ore::test] + fn step_backward() { + timely::execute(Config::thread(), |worker| { + let (mut input, probe, output) = worker.dataflow(|scope| { + let (handle, input) = scope.new_input(); + let mut probe = ProbeHandle::::new(); + let output = super::step_backward("test", input.as_collection()) + .probe_with(&mut probe) + .inner + .capture(); + (handle, probe, output) + }); + + let mut expected = Vec::new(); + for i in 0u64..10 { + // Notice that these are declared backward: out_ts < in_ts + let out_ts = Timestamp::new(i); + let in_ts = out_ts.step_forward(); + input.send((i, in_ts, 1)); + input.advance_to(in_ts.step_forward()); + + // We should get the data out regressed by `step_backward`. + worker.step_while(|| probe.less_than(&out_ts.step_forward())); + expected.push((i, out_ts, 1)); + } + // Closing the input should allow the output to advance and the + // dataflow to shut down. 
+ input.close(); + while worker.step() {} + + let actual = output + .extract() + .into_iter() + .flat_map(|x| x.1) + .collect::>(); + assert_eq!(actual, expected); + }) + .unwrap(); + } + + #[mz_ore::test] + fn times_reduce() { + let output = timely::execute_directly(|worker| { + worker.dataflow(|scope| { + let input = [ + ((), Timestamp::new(3), 1), + ((), Timestamp::new(2), 1), + ((), Timestamp::new(1), 1), + ((), Timestamp::new(2), 1), + ((), Timestamp::new(3), 1), + ((), Timestamp::new(3), 1), + ] + .to_stream(scope) + .as_collection(); + super::times_reduce("test", input).inner.capture() + }) + }); + let expected = vec![ + ((), Timestamp::new(1), 1), + ((), Timestamp::new(2), 2), + ((), Timestamp::new(3), 3), + ]; + let actual = output + .extract() + .into_iter() + .flat_map(|x| x.1) + .collect::>(); + assert_eq!(actual, expected); + } + + #[mz_ore::test] + fn ct_sink_state() { + #[track_caller] + fn assert_noop(state: &mut super::SinkState<&'static str, (), Timestamp, i64>) { + if let Some(ret) = state.process() { + panic!("should be nothing to write: {:?}", ret); + } + } + + #[track_caller] + fn assert_write( + state: &mut super::SinkState<&'static str, (), Timestamp, i64>, + expected_upper: u64, + expected_append: &[&str], + ) { + let (new_upper, to_append) = state.process().expect("should be something to write"); + assert_eq!( + new_upper, + Antichain::from_elem(Timestamp::new(expected_upper)) + ); + let to_append = to_append + .into_iter() + .map(|((k, ()), _ts, _diff)| *k) + .collect::>(); + assert_eq!(to_append, expected_append); + } + + let mut s = super::SinkState::new(); + + // Nothing to do at the initial state. + assert_noop(&mut s); + + // Getting data to append is not enough to do anything yet. + s.to_append.push((("a", ()), 1.into(), 1)); + s.to_append.push((("b", ()), 1.into(), 1)); + assert_noop(&mut s); + + // Knowing that this is the only data we'll get for that timestamp is + // still not enough. 
+ s.to_append_progress = Antichain::from_elem(2.into()); + assert_noop(&mut s); + + // Even knowing that we got input at that time is not quite enough yet + // (we could be getting these out of order). + s.append_times.insert(1.into()); + assert_noop(&mut s); + + // Indeed, it did come out of order. Also note that this checks the == + // case for time vs progress. + s.append_times.insert(0.into()); + assert_noop(&mut s); + + // Okay, now we know that we've seen all the times we got input up to 3. + // This is enough to allow the empty write of `[0,1)`. + s.append_times_progress = Antichain::from_elem(3.into()); + assert_write(&mut s, 1, &[]); + + // That succeeded, now we can write the data at 1. + s.output_progress = Antichain::from_elem(1.into()); + assert_write(&mut s, 2, &["a", "b"]); + + // That succeeded, now we know about some empty time. + s.output_progress = Antichain::from_elem(2.into()); + assert_write(&mut s, 3, &[]); + + // That succeeded, now nothing to do. + s.output_progress = Antichain::from_elem(3.into()); + assert_noop(&mut s); + + // Find out about a new time to write at. Even without the data, we can + // do an empty write up to that time. + s.append_times.insert(5.into()); + s.append_times_progress = Antichain::from_elem(6.into()); + assert_write(&mut s, 5, &[]); + + // That succeeded, now nothing to do again. + s.output_progress = Antichain::from_elem(5.into()); + + // Retract one of the things currently in the collection and add a new + // thing, to verify the consolidate. 
+ s.to_append.push((("a", ()), 5.into(), -1)); + s.to_append.push((("c", ()), 5.into(), 1)); + s.to_append_progress = Antichain::from_elem(6.into()); + assert_write(&mut s, 6, &["b", "c"]); + } +} diff --git a/src/compute/src/render/sinks.rs b/src/compute/src/render/sinks.rs index 08b77708bdf27..aecf6d43eadee 100644 --- a/src/compute/src/render/sinks.rs +++ b/src/compute/src/render/sinks.rs @@ -47,6 +47,7 @@ where sink_id: GlobalId, sink: &ComputeSinkDesc, start_signal: StartSignal, + ct_times: Option>, ) { soft_assert_or_log!( sink.non_null_assertions.is_strictly_sorted(), @@ -122,6 +123,9 @@ where let region_name = match sink.connection { ComputeSinkConnection::Subscribe(_) => format!("SubscribeSink({:?})", sink_id), ComputeSinkConnection::Persist(_) => format!("PersistSink({:?})", sink_id), + ComputeSinkConnection::ContinualTask(_) => { + format!("ContinualTask({:?})", sink_id) + } ComputeSinkConnection::CopyToS3Oneshot(_) => { format!("CopyToS3OneshotSink({:?})", sink_id) } @@ -140,6 +144,7 @@ where start_signal, ok_collection.enter_region(inner), err_collection.enter_region(inner), + ct_times.map(|x| x.enter_region(inner)), ); if let Some(sink_token) = sink_token { @@ -166,6 +171,9 @@ where start_signal: StartSignal, sinked_collection: Collection, err_collection: Collection, + // TODO(ct): Figure out a better way to smuggle this in, potentially by + // removing the `SinkRender` trait entirely. 
+ ct_times: Option>, ) -> Option>; } @@ -178,6 +186,7 @@ where match connection { ComputeSinkConnection::Subscribe(connection) => Box::new(connection.clone()), ComputeSinkConnection::Persist(connection) => Box::new(connection.clone()), + ComputeSinkConnection::ContinualTask(connection) => Box::new(connection.clone()), ComputeSinkConnection::CopyToS3Oneshot(connection) => Box::new(connection.clone()), } } diff --git a/src/compute/src/sink/copy_to_s3_oneshot.rs b/src/compute/src/sink/copy_to_s3_oneshot.rs index 9c0a9cebe5eaa..3809f826d8c5c 100644 --- a/src/compute/src/sink/copy_to_s3_oneshot.rs +++ b/src/compute/src/sink/copy_to_s3_oneshot.rs @@ -45,6 +45,7 @@ where _start_signal: StartSignal, sinked_collection: Collection, err_collection: Collection, + _ct_times: Option>, ) -> Option> { // An encapsulation of the copy to response protocol. // Used to send rows and errors if this fails. diff --git a/src/compute/src/sink/persist_sink.rs b/src/compute/src/sink/persist_sink.rs index a2ee9846e4349..acb29945b982a 100644 --- a/src/compute/src/sink/persist_sink.rs +++ b/src/compute/src/sink/persist_sink.rs @@ -60,6 +60,7 @@ where start_signal: StartSignal, mut ok_collection: Collection, mut err_collection: Collection, + _ct_times: Option>, ) -> Option> { // Attach a probe reporting the compute frontier. // The `apply_refresh` operator can round up frontiers, making it impossible to accurately diff --git a/src/compute/src/sink/subscribe.rs b/src/compute/src/sink/subscribe.rs index 9768a09ba976b..11a6ac9d17c49 100644 --- a/src/compute/src/sink/subscribe.rs +++ b/src/compute/src/sink/subscribe.rs @@ -42,6 +42,7 @@ where _start_signal: StartSignal, sinked_collection: Collection, err_collection: Collection, + _ct_times: Option>, ) -> Option> { // An encapsulation of the Subscribe response protocol. 
// Used to send rows and progress messages, diff --git a/src/environmentd/src/http/sql.rs b/src/environmentd/src/http/sql.rs index 908f36db1c80c..60d21aa686e68 100644 --- a/src/environmentd/src/http/sql.rs +++ b/src/environmentd/src/http/sql.rs @@ -1222,6 +1222,7 @@ async fn execute_stmt( | ExecuteResponse::CreatedView { .. } | ExecuteResponse::CreatedViews { .. } | ExecuteResponse::CreatedMaterializedView { .. } + | ExecuteResponse::CreatedContinualTask { .. } | ExecuteResponse::CreatedType | ExecuteResponse::Comment | ExecuteResponse::Deleted(_) diff --git a/src/pgwire/src/protocol.rs b/src/pgwire/src/protocol.rs index 4f4deab78f7c5..12a4bcc84ae4f 100644 --- a/src/pgwire/src/protocol.rs +++ b/src/pgwire/src/protocol.rs @@ -1721,6 +1721,7 @@ where | ExecuteResponse::CreatedIndex { .. } | ExecuteResponse::CreatedIntrospectionSubscribe | ExecuteResponse::CreatedMaterializedView { .. } + | ExecuteResponse::CreatedContinualTask { .. } | ExecuteResponse::CreatedRole | ExecuteResponse::CreatedSchema { .. } | ExecuteResponse::CreatedSecret { .. 
} diff --git a/src/sql-lexer/src/keywords.txt b/src/sql-lexer/src/keywords.txt index 9199c08546729..abb29501cfee1 100644 --- a/src/sql-lexer/src/keywords.txt +++ b/src/sql-lexer/src/keywords.txt @@ -95,6 +95,7 @@ Confluent Connection Connections Constraint +Continual Copy Count Counter @@ -423,6 +424,7 @@ System Table Tables Tail +Task Temp Temporary Text diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs index ce0501ca01e2a..3c914f6d5dca5 100644 --- a/src/sql-parser/src/ast/defs/statement.rs +++ b/src/sql-parser/src/ast/defs/statement.rs @@ -29,10 +29,11 @@ use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName}; use crate::ast::{ AstInfo, ColumnDef, ConnectionOption, ConnectionOptionName, CreateConnectionOption, CreateConnectionType, CreateSinkConnection, CreateSourceConnection, CreateSourceOption, - CreateSourceOptionName, DeferredItemName, Expr, Format, FormatSpecifier, Ident, IntervalValue, - KeyConstraint, MaterializedViewOption, Query, SelectItem, SinkEnvelope, SourceEnvelope, - SourceIncludeMetadata, SubscribeOutput, TableAlias, TableConstraint, TableWithJoins, - UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, + CreateSourceOptionName, CteMutRecColumnDef, DeferredItemName, Expr, Format, FormatSpecifier, + Ident, IntervalValue, KeyConstraint, MaterializedViewOption, Query, SelectItem, SinkEnvelope, + SourceEnvelope, SourceIncludeMetadata, SubscribeOutput, TableAlias, TableConstraint, + TableWithJoins, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName, + UnresolvedSchemaName, Value, }; /// A top-level statement (SELECT, INSERT, CREATE, etc.) 
@@ -54,6 +55,7 @@ pub enum Statement { CreateSink(CreateSinkStatement), CreateView(CreateViewStatement), CreateMaterializedView(CreateMaterializedViewStatement), + CreateContinualTask(CreateContinualTaskStatement), CreateTable(CreateTableStatement), CreateTableFromSource(CreateTableFromSourceStatement), CreateIndex(CreateIndexStatement), @@ -127,6 +129,7 @@ impl AstDisplay for Statement { Statement::CreateSink(stmt) => f.write_node(stmt), Statement::CreateView(stmt) => f.write_node(stmt), Statement::CreateMaterializedView(stmt) => f.write_node(stmt), + Statement::CreateContinualTask(stmt) => f.write_node(stmt), Statement::CreateTable(stmt) => f.write_node(stmt), Statement::CreateTableFromSource(stmt) => f.write_node(stmt), Statement::CreateIndex(stmt) => f.write_node(stmt), @@ -203,6 +206,7 @@ pub fn statement_kind_label_value(kind: StatementKind) -> &'static str { StatementKind::CreateSink => "create_sink", StatementKind::CreateView => "create_view", StatementKind::CreateMaterializedView => "create_materialized_view", + StatementKind::CreateContinualTask => "create_continual_task", StatementKind::CreateTable => "create_table", StatementKind::CreateTableFromSource => "create_table_from_source", StatementKind::CreateIndex => "create_index", @@ -1391,6 +1395,58 @@ impl AstDisplay for CreateMaterializedViewStatement { } impl_display_t!(CreateMaterializedViewStatement); +/// `CREATE CONTINUAL TASK` +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct CreateContinualTaskStatement { + pub name: UnresolvedItemName, + pub columns: Vec>, + pub in_cluster: Option, + + // The thing we get input diffs from + pub input: T::ItemName, + + // The txn to execute on each set of diffs + pub stmts: Vec>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum ContinualTaskStmt { + Delete(DeleteStatement), + Insert(InsertStatement), + // TODO(ct): Update/upsert? 
+} + +impl AstDisplay for CreateContinualTaskStatement { + fn fmt(&self, f: &mut AstFormatter) { + f.write_str("CREATE CONTINUAL TASK "); + f.write_node(&self.name); + f.write_str(" ("); + f.write_node(&display::comma_separated(&self.columns)); + f.write_str(")"); + + if let Some(cluster) = &self.in_cluster { + f.write_str(" IN CLUSTER "); + f.write_node(cluster); + } + + f.write_str(" ON INPUT "); + f.write_node(&self.input); + + f.write_str(" AS ("); + for (idx, stmt) in self.stmts.iter().enumerate() { + if idx > 0 { + f.write_str("; "); + } + match stmt { + ContinualTaskStmt::Delete(stmt) => f.write_node(stmt), + ContinualTaskStmt::Insert(stmt) => f.write_node(stmt), + } + } + f.write_str(")"); + } +} +impl_display_t!(CreateContinualTaskStatement); + /// `ALTER SET CLUSTER` #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct AlterSetClusterStatement { diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index 66c61a33de552..979f5f56d2a0c 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -1889,6 +1889,9 @@ impl<'a> Parser<'a> { { self.parse_create_materialized_view() .map_parser_err(StatementKind::CreateMaterializedView) + } else if self.peek_keywords(&[CONTINUAL, TASK]) { + self.parse_create_continual_task() + .map_parser_err(StatementKind::CreateContinualTask) } else if self.peek_keywords(&[USER]) { parser_err!( self, @@ -3576,6 +3579,73 @@ impl<'a> Parser<'a> { )) } + fn parse_create_continual_task(&mut self) -> Result, ParserError> { + // TODO(ct): If exists. + self.expect_keywords(&[CONTINUAL, TASK])?; + + // TODO(ct): Multiple outputs. + let name = self.parse_item_name()?; + self.expect_token(&Token::LParen)?; + let columns = self.parse_comma_separated(|parser| { + // TODO(ct): NOT NULL, etc. 
+ Ok(CteMutRecColumnDef { + name: parser.parse_identifier()?, + data_type: parser.parse_data_type()?, + }) + })?; + self.expect_token(&Token::RParen)?; + let in_cluster = self.parse_optional_in_cluster()?; + + // TODO(ct): Multiple inputs. + self.expect_keywords(&[ON, INPUT])?; + let input_table = self.parse_raw_name()?; + // TODO(ct): Allow renaming the inserts/deletes so that we can use + // something as both an "input" and a "reference". + + self.expect_keyword(AS)?; + + let mut stmts = Vec::new(); + let mut expecting_statement_delimiter = false; + self.expect_token(&Token::LParen)?; + // TODO(ct): Dedup this with parse_statements? + loop { + // ignore empty statements (between successive statement delimiters) + while self.consume_token(&Token::Semicolon) { + expecting_statement_delimiter = false; + } + + if self.consume_token(&Token::RParen) { + break; + } else if expecting_statement_delimiter { + self.expected(self.peek_pos(), "end of statement", self.peek_token())? + } + + let stmt = self.parse_statement().map_err(|err| err.error)?.ast; + match stmt { + Statement::Delete(stmt) => stmts.push(ContinualTaskStmt::Delete(stmt)), + Statement::Insert(stmt) => stmts.push(ContinualTaskStmt::Insert(stmt)), + _ => { + return parser_err!( + self, + self.peek_prev_pos(), + "unsupported query in CREATE CONTINUAL TASK" + ); + } + } + expecting_statement_delimiter = true; + } + + Ok(Statement::CreateContinualTask( + CreateContinualTaskStatement { + name, + columns, + in_cluster, + input: input_table, + stmts, + }, + )) + } + fn parse_materialized_view_option_name( &mut self, ) -> Result { diff --git a/src/sql-parser/tests/testdata/continual-task b/src/sql-parser/tests/testdata/continual-task new file mode 100644 index 0000000000000..858f54654e6ec --- /dev/null +++ b/src/sql-parser/tests/testdata/continual-task @@ -0,0 +1,39 @@ +# Copyright 2020 sqlparser-rs contributors. All rights reserved. +# Copyright Materialize, Inc. and contributors. All rights reserved. 
+# +# This file is derived from the sqlparser-rs project, available at +# https://github.com/andygrove/sqlparser-rs. It was incorporated +# directly into Materialize on December 21, 2019. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License in the LICENSE file at the +# root of this repository, or online at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +parse-statement +CREATE CONTINUAL TASK foo (key int, val int) ON INPUT append_only AS ( + DELETE FROM output WHERE key IN (SELECT key FROM inserts); + INSERT INTO output SELECT key, max(value) FROM inserts GROUP BY key; +); +---- +CREATE CONTINUAL TASK foo (key int4, val int4) ON INPUT append_only AS (DELETE FROM output WHERE key IN (SELECT key FROM inserts); INSERT INTO output SELECT key, max(value) FROM inserts GROUP BY key) +=> +CreateContinualTask(CreateContinualTaskStatement { name: UnresolvedItemName([Ident("foo")]), columns: [CteMutRecColumnDef { name: Ident("key"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] } }, CteMutRecColumnDef { name: Ident("val"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] } }], in_cluster: None, input: Name(UnresolvedItemName([Ident("append_only")])), stmts: [Delete(DeleteStatement { table_name: Name(UnresolvedItemName([Ident("output")])), alias: None, using: [], selection: Some(InSubquery { expr: Identifier([Ident("key")]), subquery: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Identifier([Ident("key")]), alias: None }], 
from: [TableWithJoins { relation: Table { name: Name(UnresolvedItemName([Ident("inserts")])), alias: None }, joins: [] }], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, negated: false }) }), Insert(InsertStatement { table_name: Name(UnresolvedItemName([Ident("output")])), columns: [], source: Query(Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Identifier([Ident("key")]), alias: None }, Expr { expr: Function(Function { name: Name(UnresolvedItemName([Ident("max")])), args: Args { args: [Identifier([Ident("value")])], order_by: [] }, filter: None, over: None, distinct: false }), alias: None }], from: [TableWithJoins { relation: Table { name: Name(UnresolvedItemName([Ident("inserts")])), alias: None }, joins: [] }], selection: None, group_by: [Identifier([Ident("key")])], having: None, options: [] }), order_by: [], limit: None, offset: None }), returning: [] })] }) + +parse-statement +CREATE CONTINUAL TASK "materialize"."public"."upsert" ("key" [s20 AS "pg_catalog"."int4"], "val" [s20 AS "pg_catalog"."int4"]) IN CLUSTER [s1] ON INPUT [u1 AS "materialize"."public"."append_only"] AS ( + DELETE FROM "upsert" WHERE "key" IN (SELECT "key" FROM [u1 AS "materialize"."public"."append_only"]); + INSERT INTO "upsert" SELECT "key", "pg_catalog"."max"("val") FROM [u1 AS "materialize"."public"."append_only"] GROUP BY "key" +) +---- +CREATE CONTINUAL TASK materialize.public.upsert (key [s20 AS pg_catalog.int4], val [s20 AS pg_catalog.int4]) IN CLUSTER [s1] ON INPUT [u1 AS materialize.public.append_only] AS (DELETE FROM upsert WHERE key IN (SELECT key FROM [u1 AS materialize.public.append_only]); INSERT INTO upsert SELECT key, pg_catalog.max(val) FROM [u1 AS materialize.public.append_only] GROUP BY key) +=> +CreateContinualTask(CreateContinualTaskStatement { name: UnresolvedItemName([Ident("materialize"), Ident("public"), Ident("upsert")]), columns: [CteMutRecColumnDef { name: 
Ident("key"), data_type: Other { name: Id("s20", UnresolvedItemName([Ident("pg_catalog"), Ident("int4")])), typ_mod: [] } }, CteMutRecColumnDef { name: Ident("val"), data_type: Other { name: Id("s20", UnresolvedItemName([Ident("pg_catalog"), Ident("int4")])), typ_mod: [] } }], in_cluster: Some(Resolved("s1")), input: Id("u1", UnresolvedItemName([Ident("materialize"), Ident("public"), Ident("append_only")])), stmts: [Delete(DeleteStatement { table_name: Name(UnresolvedItemName([Ident("upsert")])), alias: None, using: [], selection: Some(InSubquery { expr: Identifier([Ident("key")]), subquery: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Identifier([Ident("key")]), alias: None }], from: [TableWithJoins { relation: Table { name: Id("u1", UnresolvedItemName([Ident("materialize"), Ident("public"), Ident("append_only")])), alias: None }, joins: [] }], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, negated: false }) }), Insert(InsertStatement { table_name: Name(UnresolvedItemName([Ident("upsert")])), columns: [], source: Query(Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Identifier([Ident("key")]), alias: None }, Expr { expr: Function(Function { name: Name(UnresolvedItemName([Ident("pg_catalog"), Ident("max")])), args: Args { args: [Identifier([Ident("val")])], order_by: [] }, filter: None, over: None, distinct: false }), alias: None }], from: [TableWithJoins { relation: Table { name: Id("u1", UnresolvedItemName([Ident("materialize"), Ident("public"), Ident("append_only")])), alias: None }, joins: [] }], selection: None, group_by: [Identifier([Ident("key")])], having: None, options: [] }), order_by: [], limit: None, offset: None }), returning: [] })] }) diff --git a/src/sql/src/names.rs b/src/sql/src/names.rs index 75800afe84e13..576cf40aeb6ea 100644 --- a/src/sql/src/names.rs +++ b/src/sql/src/names.rs @@ -22,7 +22,7 @@ use 
mz_ore::str::StrExt; use mz_repr::role_id::RoleId; use mz_repr::ColumnName; use mz_repr::GlobalId; -use mz_sql_parser::ast::Expr; +use mz_sql_parser::ast::{CreateContinualTaskStatement, Expr}; use mz_sql_parser::ident; use proptest_derive::Arbitrary; use serde::{Deserialize, Serialize}; @@ -1569,6 +1569,17 @@ impl<'a> Fold for NameResolver<'a> { result } + fn fold_create_continual_task_statement( + &mut self, + stmt: CreateContinualTaskStatement, + ) -> CreateContinualTaskStatement { + // TODO(ct): Insert a fake CTE so that using the name of the continual + // task in the inserts and deletes resolves. + let cte_name = normalize::ident(stmt.name.0.last().expect("TODO(ct)").clone()); + self.ctes.insert(cte_name, LocalId::new(0)); + mz_sql_parser::ast::fold::fold_create_continual_task_statement(self, stmt) + } + fn fold_cte_id(&mut self, _id: ::CteId) -> ::CteId { panic!("this should have been handled when walking the CTE"); } diff --git a/src/sql/src/normalize.rs b/src/sql/src/normalize.rs index eae5029fb287d..09212b825fbb4 100644 --- a/src/sql/src/normalize.rs +++ b/src/sql/src/normalize.rs @@ -21,8 +21,9 @@ use mz_repr::{ColumnName, GlobalId}; use mz_sql_parser::ast::display::AstDisplay; use mz_sql_parser::ast::visit_mut::{self, VisitMut}; use mz_sql_parser::ast::{ - CreateConnectionStatement, CreateIndexStatement, CreateMaterializedViewStatement, - CreateSecretStatement, CreateSinkStatement, CreateSourceStatement, CreateSubsourceStatement, + ContinualTaskStmt, CreateConnectionStatement, CreateContinualTaskStatement, + CreateIndexStatement, CreateMaterializedViewStatement, CreateSecretStatement, + CreateSinkStatement, CreateSourceStatement, CreateSubsourceStatement, CreateTableFromSourceStatement, CreateTableStatement, CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior, MutRecBlock, Op, Query, Statement, TableFactor, UnresolvedItemName, UnresolvedSchemaName, @@ -415,6 +416,27 @@ pub fn 
create_statement( *if_exists = IfExistsBehavior::Error; } + Statement::CreateContinualTask(CreateContinualTaskStatement { + name, + columns: _, + input, + stmts, + in_cluster: _, + }) => { + *name = allocate_name(name)?; + let mut normalizer = QueryNormalizer::new(); + normalizer.visit_item_name_mut(input); + for stmt in stmts { + match stmt { + ContinualTaskStmt::Delete(stmt) => normalizer.visit_delete_statement_mut(stmt), + ContinualTaskStmt::Insert(stmt) => normalizer.visit_insert_statement_mut(stmt), + } + } + if let Some(err) = normalizer.err { + return Err(err); + } + } + Statement::CreateIndex(CreateIndexStatement { name: _, in_cluster: _, diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index 636215cad456e..897069e61a332 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -137,6 +137,7 @@ pub enum Plan { CreateTable(CreateTablePlan), CreateView(CreateViewPlan), CreateMaterializedView(CreateMaterializedViewPlan), + CreateContinualTask(CreateContinualTaskPlan), CreateIndex(CreateIndexPlan), CreateType(CreateTypePlan), Comment(CommentPlan), @@ -259,6 +260,7 @@ impl Plan { StatementKind::CreateDatabase => &[PlanKind::CreateDatabase], StatementKind::CreateIndex => &[PlanKind::CreateIndex], StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView], + StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask], StatementKind::CreateRole => &[PlanKind::CreateRole], StatementKind::CreateSchema => &[PlanKind::CreateSchema], StatementKind::CreateSecret => &[PlanKind::CreateSecret], @@ -327,6 +329,7 @@ impl Plan { Plan::CreateTable(_) => "create table", Plan::CreateView(_) => "create view", Plan::CreateMaterializedView(_) => "create materialized view", + Plan::CreateContinualTask(_) => "create continual task", Plan::CreateIndex(_) => "create index", Plan::CreateType(_) => "create type", Plan::Comment(_) => "comment", @@ -703,6 +706,16 @@ pub struct CreateMaterializedViewPlan { pub ambiguous_columns: bool, } +#[derive(Debug, 
Clone)] +pub struct CreateContinualTaskPlan { + pub name: QualifiedItemName, + pub desc: RelationDesc, + // TODO(ct): Multiple inputs. + pub input_id: GlobalId, + pub continual_task: MaterializedView, + // TODO(ct): replace, drop_ids, if_not_exists +} + #[derive(Debug, Clone)] pub struct CreateIndexPlan { pub name: QualifiedItemName, diff --git a/src/sql/src/plan/query.rs b/src/sql/src/plan/query.rs index 1dad7b41c41ea..e577eb593dec4 100644 --- a/src/sql/src/plan/query.rs +++ b/src/sql/src/plan/query.rs @@ -165,6 +165,57 @@ pub fn plan_root_query( }) } +/// TODO(ct): Dedup this with [plan_root_query]. +#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")] +pub fn plan_ct_query( + qcx: &mut QueryContext, + mut query: Query, +) -> Result, PlanError> { + transform_ast::transform(qcx.scx, &mut query)?; + let PlannedQuery { + mut expr, + scope, + order_by, + limit, + offset, + project, + group_size_hints, + } = plan_query(qcx, &query)?; + + let mut finishing = RowSetFinishing { + limit, + offset, + project, + order_by, + }; + + // Attempt to push the finishing's ordering past its projection. This allows + // data to be projected down on the workers rather than the coordinator. It + // also improves the optimizer's demand analysis, as the optimizer can only + // reason about demand information in `expr` (i.e., it can't see + // `finishing.project`). + try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by); + + expr.finish_maintained(&mut finishing, group_size_hints); + + let typ = qcx.relation_type(&expr); + let typ = RelationType::new( + finishing + .project + .iter() + .map(|i| typ.column_types[*i].clone()) + .collect(), + ); + let desc = RelationDesc::new(typ, scope.column_names()); + + Ok(PlannedRootQuery { + expr, + desc, + finishing, + scope, + }) +} + /// Attempts to push a projection through an order by. /// /// The returned bool indicates whether the pushdown was successful or not. 
@@ -6083,8 +6134,8 @@ impl QueryLifetime { /// Description of a CTE sufficient for query planning. #[derive(Debug, Clone)] pub struct CteDesc { - name: String, - desc: RelationDesc, + pub name: String, + pub desc: RelationDesc, } /// The state required when planning a `Query`. diff --git a/src/sql/src/plan/statement.rs b/src/sql/src/plan/statement.rs index fbad34087db56..029f19ba26035 100644 --- a/src/sql/src/plan/statement.rs +++ b/src/sql/src/plan/statement.rs @@ -158,6 +158,7 @@ pub fn describe( Statement::CreateMaterializedView(stmt) => { ddl::describe_create_materialized_view(&scx, stmt)? } + Statement::CreateContinualTask(stmt) => ddl::describe_create_continual_task(&scx, stmt)?, Statement::DropObjects(stmt) => ddl::describe_drop_objects(&scx, stmt)?, Statement::DropOwned(stmt) => ddl::describe_drop_owned(&scx, stmt)?, @@ -343,6 +344,7 @@ pub fn plan( Statement::CreateMaterializedView(stmt) => { ddl::plan_create_materialized_view(scx, stmt, params) } + Statement::CreateContinualTask(stmt) => ddl::plan_create_continual_task(scx, stmt, params), Statement::DropObjects(stmt) => ddl::plan_drop_objects(scx, stmt), Statement::DropOwned(stmt) => ddl::plan_drop_owned(scx, stmt), @@ -1037,6 +1039,7 @@ impl From<&Statement> for StatementClassifica Statement::CreateCluster(_) => DDL, Statement::CreateClusterReplica(_) => DDL, Statement::CreateConnection(_) => DDL, + Statement::CreateContinualTask(_) => DDL, Statement::CreateDatabase(_) => DDL, Statement::CreateIndex(_) => DDL, Statement::CreateRole(_) => DDL, diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index 0f52ad13da749..2f92a306fb6c8 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -22,7 +22,7 @@ use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_ use mz_controller_types::{ is_cluster_size_v2, ClusterId, ReplicaId, DEFAULT_REPLICA_LOGGING_INTERVAL, }; -use mz_expr::{CollectionPlan, UnmaterializableFunc}; +use 
mz_expr::{CollectionPlan, LocalId, UnmaterializableFunc}; use mz_interchange::avro::{AvroSchemaGenerator, DocTarget}; use mz_ore::cast::{CastFrom, TryCastFrom}; use mz_ore::collections::{CollectionExt, HashSet}; @@ -54,26 +54,26 @@ use mz_sql_parser::ast::{ ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement, ConnectionOption, ConnectionOptionName, CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption, CreateConnectionOptionName, CreateConnectionStatement, - CreateConnectionType, CreateDatabaseStatement, CreateIndexStatement, - CreateMaterializedViewStatement, CreateRoleStatement, CreateSchemaStatement, - CreateSecretStatement, CreateSinkConnection, CreateSinkOption, CreateSinkOptionName, - CreateSinkStatement, CreateSourceConnection, CreateSourceOption, CreateSourceOptionName, - CreateSourceStatement, CreateSubsourceOption, CreateSubsourceOptionName, - CreateSubsourceStatement, CreateTableFromSourceStatement, CreateTableStatement, CreateTypeAs, - CreateTypeListOption, CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName, - CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, CsrConfigOption, - CsrConfigOptionName, CsrConnection, CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, - CsvColumns, DeferredItemName, DocOnIdentifier, DocOnSchema, DropObjectsStatement, - DropOwnedStatement, Expr, Format, FormatSpecifier, Ident, IfExistsBehavior, IndexOption, - IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption, - LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, - MySqlConfigOptionName, PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, - RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, - ReplicaOption, ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, - SourceIncludeMetadata, Statement, TableConstraint, TableFromSourceOption, - 
TableFromSourceOptionName, TableOption, TableOptionName, UnresolvedDatabaseName, - UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, ViewDefinition, - WithOptionValue, + CreateConnectionType, CreateContinualTaskStatement, CreateDatabaseStatement, + CreateIndexStatement, CreateMaterializedViewStatement, CreateRoleStatement, + CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption, + CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption, + CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption, + CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement, + CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName, + CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement, + CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection, + CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName, + DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format, + FormatSpecifier, Ident, IfExistsBehavior, IndexOption, IndexOptionName, KafkaSinkConfigOption, + KeyConstraint, LoadGeneratorOption, LoadGeneratorOptionName, MaterializedViewOption, + MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName, PgConfigOption, + PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue, + RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption, + ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata, + Statement, TableConstraint, TableFromSourceOption, TableFromSourceOptionName, TableOption, + TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName, + UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue, }; use mz_sql_parser::ident; use mz_sql_parser::parser::StatementParseResult; @@ -124,7 +124,9 @@ use 
crate::names::{ }; use crate::normalize::{self, ident}; use crate::plan::error::PlanError; -use crate::plan::query::{plan_expr, scalar_type_from_catalog, ExprContext, QueryLifetime}; +use crate::plan::query::{ + plan_expr, scalar_type_from_catalog, scalar_type_from_sql, CteDesc, ExprContext, QueryLifetime, +}; use crate::plan::scope::Scope; use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS}; use crate::plan::statement::{scl, StatementContext, StatementDesc}; @@ -139,15 +141,16 @@ use crate::plan::{ AlterSystemResetPlan, AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan, CreateClusterPlan, CreateClusterReplicaPlan, - CreateClusterUnmanagedPlan, CreateClusterVariant, CreateConnectionPlan, CreateDatabasePlan, - CreateIndexPlan, CreateMaterializedViewPlan, CreateRolePlan, CreateSchemaPlan, - CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, - CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, FullItemName, Index, Ingestion, - MaterializedView, Params, Plan, PlanClusterOption, PlanNotice, QueryContext, ReplicaConfig, - Secret, Sink, Source, Table, TableDataSource, Type, VariableValue, View, WebhookBodyFormat, - WebhookHeaderFilters, WebhookHeaders, WebhookValidation, + CreateClusterUnmanagedPlan, CreateClusterVariant, CreateConnectionPlan, + CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan, + CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, + CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc, DropObjectsPlan, + DropOwnedPlan, FullItemName, Index, Ingestion, MaterializedView, Params, Plan, + PlanClusterOption, PlanNotice, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, + TableDataSource, Type, VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, + 
WebhookHeaders, WebhookValidation, }; -use crate::session::vars; +use crate::session::vars::{self, ENABLE_CREATE_CONTINUAL_TASK}; use crate::session::vars::{ ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_KAFKA_SINK_HEADERS, ENABLE_KAFKA_SINK_PARTITION_BY, ENABLE_REFRESH_EVERY_MVS, @@ -2434,6 +2437,13 @@ pub fn describe_create_materialized_view( Ok(StatementDesc::new(None)) } +pub fn describe_create_continual_task( + _: &StatementContext, + _: CreateContinualTaskStatement, +) -> Result { + Ok(StatementDesc::new(None)) +} + pub fn plan_create_materialized_view( scx: &StatementContext, mut stmt: CreateMaterializedViewStatement, @@ -2706,6 +2716,180 @@ generate_extracted_config!( (Refresh, RefreshOptionValue, AllowMultiple) ); +pub fn plan_create_continual_task( + scx: &StatementContext, + mut stmt: CreateContinualTaskStatement, + params: &Params, +) -> Result { + scx.require_feature_flag(&ENABLE_CREATE_CONTINUAL_TASK)?; + let cluster_id = match &stmt.in_cluster { + None => scx.catalog.resolve_cluster(None)?.id(), + Some(in_cluster) => in_cluster.id, + }; + stmt.in_cluster = Some(ResolvedClusterName { + id: cluster_id, + print_name: None, + }); + + let create_sql = + normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?; + + let partial_name = normalize::unresolved_item_name(stmt.name)?; + let name = scx.allocate_qualified_name(partial_name.clone())?; + let desc = { + let mut desc_columns = Vec::with_capacity(stmt.columns.capacity()); + for col in stmt.columns.iter() { + desc_columns.push(( + normalize::column_name(col.name.clone()), + ColumnType { + scalar_type: scalar_type_from_sql(scx, &col.data_type)?, + nullable: true, + }, + )); + } + RelationDesc::from_names_and_types(desc_columns) + }; + let input = scx.get_item_by_resolved_name(&stmt.input)?; + + let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView); + qcx.ctes.insert( + LocalId::new(0), + CteDesc { + name: name.item.clone(), + desc: desc.clone(), + }, + ); + + let mut 
exprs = Vec::new(); + for stmt in &stmt.stmts { + let query = continual_task_query(&name, stmt).ok_or_else(|| sql_err!("TODO(ct)"))?; + let query::PlannedRootQuery { + mut expr, + desc: desc_query, + finishing, + scope: _, + } = query::plan_ct_query(&mut qcx, query)?; + // We get back a trivial finishing, see comment in `plan_view`. + assert!(finishing.is_trivial(expr.arity())); + // TODO(ct): Is this right? + expr.bind_parameters(params)?; + // TODO(ct): Make this error message more closely match the various ones + // given for INSERT/DELETE. + if desc_query.iter_types().ne(desc.iter_types()) { + sql_bail!( + "CONTINUAL TASK query columns did not match: {:?} vs {:?}", + desc_query.iter().collect::>(), + desc.iter().collect::>() + ); + } + match stmt { + ast::ContinualTaskStmt::Insert(_) => exprs.push(expr), + ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()), + } + } + // TODO(ct): Collect things by output and assert that there is only one (or + // support multiple outputs). + let expr = exprs + .into_iter() + .reduce(|acc, expr| acc.union(expr)) + .ok_or_else(|| sql_err!("TODO(ct)"))?; + + let column_names: Vec = desc.iter_names().cloned().collect(); + if let Some(dup) = column_names.iter().duplicates().next() { + sql_bail!("column {} specified more than once", dup.as_str().quoted()); + } + + // Check for an object in the catalog with this same name + let full_name = scx.catalog.resolve_full_name(&name); + let partial_name = PartialItemName::from(full_name.clone()); + // For PostgreSQL compatibility, we need to prevent creating this when there + // is an existing object *or* type of the same name. 
+ if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) { + return Err(PlanError::ItemAlreadyExists { + name: full_name.to_string(), + item_type: item.item_type(), + }); + } + + Ok(Plan::CreateContinualTask(CreateContinualTaskPlan { + name, + desc, + input_id: input.id(), + continual_task: MaterializedView { + create_sql, + expr, + column_names, + cluster_id, + non_null_assertions: Vec::new(), + compaction_window: None, + refresh_schedule: None, + as_of: None, + }, + })) +} + +fn continual_task_query<'a>( + ct_name: &QualifiedItemName, + stmt: &'a ast::ContinualTaskStmt, +) -> Option> { + match stmt { + ast::ContinualTaskStmt::Insert(ast::InsertStatement { + table_name: _, + columns, + source, + returning, + }) => { + if !columns.is_empty() || !returning.is_empty() { + return None; + } + match source { + ast::InsertSource::Query(query) => Some(query.clone()), + ast::InsertSource::DefaultValues => None, + } + } + ast::ContinualTaskStmt::Delete(ast::DeleteStatement { + table_name: _, + alias, + using, + selection, + }) => { + if !using.is_empty() { + return None; + } + // Construct a `SELECT *` with the `DELETE` selection as a `WHERE`. + let from = ast::TableWithJoins { + relation: ast::TableFactor::Table { + // TODO(ct): Huge hack. + name: ResolvedItemName::Cte { + id: LocalId::new(0), + name: ct_name.item.clone(), + }, + alias: alias.clone(), + }, + joins: Vec::new(), + }; + let select = ast::Select { + from: vec![from], + selection: selection.clone(), + distinct: None, + projection: vec![ast::SelectItem::Wildcard], + group_by: Vec::new(), + having: None, + options: Vec::new(), + }; + let query = ast::Query { + ctes: ast::CteBlock::Simple(Vec::new()), + body: ast::SetExpr::Select(Box::new(select)), + order_by: Vec::new(), + limit: None, + offset: None, + }; + // Then negate it to turn it into retractions (after planning it). 
+ Some(query) + } + } +} + pub fn describe_create_sink( _: &StatementContext, _: CreateSinkStatement, diff --git a/src/sql/src/rbac.rs b/src/sql/src/rbac.rs index 372cb8f772522..4c008aa40549e 100644 --- a/src/sql/src/rbac.rs +++ b/src/sql/src/rbac.rs @@ -600,6 +600,27 @@ fn generate_rbac_requirements( item_usage: &CREATE_ITEM_USAGE, ..Default::default() }, + Plan::CreateContinualTask(plan::CreateContinualTaskPlan { + name, + desc: _, + input_id: _, + continual_task, + }) => RbacRequirements { + privileges: vec![ + ( + SystemObjectId::Object(name.qualifiers.clone().into()), + AclMode::CREATE, + role_id, + ), + ( + SystemObjectId::Object(continual_task.cluster_id.into()), + AclMode::CREATE, + role_id, + ), + ], + item_usage: &CREATE_ITEM_USAGE, + ..Default::default() + }, Plan::CreateIndex(plan::CreateIndexPlan { name, index, diff --git a/src/sql/src/session/vars/definitions.rs b/src/sql/src/session/vars/definitions.rs index 6759680ce94e2..0f3ee002e5577 100644 --- a/src/sql/src/session/vars/definitions.rs +++ b/src/sql/src/session/vars/definitions.rs @@ -2122,6 +2122,12 @@ feature_flags!( default: true, enable_for_item_parsing: false, }, + { + name: enable_create_continual_task, + desc: "CREATE CONTINUAL TASK", + default: false, + enable_for_item_parsing: true, + }, ); impl From<&super::SystemVars> for OptimizerFeatures { diff --git a/test/sqllogictest/ct_audit_log.slt b/test/sqllogictest/ct_audit_log.slt new file mode 100644 index 0000000000000..80f7dc1b0b330 --- /dev/null +++ b/test/sqllogictest/ct_audit_log.slt @@ -0,0 +1,53 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. 
+ +mode cockroach + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_create_continual_task = true +---- +COMPLETE 0 + +statement ok +CREATE TABLE mv_input (key INT) + +statement ok +INSERT INTO mv_input VALUES (1); + +statement ok +CREATE MATERIALIZED VIEW anomalies AS SELECT sum(key)::INT FROM mv_input; + +query I +SELECT * FROM anomalies +---- +1 + +statement ok +CREATE CONTINUAL TASK audit_log (count INT) ON INPUT anomalies AS ( + INSERT INTO audit_log SELECT * FROM anomalies; +) + +query I +SELECT * FROM audit_log +---- +1 + +statement ok +INSERT INTO mv_input VALUES (2), (3) + +query I +SELECT * FROM anomalies +---- +6 + +query I +SELECT * FROM audit_log +---- +1 +6 diff --git a/test/sqllogictest/ct_errors.slt b/test/sqllogictest/ct_errors.slt new file mode 100644 index 0000000000000..5a1ab3cfeda66 --- /dev/null +++ b/test/sqllogictest/ct_errors.slt @@ -0,0 +1,36 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +mode cockroach + +statement ok +CREATE TABLE foo (key INT) + +# Feature flag is off by default +statement error CREATE CONTINUAL TASK is not supported +CREATE CONTINUAL TASK nope (key INT) ON INPUT foo AS ( + INSERT INTO nope SELECT * FROM foo; +) + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_create_continual_task = true +---- +COMPLETE 0 + +# INSERT columns do not match +statement error CONTINUAL TASK query columns did not match +CREATE CONTINUAL TASK nope (key STRING) ON INPUT foo AS ( + INSERT INTO nope SELECT * FROM foo; +) + +# TODO(ct): Make this error (or work!) 
instead of panic +# statement error something +# CREATE CONTINUAL TASK nope (key INT) ON INPUT foo AS ( +# INSERT INTO nope SELECT null::INT +# ) diff --git a/test/sqllogictest/ct_stream_table_join.slt b/test/sqllogictest/ct_stream_table_join.slt new file mode 100644 index 0000000000000..2078b4b2893f9 --- /dev/null +++ b/test/sqllogictest/ct_stream_table_join.slt @@ -0,0 +1,49 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +mode cockroach + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_create_continual_task = true +---- +COMPLETE 0 + +statement ok +CREATE TABLE big (key INT) + +statement ok +CREATE TABLE small (key INT, val STRING) + +statement ok +INSERT INTO small VALUES (1, 'v0') + +statement ok +CREATE CONTINUAL TASK stj (key INT, val STRING) ON INPUT big AS ( + INSERT INTO stj SELECT b.key, s.val FROM big b JOIN small s ON b.key = s.key; +) + +statement ok +INSERT INTO big VALUES (1) + +query IT +SELECT * FROM stj +---- +1 v0 + +statement ok +UPDATE small SET val = 'v1' + +statement ok +INSERT INTO big VALUES (1) + +query IT +SELECT * FROM stj +---- +1 v0 +1 v1 diff --git a/test/sqllogictest/ct_upsert.slt b/test/sqllogictest/ct_upsert.slt new file mode 100644 index 0000000000000..a3fad1a33db8f --- /dev/null +++ b/test/sqllogictest/ct_upsert.slt @@ -0,0 +1,41 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. 
+# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +mode cockroach + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_create_continual_task = true +---- +COMPLETE 0 + +statement ok +CREATE TABLE append_only (key INT, val INT) + +statement ok +CREATE CONTINUAL TASK upsert (key INT, val INT) ON INPUT append_only AS ( + DELETE FROM upsert WHERE key IN (SELECT key FROM append_only); + INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key; +) + +statement ok +INSERT INTO append_only VALUES (1, 2), (1, 1) + +query II +SELECT * FROM upsert +---- +1 2 + +statement ok +INSERT INTO append_only VALUES (1, 3), (2, 4) + +query IT +SELECT * FROM upsert +---- +1 3 +2 4