ct: add strawman impl of CREATE CONTINUAL TASK #29518
Conversation
Looks promising! I need to read it again, but the general idea makes sense to me. What about adding a feature flag so we can gate the whole implementation and merge without needing to worry about breaking existing things? Leaving some comments, but don't treat them as instructions!
pub enum ContinualTaskStmt<T: AstInfo> {
    Delete(DeleteStatement<T>),
    Insert(InsertStatement<T>),
    // TODO(ct): Update/upsert?
Blue/green might be the easy answer to altering CTs.
Agreed, though the thing you commented on is about the statements in a continual task definition, i.e., the `DELETE FROM` and `INSERT INTO` from Dan's example:
CREATE CONTINUAL TASK upsert (key INT, val INT) ON INPUT append_only AS (
DELETE FROM upsert WHERE key IN (SELECT key FROM append_only);
INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key;
)
☝️
Event::Progress(progress) => {
    if let Some(progress) = progress.into_option() {
        cap.downgrade(&progress.step_forward());
    }
}
Clarify: Does this operator need to maintain a capability? Stepping the time forward maintains the invariant that `Collection` requires.
Correct, it doesn't. I was just being lazy because a capability at `T` that is still outstanding on this operator's input needs to get mapped to `T.step_forward()` on the output, which I think requires telling timely something via `new_input_connection` that I might need your help writing.
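A minimal, hypothetical sketch of the "+1" time mapping being discussed here, using plain u64 times rather than Materialize's Timestamp; the function names are illustrative stand-ins, not the actual operator code:

```rust
// Hypothetical sketch, not the real operator: model the "+1" mapping with
// plain u64 times. `step_forward` here is only a stand-in for the real
// Timestamp::step_forward.
fn step_forward(t: u64) -> u64 {
    t.checked_add(1).expect("time overflowed")
}

// If the input connection declared a "+1" summary (the `new_input_connection`
// idea), timely could derive this mapping for outstanding input capabilities
// itself, instead of the operator holding and manually downgrading a cap as in
// the snippet above.
fn implied_output_time(input_cap_time: u64) -> u64 {
    step_forward(input_cap_time)
}

fn main() {
    assert_eq!(implied_output_time(41), 42);
}
```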
let Some(new_progress) = new_progress else {
    continue;
};
Does this drop the cap once the input advanced to the empty frontier? Maybe that's handled by the async operator?
I think so. Once the input is empty, the `input.next().await` return above exits us from the closure, dropping the cap. I'd like to rewrite this as a non-async operator though, so that you can feel confident in the impl. I'm just a little rusty on those, so this was faster during prototyping :)
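A small, self-contained model of the behavior described here, using a plain futures channel and a hypothetical `Cap` type in place of the real async operator and timely capability: once the input stream ends, the loop exits and the cap is dropped.

```rust
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::StreamExt;

// Hypothetical stand-in for a timely capability: dropping it is what lets
// downstream frontiers advance.
struct Cap(&'static str);
impl Drop for Cap {
    fn drop(&mut self) {
        println!("capability {} dropped", self.0);
    }
}

fn main() {
    let (tx, mut rx) = mpsc::unbounded::<u64>();
    tx.unbounded_send(1).unwrap();
    tx.unbounded_send(2).unwrap();
    // The input is now "empty": the stream ends after the buffered items.
    drop(tx);

    block_on(async move {
        let cap = Cap("output");
        // Analogous to `while let Some(event) = input.next().await` in the
        // operator: once the input is exhausted, `next()` returns None, we
        // fall out of the loop, the async block ends, and `cap` is dropped.
        while let Some(t) = rx.next().await {
            println!("got input at time {t}, still holding cap {}", cap.0);
        }
    });
}
```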
Posting what I have, didn't get very far yet.

In general, I worry a bit that we'll now do a cursory review only -- because many parts are meant to be not production ready yet and nothing of this will be switched on in prod anyway -- but then have no way of ensuring that we come back and make all of the parts production ready without missing any. I guess the TODOs help with that a bit.
What about adding a feature flag so we can gate the whole implementation and merge without needing to worry about breaking existing things?
Done!
In general, I worry a bit that we'll now do a cursory review only -- because many parts are meant to be not production ready yet and nothing of this will be switched on in prod anyway -- but then have no way of ensuring that we come back and make all of the parts production ready without missing any. I guess the TODOs help with that a bit.
Yeah, I'm sympathetic to this! Otoh, I'm basically constitutionally incapable of polishing a big feature in a branch until every loose end is tied up. Happy to discuss what a middle ground looks like here that'll make you feel more confident in the outcome. In the past, that's indeed looked like liberal use of TODOs (which the TODO(ct) system is nice for distinguishing from the normal sort of TODOs that never get circled back to), but I'm open to other ideas.
If it makes you feel better, this approach has the distinct advantage of front-loading some of the unknown unknowns, both in the technical implementation and in the definition of the feature itself. It's also how persist and txn-wal were developed (and many crdb features).
I would be happy if after we consider the feature code complete there was some opportunity to give all the code related to it a final read, to make sure that things are consistent and we didn't forget to remove some of the hacks. I think the hard part will be collecting all the relevant changes and separating them from unrelated changes made during the same time.
This statement is actually extremely helpful in alleviating my concerns :)
Adding @ParkMyCar for the adapter bits!
I would be happy if after we consider the feature code complete there was some opportunity to give all the code related to it a final read, to make sure that things are consistent and we didn't forget to remove some of the hacks. I think the hard part will be collecting all the relevant changes and separating them from unrelated changes made during the same time.
Sounds good, modulo that there will probably be several incremental points where we ship things, rather than one "feature code complete" one.
As for the latter, I've been trying to keep the code as self-contained in a few places as possible. If we think that's not enough (and maybe not, given that a few of the hacks are necessarily in weird places), we could invent something like the TODO(ct) system where I stick a marker everywhere we touch along the way, and then clean them all up in one big batch and/or as we're satisfied that they are hack-free?
It's also how persist and txn-wal were developed (and many crdb features).
This statement is actually extremely helpful in alleviating my concerns :)
<3
I'd be content with keeping things self-contained and adding …
Fine to merge with CatalogItem{,Type}::MaterializedView or make ::ContinualTask from the start? I believe this one is a large amount of boilerplate.
IMO it's fine to merge with just `::MaterializedView` for now, but before we enable this in Prod, or even widely in staging, I would really want to make a `::ContinualTask` variant.
`impl Staged`? I think this has something to do with blocking the coord loop and is probably fine to punt to a TODO(ct)
Exactly, and totally fine to punt for now!
LocalId/CTE hack for resolving self-references.
Need to understand Continual Tasks and `LocalId`s a bit better, but I agree we should find a fix for this. Fine with merging as-is for now since AFAICT we're not durably recording these `LocalId`s anywhere
Better way to inject something for the optimizer to resolve the id to than this CatalogEntry hack?
I don't think so! What you have currently is the only way I can think of, the Compute folks might have an alternate way though
I don't love `plan_ct_query`
What don't you love about it? IMO it looked relatively straightforward, although de-duping with `plan_root_query` would be nice
let mut stmts = Vec::new();
let mut expecting_statement_delimiter = false;
self.expect_token(&Token::LParen)?;
// TODO(ct): Dedup this with parse_statements?
+1, it seems like `parse_statements(...)` would be a drop-in replacement? Was there anything you ran into initially where `parse_statements` didn't work?
There was, but I don't recall. Maaaaayyyybe because the exit condition was `self.peek_token().is_none()` instead of `self.consume_token(&Token::RParen)`?
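A tiny, hypothetical mini-parser (not mz_sql_parser) illustrating the exit-condition difference in question: stopping at the closing paren via `consume_token(&Token::RParen)` rather than at end of input via `peek_token().is_none()`:

```rust
// Hypothetical mini-parser sketch: `parse_statements`-style code stops when
// the input runs out, while the CT body has to stop at the closing `)`.
#[derive(Clone, Debug, PartialEq)]
enum Token {
    LParen,
    RParen,
    Semicolon,
    Word(String),
}

struct Parser {
    tokens: Vec<Token>,
    pos: usize,
}

impl Parser {
    fn peek_token(&self) -> Option<&Token> {
        self.tokens.get(self.pos)
    }
    fn consume_token(&mut self, t: &Token) -> bool {
        if self.peek_token() == Some(t) {
            self.pos += 1;
            true
        } else {
            false
        }
    }
    // Collect semicolon-separated statements until the closing paren.
    fn parse_ct_body(&mut self) -> Vec<Vec<Token>> {
        assert!(self.consume_token(&Token::LParen));
        let mut stmts = vec![Vec::new()];
        while !self.consume_token(&Token::RParen) {
            let tok = self.peek_token().cloned();
            match tok {
                Some(Token::Semicolon) => {
                    self.pos += 1;
                    stmts.push(Vec::new());
                }
                Some(tok) => {
                    self.pos += 1;
                    stmts.last_mut().expect("nonempty").push(tok);
                }
                None => panic!("unexpected end of input before `)`"),
            }
        }
        stmts.retain(|s| !s.is_empty());
        stmts
    }
}

fn main() {
    let mut p = Parser {
        tokens: vec![
            Token::LParen,
            Token::Word("DELETE".into()),
            Token::Semicolon,
            Token::Word("INSERT".into()),
            Token::RParen,
        ],
        pos: 0,
    };
    assert_eq!(p.parse_ct_body().len(), 2);
}
```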
Some("CONTINUAL") => { | ||
assert_eq!(tokens.next(), Some("TASK")); | ||
// TODO(ct): CatalogItemType::ContinualTask | ||
CatalogItemType::MaterializedView | ||
} |
I know it's a decent amount of boilerplate, but I really would prefer a `CatalogItemType::ContinualTask`. No need to do it in this PR, but it would be great as a follow-up!
Yup, that's the plan for sure. The TODO(ct) here should cover this.
// TODO(ct): Figure out how to make this survive restarts. The
// expr we saved still had the LocalId placeholders for the
// output, but we don't have access to the real Id here.
let optimized_expr = OptimizedMirRelationExpr::declare_optimized(
    mz_expr::MirRelationExpr::constant(Vec::new(), desc.typ().clone()),
);
I'm probably missing a bit here, but why can't we re-optimize the raw SQL? Is it because they're self-referential? This is how we persist information about all other catalog items.
Yeah, because they're self-referential. We talked about this offline, too, and feels like there's indeed some path to saving everything with ids filled in, which would make this work out of the box.
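A hypothetical mini-IR sketch of the "saving everything with ids filled in" idea: once the CT's GlobalId is known, the LocalId placeholder for the self-reference can be rewritten away, so the saved expression could be re-optimized after a restart. This is not mz's MirRelationExpr, just the shape of the fix:

```rust
// Hypothetical mini-IR, not the real optimizer types.
#[derive(Debug, PartialEq)]
enum Id {
    Local(u64),
    Global(u64),
}

#[derive(Debug, PartialEq)]
enum Expr {
    Get(Id),
    Union(Vec<Expr>),
    Constant(Vec<i64>),
}

// Replace the LocalId placeholder for the CT's own output with the real
// GlobalId, everywhere it appears in the expression.
fn resolve_self_reference(expr: &mut Expr, placeholder: u64, global: u64) {
    match expr {
        Expr::Get(id) => {
            if *id == Id::Local(placeholder) {
                *id = Id::Global(global);
            }
        }
        Expr::Union(inputs) => {
            for input in inputs {
                resolve_self_reference(input, placeholder, global);
            }
        }
        Expr::Constant(_) => {}
    }
}

fn main() {
    // The CT reads its own output (Local(0)) unioned with some constant input.
    let mut expr = Expr::Union(vec![Expr::Get(Id::Local(0)), Expr::Constant(vec![1])]);
    resolve_self_reference(&mut expr, 0, 42);
    assert_eq!(
        expr,
        Expr::Union(vec![Expr::Get(Id::Global(42)), Expr::Constant(vec![1])])
    );
}
```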
// TODO(ct): Big oof. Dedup a bunch of this with MVs.
If `ContinualTasks` really are almost identical to `MaterializedViews`, then maybe we change naming within the adapter to something like `ContinualDataflow`?
Indexes and subscribes are also continual dataflows, so if we end up merging code paths for MVs and CTs, I think we should consider indexes/subscribes as well. There is probably a reason why sequencing for MVs and indexes is separate today (subscribes are a bit special because they are not added to the catalog). Does anyone know that reason?
All this makes sense on the surface, but I haven't looked at all the details enough to have any particular opinion yet beyond "feels like at least some de-duplication is possible". It also probably depends on whether we give CTs their own Optimizer and also how the `impl Staged` stuff shakes out.
pub fn hack_add_ct(&mut self, id: GlobalId, entry: CatalogEntry) {
    self.state.entry_by_id.insert(id, entry);
}
To make sure I understand, this hack exists because Continual Tasks are the first kind of object that is self-referential? Can you add the reason we have the hack as a doc comment on this method?
Yup and done!
src/sql/src/normalize.rs
Outdated
// WIP do we need to normalize columns and input?
columns: _,
input: _,
Need to double check, but I'm pretty sure `columns` no, `input` yes.
Done!
Another batch of comments. The only part I haven't quite grokked yet is the rendering/sink stuff. Btw, it would be very helpful to have this:
//! WIP overview of how this all fits together
owner_id: *session.current_role_id(),
privileges: PrivilegeMap::new(),
};
catalog_mut.hack_add_ct(sink_id, fake_entry);
Something I discussed with @antiguru recently: It probably isn't necessary to pass the entire catalog to the optimizer. We do it because it is convenient, but it is one of the reasons why we can't move the optimizer into a separate crate yet.

If we'd instead pass an `OptimizerCollectionContext` (idk, naming is hard) that only contains the catalog parts actually required, we also wouldn't need this hack and could just add the output collection to the context too.

Another way to avoid this hack, I think: Have a special optimizer for continual tasks (`optimize::continual_task::Optimizer`; we should probably do that anyway for consistency) and make it aware of the fact that the output collection can be an input too.
Something I discussed with @antiguru recently: It probably isn't necessary to pass the entire catalog to the optimizer. We do it because it is convenient, but it is one of the reasons why we can't move the optimizer into a separate crate yet. If we'd instead pass an `OptimizerCollectionContext` (idk, naming is hard) that only contains the catalog parts actually required, we also wouldn't need this hack and could just add the output collection to the context too.

I might have missed something but I just quickly checked and it seems like Optimizer just gets the `CatalogState` from this to hand to `DataflowBuilder`, and then `DataflowBuilder` only uses `get_entry`, `resolve_full_name`, and `get_indexes_on` from the `CatalogState`. If that's true, this would be an easy-ish refactor? (Definitely something for a followup, even if it's that easy.)

Another way to avoid this hack, I think: Have a special optimizer for continual tasks (`optimize::continual_task::Optimizer`; we should probably do that anyway for consistency) and make it aware of the fact that the output collection can be an input too.

I'm truly still undecided on what my opinion for this one even is. On one hand, yeah, separate feature => separate optimizer, seems obvious. On the other hand, the optimization part of CTs is basically identical to MVs and I worry that going down this road will prevent us from reusing a bunch of existing infra, in particular stuff like EXPLAIN.
If that's true, this would be an easy-ish refactor? (Definitely something for a followup, even if it's that easy.)
Might be true, I also don't have a lot more context. In my mind the optimizer needs the catalog only to know the types of the objects a dataflow is interacting with, and which indexes are available. I'd say it's definitely worth taking a shot at the refactor and seeing if we encounter any blockers.
On the other hand, the optimization part of CTs is basically identical to MVs and I worry that going down this road will prevent us from reusing a bunch of existing infra, in particular stuff like EXPLAIN
Hm, I didn't consider the EXPLAIN stuff. I wonder if it's not possible to make that generic over the concrete optimizer type. We already have the `Optimize` trait and at a glance that should be sufficient for this purpose.
I might have missed something but I just quickly checked and it seems like Optimizer just gets the `CatalogState` from this to hand to `DataflowBuilder`, and then `DataflowBuilder` only uses `get_entry`, `resolve_full_name`, and `get_indexes_on` from the `CatalogState`. If that's true, this would be an easy-ish refactor? (Definitely something for a followup, even if it's that easy.)

Wow yeah maybe it really is that easy? #29598
Hm, I didn't consider the EXPLAIN stuff. I wonder if it's not possible to make that generic over the concrete optimizer type. We already have the Optimize trait and at a glance that should be sufficient for this purpose.
Oh cool, good to know. It's possible this all works out, I haven't yet finished thinking specifically about it. Was just talking out why I haven't immediately jumped to forking out a CT Optimizer
Another batch of comments. The only part I haven't quite grokked yet is the rendering/sink stuff. Btw, it would be very helpful to have this:
//! WIP overview of how this all fits together
I took a swing at filling this in, but I'm not convinced that what I wrote is particularly illuminating. Unless we can improve it, I almost worry that the natural rot will outweigh the benefits of having it.
How `ct_input` gets plumbed down to the sink. Threading it through `SinkRender` feels bad.

I assume you mean `ct_times`? Not a full answer, but I have been thinking that it would be nice if the `oks`/`errs` input of the sink was already "rounded" to the input times correctly. I imagine with the `ct_times` logic removed the sink would look a lot like a regular persist sink, and we could maybe even reuse the existing one. Though it's very possible I'm missing something since I haven't studied the sink implementation closely yet.

We'd need another dataflow operator before the sink that performs the rounding. That could go inside `SinkRender` or anywhere before it, in principle. Though we only have precedent for putting it inside `SinkRender` (`apply_refresh` in the MV sink), so that doesn't help a lot if the goal is to keep the `SinkRender` interface clean.
Ah yup, the name of that changed several times in my branch cleanups. Conveniently, the pre-rounding I think is exactly equivalent to my "this should all work data-parallel TODO", so I think we're on the same page.
Here is a more helpful answer: How about we get rid of the `SinkRender` trait? At least in compute I don't think it provides anything useful, just adds a bunch of boilerplate and various parameters that only apply to some of the sink implementations.
Wfm! Though I'll do this one as a followup too.
IMO it's fine to merge with just `::MaterializedView` for now, but before we enable this in Prod, or even widely in staging, I would really want to make a `::ContinualTask` variant.

Yup, that's the plan for sure. I made sure there's a TODO(ct) to cover this.
`impl Staged`? I think this has something to do with blocking the coord loop and is probably fine to punt to a TODO(ct)

Exactly, and totally fine to punt for now!
👍
LocalId/CTE hack for resolving self-references.
Need to understand Continual Tasks and `LocalId`s a bit better, but I agree we should find a fix for this. Fine with merging as-is for now since AFAICT we're not durably recording these `LocalId`s anywhere
We talked about this offline and current thinking is that we might be able to use purification to contain the hack to just name resolution.
Better way to inject something for the optimizer to resolve the id to than this CatalogEntry hack?
I don't think so! What you have currently is the only way I can think of, the Compute folks might have an alternate way though
Talked about this too. Some possibilities are modeling this like `temporary_schemas` or `unresolvable_ids`. Another option that Jan mentioned this morning is for the optimizer to take a more limited trait of what it needs from the catalog.
I don't love `plan_ct_query`

What don't you love about it? IMO it looked relatively straightforward, although de-duping with `plan_root_query` would be nice
The duping is what I don't love :). I guess my issue is there's many ways we could de-dup it, but I don't have the context to have an opinion on which one is best.
Some("CONTINUAL") => { | ||
assert_eq!(tokens.next(), Some("TASK")); | ||
// TODO(ct): CatalogItemType::ContinualTask | ||
CatalogItemType::MaterializedView | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, that's the plan for sure. The TODO(ct) here should cover this.
// TODO(ct): Figure out how to make this survive restarts. The | ||
// expr we saved still had the LocalId placeholders for the | ||
// output, but we don't have access to the real Id here. | ||
let optimized_expr = OptimizedMirRelationExpr::declare_optimized( | ||
mz_expr::MirRelationExpr::constant(Vec::new(), desc.typ().clone()), | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, because they're self-referential. We talked about this offline, too, and feels like there's indeed some path to saving everything with ids filled in, which would make this work out of the box.
pub fn hack_add_ct(&mut self, id: GlobalId, entry: CatalogEntry) { | ||
self.state.entry_by_id.insert(id, entry); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup and done!
|
||
// TODO(ct): Big oof. Dedup a bunch of this with MVs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All this makes sense on the surface, but I haven't looked at all the details enough to have any particular opinion yet beyond "feels like at least some de-duplication is possible". It also probably depends on whether we give CTs their own Optimizer and also how the impl Staged
stuff shakes out.
owner_id: *session.current_role_id(), | ||
privileges: PrivilegeMap::new(), | ||
}; | ||
catalog_mut.hack_add_ct(sink_id, fake_entry); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something I discussed with @antiguru recently: It probably isn't necessary to pass the entire catalog to the optimizer. We do it because it is convenient, but it is one of the reasons why we can't move the optimizer into a separate crate yet.
If we'd instead pass a
OptimizerCollectionContext
(idk, naming is hard) that only contains the catalog parts actually required, we also wouldn't need this hack and could just add the output collection to the context too.
I might have missed something but I just quickly checked and it seems like Optimizer just gets the CatalogState
from this to hand to DataflowBuilder
and then DataflowBuilder
only uses get_entry
, resolve_full_name
, and get_indexes_on
from the CatalogState
. If that's true, this would be an easy-ish refactor? (Definitely something for a followup, even if it's that easy.)
Another way to avoid this hack, I think: Have a special optimizer for continual tasks (
optimize::continual_task::Optimizer
; we should probably do that anyway for consistency) and make it aware of the fact the the output collection can be an input too.
I'm truly still undecided on what my opinion for this one even is. On one hand, yeah separate feature => separate optimizer, seems obvious. On the other hand, the optimization part of CTs is basically identical to MVs and I worry that going down this road will prevent us from reusing a bunch of existing infra, in particular stuff like EXPLAIN
src/sql/src/normalize.rs
Outdated
// WIP do we need to normalize columns and input? | ||
columns: _, | ||
input: _, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
let mut stmts = Vec::new(); | ||
let mut expecting_statement_delimiter = false; | ||
self.expect_token(&Token::LParen)?; | ||
// TODO(ct): Dedup this with parse_statements? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was, but I don't recall. Maaaaayyyybe because the exit condition was self.peek_token().is_none()
instead of self.consume_token(&Token::RParen)
?
Gonna take this opportunity to rebase in a more recent main and force-push. I usually try to hold off on that for longer because it breaks github reviews, but flipping branches across the python deps change is melting my laptop. As per my usual though, all changes have been pushed as append-only commits so should be easy to see what's changed since the last time each of you looked at this.
My naive understanding was that `sequencer/inner` was more or less meant for `impl Staged`. If `impl Staged` is a long punt, should this be moved to `sequencer/create_continual_task.rs`?
I found a way to make the current implementation panic!

CREATE TABLE t (a int);
CREATE CONTINUAL TASK test (a int) ON INPUT t as (
    INSERT INTO test SELECT 1
);

Panics with:

I think the issue is that the constructed dataflow does not read from the input
let sink_write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
collection.sink_write_frontier = Some(Rc::clone(&sink_write_frontier));

// TODO(ct): Obey `compute_state.read_only_rx`
Are we just going to skip all appends in read-only mode? I guess that won't work because that can get you into situations where updates are missing for some times, e.g.:
- Old env appends at time `T`.
- New env (read-only) already has data for time `T + 1` but skips the append because of read-only mode.
- New env becomes read-write, old env goes away.
- New env appends at time `T + 2`, leaving the output empty for time `T + 1`.
Does the read-only env need to tail the output shard and keep historical updates around until it sees that the output frontier advances beyond their times?
Yeah, that sounds roughly correct. I think in practice it just means that the impl will skip any writes that come out of process, and that'll handle keeping around the necessary detail. Would be good to learn about output progress caused by other processes and update `state.output_progress`.

That's trivial (`WriteHandle::shared_upper`) until we do the TODO to split the persist_sink bits out of this operator and make it non-async. Though maybe when we split the persist_sink out, we just get this for "free"? It might just work out that this operator can send along the writes and they get buffered by the sink (dropping any that the shard passes) until it is allowed to write.

Added a bit of detail to the TODO
Though maybe when we split the persist_sink out, we just get this for "free"? It might just work out that this operator can send along the writes and they get buffered by the sink (dropping any that the shard passes) until it is allowed to write.
We're talking about the storage persist_sink, not the self-correcting one, right? Does that already have a way to observe the frontier of the output shard without doing appends itself?
I haven't yet figured out if I think we can/should reuse the storage persist_sink out of the box, or take this as an opportunity to finally extract some common bits and make a `shard_sink` that lives in src/persist-client. But worst case, yeah, `WriteHandle::shared_upper` gets you the pubsub-updated latest upper of the shard (i.e. no crdb traffic) and `WriteHandle::wait_for_upper_past` can be used to find out when that upper changes (no crdb traffic in the common case of an upper advancing once per second)
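A rough, hypothetical model of the read-only behavior sketched in this thread, with plain u64s standing in for frontiers and no real persist API calls: buffer pending appends, drop whatever the shard's observed upper has already moved past, and append the rest once the env becomes read-write. `observe_upper` below is only a stand-in for learning the shard upper (e.g. via something like the `WriteHandle::shared_upper` mentioned above).

```rust
// Hypothetical model, not the actual sink or persist API.
struct PendingAppends {
    pending: Vec<(u64 /* write_ts */, Vec<String> /* rows */)>,
}

impl PendingAppends {
    // Drop batches the read-write env has already appended past.
    fn observe_upper(&mut self, observed_upper: u64) {
        self.pending.retain(|(write_ts, _)| *write_ts >= observed_upper);
    }

    // On becoming read-write, everything still pending gets appended.
    fn take_writable(&mut self) -> Vec<(u64, Vec<String>)> {
        std::mem::take(&mut self.pending)
    }
}

fn main() {
    let mut buffered = PendingAppends {
        pending: vec![(5, vec!["row@5".into()]), (7, vec!["row@7".into()])],
    };
    // The old (read-write) env already appended through time 6.
    buffered.observe_upper(6);
    assert_eq!(buffered.take_writable(), vec![(7, vec!["row@7".to_string()])]);
}
```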
// We can also advance the output upper up to the write_ts if it's not
// there already.
if self.output_progress.less_than(write_ts) {
    return Some((Antichain::from_elem(write_ts.clone()), Vec::new()));
}
What's the reason for doing this instead of appending the data for the `write_ts` immediately?
Good callout. Answered in the comment
That reasoning makes sense, though I'm still wondering why this approach wouldn't work:

if self.to_append_progress.less_equal(write_ts) {
    // Don't have all the necessary data yet.
    if self.output_progress.less_than(write_ts) {
        // We can advance the output upper up to the write_ts.
        // For self-referential CTs this might be necessary to ensure dataflow progress.
        return Some((Antichain::from_elem(write_ts.clone()), Vec::new()));
    }
    return None;
}
// Time to write some data!
[...]

I.e., write the data immediately if we can and only do the output frontier bumping if we can't yet. Probably doesn't matter too much for performance, but I also find this logic more intuitive.
Oh, yeah, that's also fine. I double-checked with the unit tests and sqllogictests. I find both versions equally readable, so switched to yours.
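For reference, a simplified, hypothetical model of the decision both versions encode, with plain u64s standing in for `Antichain<Timestamp>` frontiers (this is not the real sink code, just the shape of the logic):

```rust
// Hypothetical, simplified model of the per-write_ts decision.
#[derive(Debug)]
enum Action {
    // Bump the output upper to write_ts without writing data yet (needed for
    // self-referential CTs to keep making progress).
    AdvanceTo(u64),
    // All input up to write_ts is in hand; append the batch.
    Append(u64),
    // Nothing to do right now.
    Wait,
}

fn decide(to_append_progress: u64, output_progress: u64, write_ts: u64) -> Action {
    if to_append_progress <= write_ts {
        // Don't have all the necessary data yet.
        if output_progress < write_ts {
            return Action::AdvanceTo(write_ts);
        }
        return Action::Wait;
    }
    // Time to write some data!
    Action::Append(write_ts)
}

fn main() {
    assert!(matches!(decide(5, 3, 5), Action::AdvanceTo(5)));
    assert!(matches!(decide(5, 5, 5), Action::Wait));
    assert!(matches!(decide(6, 5, 5), Action::Append(5)));
}
```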
}
// TODO(ct): Metrics for vec len and cap.
consolidate_updates(&mut self.to_append);
// WIP resize the vec down if cap >> len?
Might be possible to use `ConsolidatingVec` to that end. That would also solve another issue I think we have now, namely that `to_append` is never consolidated unless new updates arrive in the input.
Might be possible to use ConsolidatingVec to that end.

Looks like no 😞. ConsolidatingVec has already erased the timestamps, but this needs to keep them around so it can do the filter vs `write_ts` below.

Oh, `Correction` might be almost exactly what we need though!

That would also solve another issue I think we have now, namely that to_append is never consolidated unless new updates arrive in the input.

Yeah, the ideal behavior here is to consolidate back down to empty at every `write_ts+1`, probably keeping some minimum cap in the alloc to prevent thrashing.
ConsolidatingVec has already erased the timestamps, but this needs to keep them around so it can do the filter vs write_ts below.

Oh, I was thinking you could use a `ConsolidatingVec<(Row, Timestamp)>`. Would that not work?

Oh, Correction might be almost exactly what we need though!

Also good, though note that the MV persist sink will hopefully replace that with an implementation that can spill to disk. I'm not sure if that would also be suitable for the CT sink, it might be unnecessarily complex. But we can of course always copy the current implementation.
Oh clever, `ConsolidatingVec<(Row, Timestamp)>` should work AFAICT. I tried quickly typing it up and it passes the unit tests and also the sqllogictests, so I think we have our answer. I shared your concerns about `Correction`.

Hooking this up requires exposing bits of `ConsolidatingVec` in a way that I'm not comfortable sneaking into a PR that's already stamped, so I'll leave the switch for a followup
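A simplified, hypothetical model of what consolidating updates keyed by `(Row, Timestamp)` buys here; mz's real `ConsolidatingVec` differs in the details, this only shows that keeping the timestamp in the key still lets entries be filtered against a `write_ts` later:

```rust
// Hypothetical model: consolidate (row, time) updates while keeping the
// timestamps around. Not the real ConsolidatingVec.
fn consolidate(updates: &mut Vec<((String, u64), i64)>) {
    // Sort by (row, time) so equal updates become adjacent, then merge diffs.
    updates.sort_by(|(a, _), (b, _)| a.cmp(b));
    let mut merged: Vec<((String, u64), i64)> = Vec::with_capacity(updates.len());
    for (key, diff) in updates.drain(..) {
        if let Some((last_key, last_diff)) = merged.last_mut() {
            if *last_key == key {
                *last_diff += diff;
                continue;
            }
        }
        merged.push((key, diff));
    }
    // Drop fully cancelled updates.
    merged.retain(|(_, diff)| *diff != 0);
    *updates = merged;
}

fn main() {
    let mut updates = vec![
        (("a".to_string(), 3), 1),
        (("a".to_string(), 3), -1), // cancels the insert above
        (("b".to_string(), 4), 1),
    ];
    consolidate(&mut updates);
    assert_eq!(updates, vec![(("b".to_string(), 4), 1)]);
}
```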
Chatted with @danhhz and we have a path forward on all of the TODO(ct)s in the Adapter code, so I'm happy with merging as-is and iterating from there!
No concerns from me about merging this behind a feature flag, provided the Nightlies agree!
I found a way to make the current implementation panic!
Good find, added a TODO(ct) for it in ct_errors.slt. Plenty of ways to make this panic right now. As I mentioned earlier today, one of the benefits of our rollout plan here is that for the first few milestones, we decide entirely what is typed and we can type only things that we know to work :). Another way (until this latest revision) is/was
CREATE TABLE foo (key INT, val INT);
CREATE CONTINUAL TASK bar (key STRING, val STRING) ON INPUT foo AS (
INSERT INTO bar SELECT * FROM foo;
);
And another that I discovered quite by accident is something like this (the source would normally be monotonic but it's not when used as a CT input)
CREATE SOURCE append_only FROM LOAD GENERATOR KEY VALUE (
KEYS 10,
VALUE SIZE 10,
BATCH SIZE 1,
PARTITIONS 10,
TICK INTERVAL '1s',
SNAPSHOT ROUNDS 1,
SEED 0
) INCLUDE OFFSET;
CREATE CONTINUAL TASK upsert (key UINT8, val UINT8) ON INPUT append_only AS (
DELETE FROM upsert WHERE key IN (SELECT partition FROM append_only);
INSERT INTO upsert SELECT partition, max(a.offset) FROM append_only a GROUP BY partition;
)
I think I've removed all WIPs, so gonna pull this out of draft and kick off a nightlies. Pretty sure I missed at least one or two of the review threads, so will circle back on those
Okay, I think I've addressed all the review comment threads. Lemme know if I've missed anything!
I'm not 100% clear on what the intended semantics are meant to be for this. Should I expect …? I've not read through every discussion yet, so apologies if this is already answered.
Probably best to start with https://www.notion.so/materialize/Continual-Tasks-via-Diffs-a9c6890799014f67b3cd73e858c98900#813713b29d8d435398303d17de45e26b and then I'm happy to hash out any questions you have that aren't answered there (I think this specific one is)
Nightlies look happy. Moritz mentioned in zoom that he'd looked over the basic structure and was okay with his full review being addressed post-merge. Since he's out today and on a plane on Friday, I think I'm going to take him up on that offer and get this merged to establish a foothold for CTs. TFTRs all! \o/
I guess before answering this, did you add me as a reviewer intentionally or did I get added automatically and Parker's review is sufficient?
Oh, you got added automatically :). Parker already agreed to be the Adapter reviewer for CT work |
Strawman because:

- I personally find it much easier to start with a crappy thing and incrementally improve it than to iterate on a huge branch forever.
- Allows for more easily collaborating on the remaining work.
- Also to build excitement internally!

A continual task presents as something like a `BEFORE TRIGGER`: it watches some _input_ and whenever it changes at time `T`, executes a SQL txn, writing to some _output_ at the same time `T`. It can also read anything in materialize as a _reference_, most notably including the output.

Only reacting to new inputs (and not the full history) makes a CT's rehydration time independent of the size of the inputs (NB this is not true for references), enabling things like writing UPSERT on top of an append-only shard in SQL (ignore the obvious bug with my upsert impl):

```sql
CREATE CONTINUAL TASK upsert (key INT, val INT) ON INPUT append_only AS (
    DELETE FROM upsert WHERE key IN (SELECT key FROM append_only);
    INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key;
)
```

Unlike a materialized view, the continual task does not update outputs if references later change. This enables things like auditing:

```sql
CREATE CONTINUAL TASK audit_log (count INT8) ON INPUT anomalies AS (
    INSERT INTO audit_log SELECT * FROM anomalies;
)
```

As mentioned above, this is in no way the final form of CTs. There's lots of big open questions left on what the feature should look like as presented to users. However, we'll start shipping it by exposing incrementally less limited (and more powerful) surface areas publicly: e.g. perhaps a RETENTION WINDOW on sources.
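To make the described semantics concrete, here is a toy model (not how Materialize executes CTs) of the upsert example: at each input time it reacts only to the rows that just appeared and emits output diffs at that same time. It mirrors the example as written, including the bug mentioned above (a later, smaller value still replaces an earlier max).

```rust
use std::collections::BTreeMap;

// Toy model of the described semantics, not the real implementation.
fn upsert_step(
    upsert: &mut BTreeMap<i64, i64>, // current contents of the `upsert` output
    new_input: &[(i64, i64)],        // (key, val) rows appearing at this time
) -> Vec<((i64, i64), i64)> {
    let mut diffs = Vec::new();
    // DELETE FROM upsert WHERE key IN (SELECT key FROM append_only);
    for (key, _) in new_input {
        if let Some(old) = upsert.remove(key) {
            diffs.push(((*key, old), -1));
        }
    }
    // INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key;
    let mut maxes: BTreeMap<i64, i64> = BTreeMap::new();
    for (key, val) in new_input {
        maxes
            .entry(*key)
            .and_modify(|m| *m = (*m).max(*val))
            .or_insert(*val);
    }
    for (key, val) in maxes {
        upsert.insert(key, val);
        diffs.push(((key, val), 1));
    }
    diffs
}

fn main() {
    let mut upsert = BTreeMap::new();
    // Time 1: keys 1 and 2 appear in append_only.
    assert_eq!(
        upsert_step(&mut upsert, &[(1, 10), (2, 20), (1, 15)]),
        vec![((1, 15), 1), ((2, 20), 1)]
    );
    // Time 2: key 1 appears again; the old value is retracted at this time.
    assert_eq!(
        upsert_step(&mut upsert, &[(1, 7)]),
        vec![((1, 15), -1), ((1, 7), 1)]
    );
}
```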
Motivation
Touches MaterializeInc/database-issues#8427
Tips for reviewer
The goal for this PR is to get something reasonable merged with minimal added complexity to prod codepaths.
SQL Council and Team Testing, probably too early for y'all to do much here, but I promise I'll loop each of you in well before this gets anywhere remotely near prod.
My general convention is that anything marked `TODO` is something intended for later and `WIP` is something intended to be addressed before merging. In general, I'll probably lean toward leaving some of the feedback as TODOs. However, as this is likely the only time that the plumbing bits will get a close read, please point out whatever you see. I've also recently adopted a convention of using `TODO(project)` instead of a bare `TODO` for things that I think we'll want to fix before some milestone X. It's then an easy git grep during each milestone to triage which ones need to be fixed now vs punted, and this system worked quite well for txn-wal.

The first commit has just the parser stuff, since it was nicely separable. Then the second commit has only the boilerplate-y plumbing. Most of the real meat of the PR is in the third commit. I initially tried to break this last one up into a few different commits, but it ended up not really worth it.
Moritz, Jan, Parker, in addition to your normal code review feedback, here's a list of the things I'd love opinions from you on before merging this:
- How `ct_input` gets plumbed down to the sink. Threading it through `SinkRender` feels bad.
- Fine to merge with `CatalogItem{,Type}::MaterializedView` or make `::ContinualTask` from the start? I believe this one is a large amount of boilerplate.
- `impl Staged`? I think this has something to do with blocking the coord loop and is probably fine to punt to a TODO(ct)
- `ComputeSinkConnection` equivalent to `SourceInstanceDesc`?
- I don't love `plan_ct_query`
Checklist
- If this PR evolves an existing `$T ⇔ Proto$T` mapping (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.