Create snapshots on leading Partition Processors #2253
base: main
Conversation
@@ -255,6 +256,7 @@ pub struct PartitionProcessor<Codec, InvokerSender> {
    max_command_batch_size: usize,
    partition_store: PartitionStore,
    inflight_create_snapshot_task: Option<TaskHandle<anyhow::Result<()>>>,
    last_snapshot_lsn_watch: (watch::Sender<Option<Lsn>>, watch::Receiver<Option<Lsn>>),
Is there a better way to do this? I want to update the LSN from a background task spawned to manage the snapshot creation (and soon, upload to blob store).
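For reference, a minimal, self-contained sketch of the watch-channel pattern the new field enables: a background task publishes the archived LSN and the owner keeps the receiver for status reporting. This is an illustration only, with a simplified stand-in for the Lsn type, not this PR's code.

```rust
use tokio::sync::watch;

// Stand-in for restate's Lsn type, purely for illustration.
type Lsn = u64;

#[tokio::main]
async fn main() {
    // The sender goes to the background snapshot task; the receiver stays with
    // the owner (e.g. for status reporting).
    let (tx, rx) = watch::channel::<Option<Lsn>>(None);

    let snapshot_task = tokio::spawn(async move {
        // ... create the snapshot, upload it to the blob store ...
        let snapshot_lsn: Lsn = 42;
        // send() only errors if every receiver has been dropped; safe to ignore here.
        let _ = tx.send(Some(snapshot_lsn));
    });
    snapshot_task.await.expect("snapshot task panicked");

    // Readers can observe the latest published LSN at any time without blocking.
    assert_eq!(*rx.borrow(), Some(42));
}
```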
It feels to me that the entire snapshotting mechanism should be external to the partition processor. The only two things the PP needs to know are:

1. On start, which archived LSN (if known) it was started from. The caller (PPM) would decide on this value based on whether it has downloaded a snapshot or not (albeit, the PP will need to update its FSM in this case, and it's subject to signal loss if the node crashes before the RocksDB flush).
2. After a snapshot is uploaded, the PPM would inform the PP that the archived LSN has been updated to a newer value.
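To make the shape of that proposal concrete, here is a hypothetical sketch of the narrow snapshot-related surface the PP would be left with. The names and types are illustrative only and are not part of this PR.

```rust
// Stand-in for restate's Lsn type, purely for illustration.
type Lsn = u64;

/// Hypothetical messages from the PPM to a partition processor; the PP would
/// only record these values (e.g. in its FSM table) and otherwise stay out of
/// the snapshotting business.
enum SnapshotNotice {
    /// On start: the archived LSN of the snapshot (if any) the partition store
    /// was bootstrapped from.
    StartedFromArchivedLsn(Option<Lsn>),
    /// After an upload completes: the archived LSN has advanced.
    ArchivedLsnUpdated(Lsn),
}

fn main() {
    for notice in [
        SnapshotNotice::StartedFromArchivedLsn(Some(1_000)),
        SnapshotNotice::ArchivedLsnUpdated(2_000),
    ] {
        match notice {
            SnapshotNotice::StartedFromArchivedLsn(lsn) => {
                println!("started from archived LSN: {lsn:?}")
            }
            SnapshotNotice::ArchivedLsnUpdated(lsn) => println!("archived LSN now {lsn}"),
        }
    }
}
```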
crates/types/src/config/worker.rs
Outdated
/// When using the replicated loglet in a distributed cluster, snapshots provide a mechanism for
/// safely trimming the log and for bootstrapping new worker nodes.
It might be confusing to mention the replicated loglet in this context. This feature is independent of the replicated loglet in principle. We'd be able to trim the log if we can take snapshots in a cluster setup.
Great point, updated!
crates/types/src/config/worker.rs
Outdated
/// snapshots for a given partition.
///
/// Default: `None`
pub records_per_snapshot: Option<u64>,
This reads as if the snapshot "contains records". I would prefer to describe this in terms of a trigger for a new snapshot every N log records.
Gotcha, will come up with something better!
Updated to snapshot_interval_num_records.
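A rough sketch of how the renamed option might read, expressed as a trigger interval rather than snapshot contents. The struct name and doc wording here are assumptions; only the field name comes from the comment above.

```rust
/// Illustrative only; not the exact definition from the PR.
pub struct SnapshotOptions {
    /// Trigger a new snapshot once the partition has applied at least this many
    /// log records since the last snapshot was taken.
    ///
    /// Default: `None` (automatic snapshots are disabled)
    pub snapshot_interval_num_records: Option<u64>,
}

fn main() {
    let opts = SnapshotOptions {
        snapshot_interval_num_records: Some(10_000),
    };
    assert_eq!(opts.snapshot_interval_num_records, Some(10_000));
}
```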
Thanks for creating this PR @pcholakov. I am wondering whether we can move the snapshot creation logic completely to the PPM to free the PP from the burden of thinking about snapshots?
@@ -69,6 +71,16 @@ impl SnapshotProducer {
    "Partition snapshot written"
);

let previous_archived_snapshot_lsn = partition_store.get_archived_lsn().await?;
let mut tx = partition_store.transaction();
tx.put_archived_lsn(snapshot.min_applied_lsn).await;
I assume that you are relying on the PartitionProcessor to guarantee that there is no concurrency between SnapshotProducers. Otherwise there is the risk that an older snapshot overwrites a newer one here, because there can now be multiple writers.
Note that partition_store.transaction() only gives you transaction-like guarantees if there is a single writer. Concurrent writers can overwrite each other's changes without the system noticing.
Yes, indeed, that is the case. I'll have to think about this a bit more carefully when we move this responsibility up into the PartitionProcessorManager.
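Whichever component ends up owning the write, one way to make the archived-LSN update safe against stale writers is to only ever move it forward. A minimal stand-in sketch, not this PR's code, with plain integers in place of Lsn:

```rust
// Stand-in for restate's Lsn type, purely for illustration.
type Lsn = u64;

struct ArchivedLsn {
    current: Option<Lsn>,
}

impl ArchivedLsn {
    /// Advance the archived LSN, but never move it backwards. Returns whether
    /// the candidate was accepted.
    fn advance_to(&mut self, candidate: Lsn) -> bool {
        match self.current {
            Some(existing) if existing >= candidate => false,
            _ => {
                self.current = Some(candidate);
                true
            }
        }
    }
}

fn main() {
    let mut archived = ArchivedLsn { current: Some(100) };
    assert!(!archived.advance_to(90)); // a stale snapshot must not win
    assert!(archived.advance_to(150)); // a newer snapshot advances the LSN
    assert_eq!(archived.current, Some(150));
}
```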
// ignore errors and don't request further snapshots if internal queue is full; we will try again later
if self
    .tx
    .try_send(ProcessorsManagerCommand::CreateSnapshot(*partition_id, tx))
    .is_err()
{
    break;
}
Why are we going through the PartitionProcessor to create the snapshot? Could this be handled completely outside of the PartitionProcessor? That way, it happens completely asynchronously to the partition processor event loop.
No strong reason to do so immediately, though two weak intuitions lead me to think that the PartitionProcessor is the better owner of this responsibility than the PartitionProcessorManager:

- The PP is the logical owner of the partition's PartitionStore once running. Sure, we can clone the handle and pass references around, but I feel that I want to keep interactions with it mostly clustered in the PartitionProcessor. To my eyes this is different to restoring a snapshot on bootstrap, which happens before we start the PartitionProcessor (hence why that responsibility currently resides in PartitionStoreManager::restore_partition_store_snapshot).
- At some point we might want to create snapshots from a deterministic LSN by sending a CreateSnapshot command through the log, and the PartitionProcessor is the place to handle that.

To be sure, neither of these is a very strong argument - and I'm not attached to it remaining here! It's a pretty easy change to make given that the snapshot logic mostly lives in SnapshotProducer.
Regarding creating snapshots for deterministic LSNs, I think we need a few more things: (a) being able to create a snapshot for a given LSN, and (b) probably the globally totally-ordered log, to be able to give an LSN a global meaning. Given that it's not very clear when these things are going to happen, I wouldn't over-index on it too much right now. Moreover, it could also be solved differently by a combination of snapshots and a log segment that brings all nodes to a specific LSN.
After seeing how the snapshot restore/bootstrap process works, I'm now much more convinced that ownership of snapshotting overall resides with the PartitionProcessorManager. Let's keep the PP state machine as pure and focused as possible.
    .add(Lsn::from(records_per_snapshot))
{
    debug!(%partition_id, "Triggering snapshot on partition leader after records_per_snapshot exceeded (last snapshot: {}, current LSN: {})", status.last_archived_log_lsn.unwrap_or(Lsn::OLDEST), status.last_applied_log_lsn.unwrap_or(Lsn::INVALID));
    let (tx, _) = oneshot::channel();
I guess we are not interested in the result of the CreateSnapshot call here?
No - and I possibly should have left a comment explaining why :-) (which is essentially that we can't do much other than log it, and that already happens in the SnapshotProducer).
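For context, a rough reconstruction of the trigger check that the .add(Lsn::from(records_per_snapshot)) snippet above belongs to, with plain integers standing in for Lsn; the exact guard in the PR may differ.

```rust
// Stand-in for restate's Lsn type, purely for illustration.
type Lsn = u64;

/// Decide whether the leader should request a new snapshot, based on how far
/// the applied LSN has advanced past the last archived snapshot.
fn should_request_snapshot(
    last_archived_lsn: Option<Lsn>,
    last_applied_lsn: Option<Lsn>,
    records_per_snapshot: u64,
) -> bool {
    match last_applied_lsn {
        None => false, // nothing applied yet, nothing to snapshot
        Some(applied) => {
            // Lsn::OLDEST plays the role of this zero baseline in the real code.
            let baseline = last_archived_lsn.unwrap_or(0);
            applied >= baseline + records_per_snapshot
        }
    }
}

fn main() {
    assert!(should_request_snapshot(Some(500), Some(10_500), 10_000));
    assert!(!should_request_snapshot(Some(500), Some(9_000), 10_000));
    assert!(should_request_snapshot(None, Some(10_000), 10_000));
}
```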
_ = latest_snapshot_check_interval.tick() => {
    self.request_partition_snapshots();
}
nit: I assume we could also react to status updates to trigger the check, instead of a time-based policy?
That would be nice; I'll consider it when I move ownership of this into the PartitionProcessorManager.
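A minimal sketch of that alternative: wake up on processor status updates via a watch channel while keeping a periodic check as a fallback. The names here are illustrative, not from this PR.

```rust
use std::time::Duration;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // Stand-in for a stream of partition processor status updates.
    let (status_tx, mut status_rx) = watch::channel(0u64);
    let mut check_interval = tokio::time::interval(Duration::from_millis(50));

    // Simulate a processor reporting progress shortly after startup.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(10)).await;
        let _ = status_tx.send(1_000);
    });

    for _ in 0..2 {
        tokio::select! {
            // React as soon as a processor reports new progress...
            Ok(()) = status_rx.changed() => {
                println!("status changed, applied LSN now {}", *status_rx.borrow());
            }
            // ...while still falling back to a periodic snapshot check.
            _ = check_interval.tick() => {
                println!("periodic snapshot check");
            }
        }
    }
}
```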
Interesting, both you and @tillrohrmann picked up on this. My intuition was exactly the opposite: #2253 (comment). I'm happy to go with the majority view but LMK if you're swayed by those arguments :-)
I'll revisit this when I start work on bootstrap again; right now the FSM variable is implicitly loaded from the snapshot and left untouched, so we have exactly the behavior you describe. Explicit >> implicit though, so I'll make sure it's clear what's happening.
That makes sense if we move the snapshotting responsibility to the PPM. (Or more accurately, have the PPM own the …)

Thanks for the feedback, @AhmedSoliman!
Force-pushed from 3889982 to 1d97374
Apologies for the force-push, it was easier to rebase on main than merge.
I'm convinced, but let me do that as a separate follow-up PR to keep this focused on just the config change, please.
A new configurable snapshot interval is added, expressed as a minimum number of records after which a snapshot of the partition store will be automatically taken.
Force-pushed from 1d97374 to 17e437f
Introduce a new configurable number-of-records property after which a snapshot of the partition store will be automatically taken.
This change introduces the following new config section (default is none, sample value to illustrate):
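As a plausible illustration only: the section and key names below are assumptions based on the snapshot_interval_num_records option discussed above and may not match the PR exactly.

```toml
# Illustrative sample only; section and key names are assumptions.
[worker.snapshots]
# Request a new snapshot after this many newly applied log records (unset by default).
snapshot-interval-num-records = 10000
```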
Testing on a local test cluster with 4 partitions across 3 worker nodes:
Closes #2246