Skip to content

Commit

Permalink
[restatectl] Introduces replicated-loglet top-level command
Browse files Browse the repository at this point in the history
Currently this only implements a very basic `info`. Note that the command is built merely as an example, it's UI/UX and behaviour will change by follow-ups.

Pass by stringified log_id (`<log_id>_<segment>`)
```
cargo run -q --bin restatectl -- replicated-loglet info 1_0                                                                              ✘ 1
Log Configuration (v1)
Loglet Referenced in:
    - LogId=1, Segment=0

Loglet Parameters:
    Loglet Id: 1_0 (4294967296)
    Sequencer: N1:1
    Replication: {node: 2}
    Node Set: [N3, N2, N1]
```

Or raw loglet id (`<loglet_id>`)
```
cargo run -q --bin restatectl -- replicated-loglet info 1_0                                                                              ✘ 1
Log Configuration (v1)
Loglet Referenced in:
    - LogId=1, Segment=0

Loglet Parameters:
    Loglet Id: 1_0 (4294967296)
    Sequencer: N1:1
    Replication: {node: 2}
    Node Set: [N3, N2, N1]

```
  • Loading branch information
AhmedSoliman committed Nov 7, 2024
1 parent 58afe5c commit f063825
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 0 deletions.
20 changes: 20 additions & 0 deletions crates/types/src/replicated_loglet/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::str::FromStr;

use super::ReplicationProperty;
use crate::logs::metadata::SegmentIndex;
Expand Down Expand Up @@ -109,6 +110,25 @@ impl Display for ReplicatedLogletId {
}
}

impl FromStr for ReplicatedLogletId {
type Err = <u64 as FromStr>::Err;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.contains('_') {
let parts: Vec<&str> = s.split('_').collect();
let log_id: u32 = parts[0].parse()?;
let segment_index: u32 = parts[1].parse()?;
Ok(ReplicatedLogletId::new(
LogId::from(log_id),
SegmentIndex::from(segment_index),
))
} else {
// treat the string as raw replicated log-id
let id: u64 = s.parse()?;
Ok(ReplicatedLogletId(id))
}
}
}

#[serde_with::serde_as]
#[derive(
serde::Serialize,
Expand Down
4 changes: 4 additions & 0 deletions tools/restatectl/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::commands::log::Logs;
use crate::commands::metadata::Metadata;
use crate::commands::node::Nodes;
use crate::commands::partition::Partitions;
use crate::commands::replicated_loglet::ReplicatedLoglet;
use crate::commands::snapshot::Snapshot;

#[derive(Run, Parser, Clone)]
Expand Down Expand Up @@ -67,6 +68,9 @@ pub enum Command {
/// Partition processor snapshots
#[clap(subcommand)]
Snapshots(Snapshot),
/// Commands that operate on replicated loglets
#[clap(subcommand)]
ReplicatedLoglet(ReplicatedLoglet),
}

fn init(common_opts: &CommonOpts) {
Expand Down
1 change: 1 addition & 0 deletions tools/restatectl/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ pub mod log;
pub mod metadata;
pub mod node;
pub mod partition;
pub mod replicated_loglet;
pub mod snapshot;
73 changes: 73 additions & 0 deletions tools/restatectl/src/commands/replicated_loglet/info.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use anyhow::Context;
use cling::prelude::*;

use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient;
use restate_admin::cluster_controller::protobuf::ListLogsRequest;
use restate_cli_util::{c_indentln, c_println};
use restate_types::logs::metadata::Logs;
use restate_types::replicated_loglet::ReplicatedLogletId;
use restate_types::storage::StorageCodec;
use restate_types::Versioned;
use tonic::codec::CompressionEncoding;

use crate::app::ConnectionInfo;
use crate::util::grpc_connect;

#[derive(Run, Parser, Collect, Clone, Debug)]
#[cling(run = "get_info")]
pub struct InfoOpts {
/// The replicated loglet id
loglet_id: ReplicatedLogletId,
}

async fn get_info(connection: &ConnectionInfo, opts: &InfoOpts) -> anyhow::Result<()> {
let channel = grpc_connect(connection.cluster_controller.clone())
.await
.with_context(|| {
format!(
"cannot connect to cluster controller at {}",
connection.cluster_controller
)
})?;
let mut client =
ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);

let req = ListLogsRequest::default();
let response = client.list_logs(req).await?.into_inner();

let mut buf = response.logs;
let logs = StorageCodec::decode::<Logs, _>(&mut buf)?;
c_println!("Log Configuration ({})", logs.version());

let Some(loglet_ref) = logs.get_replicated_loglet(&opts.loglet_id) else {
return Err(anyhow::anyhow!("loglet {} not found", opts.loglet_id));
};

c_println!("Loglet Referenced in: ");
for (log_id, segment) in &loglet_ref.references {
c_indentln!(2, "- LogId={}, Segment={}", log_id, segment);
}
c_println!();
c_println!("Loglet Parameters:");
c_indentln!(
2,
"Loglet Id: {} ({})",
loglet_ref.params.loglet_id,
*loglet_ref.params.loglet_id
);
c_indentln!(2, "Sequencer: {}", loglet_ref.params.sequencer);
c_indentln!(2, "Replication: {}", loglet_ref.params.replication);
c_indentln!(2, "Node Set: {}", loglet_ref.params.nodeset);

Ok(())
}
19 changes: 19 additions & 0 deletions tools/restatectl/src/commands/replicated_loglet/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod info;

use cling::prelude::*;

#[derive(Run, Subcommand, Clone)]
pub enum ReplicatedLoglet {
/// View loglet info
Info(info::InfoOpts),
}

0 comments on commit f063825

Please sign in to comment.