Skip to content

Commit

Permalink
Merge pull request #29518 from danhhz/ct
Browse files Browse the repository at this point in the history
ct: add strawman impl of CREATE CONTINUAL TASK
  • Loading branch information
danhhz authored Sep 19, 2024
2 parents 286001b + a2feb1c commit 7b492cc
Show file tree
Hide file tree
Showing 39 changed files with 1,924 additions and 53 deletions.
10 changes: 10 additions & 0 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,16 @@ impl Catalog {
}
policies
}

/// TODO(ct): This exists for continual tasks because they are the first
/// self-referential thing in mz. We use this to inject something for the
/// optimizer to resolve the CT's own id to before we've saved it to the
/// catalog. The one usage of this is careful to destroy this copy of
/// catalog immediately after. There are better ways to do this, but it was
/// the easiest path to get the skeleton of the feature merged.
pub fn hack_add_ct(&mut self, id: GlobalId, entry: CatalogEntry) {
self.state.entry_by_id.insert(id, entry);
}
}

#[derive(Debug)]
Expand Down
17 changes: 10 additions & 7 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1245,16 +1245,19 @@ impl CatalogState {
})
.into_element()
.ast;
let query = match &create_stmt {
Statement::CreateMaterializedView(stmt) => &stmt.query,
let query_string = match &create_stmt {
Statement::CreateMaterializedView(stmt) => {
let mut query_string = stmt.query.to_ast_string_stable();
// PostgreSQL appends a semicolon in `pg_matviews.definition`, we
// do the same for compatibility's sake.
query_string.push(';');
query_string
}
// TODO(ct): Remove.
Statement::CreateContinualTask(_) => "TODO(ct)".into(),
_ => unreachable!(),
};

let mut query_string = query.to_ast_string_stable();
// PostgreSQL appends a semicolon in `pg_matviews.definition`, we
// do the same for compatibility's sake.
query_string.push(';');

let mut updates = Vec::new();

updates.push(BuiltinTableUpdate {
Expand Down
32 changes: 29 additions & 3 deletions src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use mz_controller::clusters::{
UnmanagedReplicaLocation,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::OptimizedMirRelationExpr;
use mz_ore::collections::CollectionExt;
use mz_ore::now::NOW_ZERO;
use mz_ore::soft_assert_no_log;
Expand Down Expand Up @@ -65,9 +66,9 @@ use mz_sql::names::{
ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{
CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
Plan, PlanContext,
CreateConnectionPlan, CreateContinualTaskPlan, CreateIndexPlan, CreateMaterializedViewPlan,
CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
CreateViewPlan, Params, Plan, PlanContext,
};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
Expand Down Expand Up @@ -946,6 +947,31 @@ impl CatalogState {
initial_as_of,
})
}
Plan::CreateContinualTask(CreateContinualTaskPlan {
desc,
continual_task,
..
}) => {
// TODO(ct): Figure out how to make this survive restarts. The
// expr we saved still had the LocalId placeholders for the
// output, but we don't have access to the real Id here.
let optimized_expr = OptimizedMirRelationExpr::declare_optimized(
mz_expr::MirRelationExpr::constant(Vec::new(), desc.typ().clone()),
);
// TODO(ct): CatalogItem::ContinualTask
CatalogItem::MaterializedView(MaterializedView {
create_sql: continual_task.create_sql,
raw_expr: Arc::new(continual_task.expr.clone()),
optimized_expr: Arc::new(optimized_expr),
desc,
resolved_ids,
cluster_id: continual_task.cluster_id,
non_null_assertions: continual_task.non_null_assertions,
custom_logical_compaction_window: continual_task.compaction_window,
refresh_schedule: continual_task.refresh_schedule,
initial_as_of: continual_task.as_of.map(Antichain::from_elem),
})
}
Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
create_sql: index.create_sql,
on: index.on,
Expand Down
5 changes: 5 additions & 0 deletions src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ pub enum ExecuteResponse {
CreatedViews,
/// The requested materialized view was created.
CreatedMaterializedView,
/// The requested continual task was created.
CreatedContinualTask,
/// The requested type was created.
CreatedType,
/// The requested prepared statement was removed.
Expand Down Expand Up @@ -490,6 +492,7 @@ impl TryInto<ExecuteResponse> for ExecuteResponseKind {
ExecuteResponseKind::CreatedMaterializedView => {
Ok(ExecuteResponse::CreatedMaterializedView)
}
ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
ExecuteResponseKind::Deallocate => Err(()),
ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
Expand Down Expand Up @@ -551,6 +554,7 @@ impl ExecuteResponse {
CreatedView { .. } => Some("CREATE VIEW".into()),
CreatedViews { .. } => Some("CREATE VIEWS".into()),
CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
CreatedType => Some("CREATE TYPE".into()),
Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
DeclaredCursor => Some("DECLARE CURSOR".into()),
Expand Down Expand Up @@ -639,6 +643,7 @@ impl ExecuteResponse {
CreateTable => &[CreatedTable],
CreateView => &[CreatedView],
CreateMaterializedView => &[CreatedMaterializedView],
CreateContinualTask => &[CreatedContinualTask],
CreateIndex => &[CreatedIndex],
CreateType => &[CreatedType],
PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord/catalog_serving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub fn auto_run_on_catalog_server<'a, 's, 'p>(
| Plan::CreateRole(_)
| Plan::CreateCluster(_)
| Plan::CreateClusterReplica(_)
| Plan::CreateContinualTask(_)
| Plan::CreateSource(_)
| Plan::CreateSources(_)
| Plan::CreateSecret(_)
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ impl Coordinator {
| Statement::CreateDatabase(_)
| Statement::CreateIndex(_)
| Statement::CreateMaterializedView(_)
| Statement::CreateContinualTask(_)
| Statement::CreateRole(_)
| Statement::CreateSchema(_)
| Statement::CreateSecret(_)
Expand Down
6 changes: 6 additions & 0 deletions src/adapter/src/coord/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,12 @@ impl Coordinator {
self.sequence_create_materialized_view(ctx, plan, resolved_ids)
.await;
}
Plan::CreateContinualTask(plan) => {
let res = self
.sequence_create_continual_task(ctx.session(), plan, resolved_ids)
.await;
ctx.retire(res);
}
Plan::CreateIndex(plan) => {
self.sequence_create_index(ctx, plan, resolved_ids).await;
}
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ use crate::util::{viewable_variables, ClientTransmitter, ResultExt};
use crate::{guard_write_critical_section, PeekResponseUnary, ReadHolds};

mod cluster;
mod create_continual_task;
mod create_index;
mod create_materialized_view;
mod create_view;
Expand Down
208 changes: 208 additions & 0 deletions src/adapter/src/coord/sequencer/inner/create_continual_task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, MaterializedView, Table, TableDataSource,
};
use mz_compute_types::sinks::{
ComputeSinkConnection, ContinualTaskConnection, PersistSinkConnection,
};
use mz_expr::visit::Visit;
use mz_expr::{Id, LocalId};
use mz_ore::instrument;
use mz_repr::adt::mz_acl_item::PrivilegeMap;
use mz_repr::optimize::OverrideFrom;
use mz_sql::names::ResolvedIds;
use mz_sql::plan::{self, HirRelationExpr};
use mz_sql::session::metadata::SessionMetadata;
use mz_storage_client::controller::{CollectionDescription, DataSource, DataSourceOther};

use crate::catalog;
use crate::command::ExecuteResponse;
use crate::coord::Coordinator;
use crate::error::AdapterError;
use crate::optimize::dataflows::dataflow_import_id_bundle;
use crate::optimize::{self, Optimize};
use crate::session::Session;
use crate::util::ResultExt;

// TODO(ct): Big oof. Dedup a bunch of this with MVs.
impl Coordinator {
#[instrument]
pub(crate) async fn sequence_create_continual_task(
&mut self,
session: &Session,
plan: plan::CreateContinualTaskPlan,
resolved_ids: ResolvedIds,
) -> Result<ExecuteResponse, AdapterError> {
let plan::CreateContinualTaskPlan {
name,
desc,
input_id,
continual_task:
plan::MaterializedView {
create_sql,
cluster_id,
mut expr,
column_names,
non_null_assertions,
compaction_window: _,
refresh_schedule,
as_of: _,
},
} = plan;

// Collect optimizer parameters.
let compute_instance = self
.instance_snapshot(cluster_id)
.expect("compute instance does not exist");

let debug_name = self.catalog().resolve_full_name(&name, None).to_string();
let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
.override_from(&self.catalog.get_cluster(cluster_id).config.features());

let view_id = self.allocate_transient_id();
let catalog_mut = self.catalog_mut();
let sink_id = catalog_mut.allocate_user_id().await?;

// Put a placeholder in the catalog so the optimizer can find something
// for the sink_id.
let fake_entry = CatalogEntry {
item: CatalogItem::Table(Table {
create_sql: Some(create_sql.clone()),
desc: desc.clone(),
conn_id: None,
resolved_ids: resolved_ids.clone(),
custom_logical_compaction_window: None,
is_retained_metrics_object: false,
data_source: TableDataSource::TableWrites {
defaults: Vec::new(),
},
}),
referenced_by: Vec::new(),
used_by: Vec::new(),
id: sink_id,
oid: 0,
name: name.clone(),
owner_id: *session.current_role_id(),
privileges: PrivilegeMap::new(),
};
catalog_mut.hack_add_ct(sink_id, fake_entry);

// Build an optimizer for this CONTINUAL TASK.
let mut optimizer = optimize::materialized_view::Optimizer::new(
Arc::new(catalog_mut.clone()),
compute_instance,
sink_id,
view_id,
column_names,
non_null_assertions,
refresh_schedule.clone(),
debug_name,
optimizer_config,
self.optimizer_metrics(),
);

// Replace our placeholder fake ctes with the real output id, now that
// we have it.
expr.visit_mut_post(&mut |expr| match expr {
HirRelationExpr::Get { id, .. } if *id == Id::Local(LocalId::new(0)) => {
*id = Id::Global(sink_id);
}
_ => {}
})?;

// HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local and global)
let raw_expr = expr.clone();
let local_mir_plan = optimizer.catch_unwind_optimize(expr)?;
let global_mir_plan = optimizer.catch_unwind_optimize(local_mir_plan.clone())?;
// MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
let (mut df_desc, _df_meta) = global_lir_plan.unapply();

// Timestamp selection
let mut id_bundle = dataflow_import_id_bundle(&df_desc, cluster_id.clone());
// Can't acquire a read hold on ourselves because we don't exist yet.
//
// It is not necessary to take a read hold on the CT output in the
// coordinator, since the current scheme takes read holds in the
// coordinator only to ensure inputs don't get compacted until the
// compute controller has installed its own read holds, which happens
// below with the `ship_dataflow` call.
id_bundle.storage_ids.remove(&sink_id);
let read_holds = self.acquire_read_holds(&id_bundle);
let as_of = read_holds.least_valid_read();
df_desc.set_as_of(as_of.clone());

// TODO(ct): HACKs
for sink in df_desc.sink_exports.values_mut() {
match &mut sink.connection {
ComputeSinkConnection::Persist(PersistSinkConnection {
storage_metadata, ..
}) => {
sink.connection =
ComputeSinkConnection::ContinualTask(ContinualTaskConnection {
input_id,
storage_metadata: *storage_metadata,
})
}
_ => unreachable!("MV should produce persist sink connection"),
}
}

let ops = vec![catalog::Op::CreateItem {
id: sink_id,
name: name.clone(),
item: CatalogItem::MaterializedView(MaterializedView {
// TODO(ct): This doesn't give the `DELETE FROM` / `INSERT INTO`
// names the `[u1 AS "materialize"."public"."append_only"]`
// style expansion. Bug?
create_sql,
raw_expr: Arc::new(raw_expr),
optimized_expr: Arc::new(local_mir_plan.expr()),
desc: desc.clone(),
resolved_ids,
cluster_id,
non_null_assertions: Vec::new(),
custom_logical_compaction_window: None,
refresh_schedule,
initial_as_of: Some(as_of.clone()),
}),
owner_id: *session.current_role_id(),
}];

let () = self
.catalog_transact_with_side_effects(Some(session), ops, |coord| async {
coord
.controller
.storage
.create_collections(
coord.catalog.state().storage_metadata(),
None,
vec![(
sink_id,
CollectionDescription {
desc,
data_source: DataSource::Other(DataSourceOther::Compute),
since: Some(as_of),
status_collection_id: None,
},
)],
)
.await
.unwrap_or_terminate("cannot fail to append");

coord.ship_dataflow(df_desc, cluster_id.clone(), None).await;
})
.await?;
Ok(ExecuteResponse::CreatedContinualTask)
}
}
1 change: 1 addition & 0 deletions src/adapter/src/statement_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl From<&ExecuteResponse> for StatementEndedExecutionReason {
| ExecuteResponse::CreatedView
| ExecuteResponse::CreatedViews
| ExecuteResponse::CreatedMaterializedView
| ExecuteResponse::CreatedContinualTask
| ExecuteResponse::CreatedType
| ExecuteResponse::Deallocate { .. }
| ExecuteResponse::DeclaredCursor
Expand Down
5 changes: 5 additions & 0 deletions src/catalog/src/durable/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,11 @@ impl ItemValue {
assert_eq!(tokens.next(), Some("VIEW"));
CatalogItemType::MaterializedView
}
Some("CONTINUAL") => {
assert_eq!(tokens.next(), Some("TASK"));
// TODO(ct): CatalogItemType::ContinualTask
CatalogItemType::MaterializedView
}
Some("INDEX") => CatalogItemType::Index,
Some("TYPE") => CatalogItemType::Type,
Some("FUNCTION") => CatalogItemType::Func,
Expand Down
Loading

0 comments on commit 7b492cc

Please sign in to comment.