Skip to content

Commit

Permalink
Add more logs
Browse files Browse the repository at this point in the history
  • Loading branch information
jkosh44 committed Sep 5, 2024
1 parent 0d3b33a commit 9b00e09
Show file tree
Hide file tree
Showing 19 changed files with 501 additions and 177 deletions.
4 changes: 2 additions & 2 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2340,7 +2340,7 @@ mod tests {
Catalog::open_debug_catalog(persist_client.clone(), organization_id.clone())
.await
.expect("unable to open debug catalog");
let item = catalog
let (item, _) = catalog
.state()
.deserialize_item(&create_sql)
.expect("unable to parse view");
Expand Down Expand Up @@ -3227,7 +3227,7 @@ mod tests {
let schema_spec = schema.id().clone();
let schema_name = &schema.name().schema;
let database_spec = ResolvedDatabaseSpecifier::Id(database_id);
let mv = catalog
let (mv, _) = catalog
.state()
.deserialize_item(&format!(
"CREATE MATERIALIZED VIEW {database_name}.{schema_name}.{mv_name} AS SELECT name FROM mz_tables"
Expand Down
260 changes: 216 additions & 44 deletions src/adapter/src/catalog/apply.rs

Large diffs are not rendered by default.

99 changes: 97 additions & 2 deletions src/adapter/src/catalog/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::time::{Duration, Instant};
use tracing::info;

use futures::future::BoxFuture;
use mz_catalog::durable::Item;
Expand All @@ -24,7 +26,6 @@ use mz_sql_parser::ast::{Raw, Statement};
use mz_storage_types::connections::ConnectionContext;
use prost::Message;
use semver::Version;
use tracing::info;

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::catalog::{CatalogState, ConnCatalog};
Expand All @@ -38,6 +39,7 @@ where
&'a Vec<(Item, Statement<Raw>)>,
) -> BoxFuture<'a, Result<(), anyhow::Error>>,
{
let parse_start = Instant::now();
let mut updated_items = BTreeMap::new();
let items_with_statements = tx
.get_items()
Expand All @@ -52,14 +54,49 @@ where
// migration is removed.
let items_with_statements_ref = items_with_statements.clone();

info!(
"STARTUP LOOK: rewrite AST items parse items took: {:?}",
parse_start.elapsed()
);

let mut closure_dur = Duration::new(0, 0);
let mut to_ast_dur = Duration::new(0, 0);
let mut insert_dur = Duration::new(0, 0);

for (mut item, mut stmt) in items_with_statements {
let clo_start = Instant::now();
f(tx, item.id, &mut stmt, &items_with_statements_ref).await?;
closure_dur += clo_start.elapsed();

let start = Instant::now();
item.create_sql = stmt.to_ast_string_stable();
to_ast_dur += start.elapsed();

let start = Instant::now();
updated_items.insert(item.id, item);
insert_dur += start.elapsed();
}

info!(
"STARTUP LOOK: rewrite AST items run closure took: {:?}",
closure_dur
);
info!(
"STARTUP LOOK: rewrite AST items to AST took: {:?}",
to_ast_dur
);
info!(
"STARTUP LOOK: rewrite AST items insert took: {:?}",
insert_dur
);

let start = Instant::now();
tx.update_items(updated_items)?;
info!(
"STARTUP LOOK: rewrite AST items tx update items took: {:?}",
start.elapsed()
);

Ok(())
}

Expand All @@ -78,16 +115,47 @@ where
{
let mut updated_items = BTreeMap::new();
let items = tx.get_items();

let mut parse_dur = Duration::new(0, 0);
let mut closure_dur = Duration::new(0, 0);
let mut to_ast_dur = Duration::new(0, 0);
let mut insert_dur = Duration::new(0, 0);

for mut item in items {
let start = Instant::now();
let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
parse_dur += start.elapsed();

let start = Instant::now();
f(tx, &cat, item.id, &mut stmt).await?;
closure_dur += start.elapsed();

let start = Instant::now();
item.create_sql = stmt.to_ast_string_stable();
to_ast_dur += start.elapsed();

let start = Instant::now();
updated_items.insert(item.id, item);
insert_dur += start.elapsed();
}

info!(
"STARTUP LOOK: rewrite items parse items took: {:?}",
parse_dur
);
info!(
"STARTUP LOOK: rewrite items run closure took: {:?}",
closure_dur
);
info!("STARTUP LOOK: rewrite items to AST took: {:?}", to_ast_dur);
info!("STARTUP LOOK: rewrite items insert took: {:?}", insert_dur);

let start = Instant::now();
tx.update_items(updated_items)?;
info!(
"STARTUP LOOK: rewrite items tx update items took: {:?}",
start.elapsed()
);
Ok(())
}

Expand All @@ -98,17 +166,23 @@ pub(crate) async fn migrate(
_boot_ts: Timestamp,
_connection_context: &ConnectionContext,
) -> Result<(), anyhow::Error> {
let start = Instant::now();
let catalog_version = tx.get_catalog_content_version();
let catalog_version = match catalog_version {
Some(v) => Version::parse(&v)?,
None => Version::new(0, 0, 0),
};
info!(
"STARTUP LOOK: AST migration get catalog version took: {:?}",
start.elapsed()
);

info!(
"migrating statements from catalog version {:?}",
catalog_version
);

let rewrite_ast_start = Instant::now();
rewrite_ast_items(tx, |_tx, _id, stmt, all_items_and_statements| {
Box::pin(async move {
// Add per-item AST migrations below.
Expand All @@ -125,7 +199,12 @@ pub(crate) async fn migrate(
})
})
.await?;
info!(
"STARTUP LOOK: AST migration rewrite ast took: {:?}",
rewrite_ast_start.elapsed()
);

let temp_cat_start = Instant::now();
// Load up a temporary catalog.
let mut state = state.clone();
let item_updates = tx
Expand All @@ -137,10 +216,16 @@ pub(crate) async fn migrate(
})
.collect();
// The catalog is temporary, so we can throw out the builtin updates.
let _ = state.apply_updates_for_bootstrap(item_updates).await;
let (_, apply_timing) = state.apply_updates_for_bootstrap(item_updates).await;
info!(
"STARTUP LOOK: AST migration load temporary catalog took: {:?}, {:?}",
temp_cat_start.elapsed(),
apply_timing
);

info!("migrating from catalog version {:?}", catalog_version);

let rewrite_start = Instant::now();
let conn_cat = state.for_system_session();

rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _id, _stmt| {
Expand All @@ -165,6 +250,16 @@ pub(crate) async fn migrate(
})
.await?;

info!(
"STARTUP LOOK: AST migration rewrite took: {:?}",
rewrite_start.elapsed()
);

info!(
"STARTUP LOOK: AST migration total took: {:?}",
start.elapsed()
);

// Add whole-catalog migrations below.
//
// Each migration should be a function that takes `tx` and `conn_cat` as
Expand Down
Loading

0 comments on commit 9b00e09

Please sign in to comment.