Skip to content

Commit

Permalink
feat: add deduplicate opt for create topic cli
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Feb 11, 2025
1 parent d376949 commit ab6efe0
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions crates/fluvio-cli/src/client/topic/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,18 @@ use fluvio_sc_schema::shared::validate_resource_name;
use fluvio_sc_schema::mirror::MirrorSpec;
use fluvio_sc_schema::topic::HomeMirrorConfig;
use fluvio_sc_schema::topic::MirrorConfig;
use fluvio_sc_schema::topic::Bounds;
use fluvio_sc_schema::topic::Deduplication;
use fluvio_sc_schema::topic::Filter;
use fluvio_sc_schema::topic::Transform;

use fluvio::Fluvio;
use fluvio::FluvioAdmin;
use fluvio::metadata::topic::TopicSpec;
use crate::CliError;

const DEFAULT_DEDUP_FILTER: &str = "fluvio/[email protected]";

#[derive(Debug, Parser)]
pub struct CreateTopicOpt {
/// The name of the Topic to create
Expand Down Expand Up @@ -229,6 +235,12 @@ impl CreateTopicOpt {
topic_spec.set_compression_type(compression_type);
}

if self.setting.dedup {
let deduplication =
create_deduplication(self.setting.dedup_count, Some(self.setting.dedup_age));
topic_spec.set_deduplication(Some(deduplication));
}

topic_spec.set_system(self.setting.system);

if self.setting.segment_size.is_some() || self.setting.max_partition_size.is_some() {
Expand Down Expand Up @@ -258,6 +270,21 @@ fn validate(name: &str, _spec: &TopicSpec) -> Result<()> {
Ok(())
}

fn create_deduplication(dedup_count: u64, dedup_age: Option<Duration>) -> Deduplication {
Deduplication {
bounds: Bounds {
count: dedup_count,
age: dedup_age,
},
filter: Filter {
transform: Transform {
uses: DEFAULT_DEDUP_FILTER.to_string(),
with: Default::default(),
},
},
}
}

#[derive(Debug, Parser)]
#[group(id = "config-arg")]
pub struct TopicConfigOpt {
Expand All @@ -280,6 +307,24 @@ pub struct TopicConfigOpt {
#[arg(long, value_name = "bytes")]
max_partition_size: Option<bytesize::ByteSize>,

/// Deduplicate records in the topic
#[arg(long)]
dedup: bool,

/// Number of records to keep in deduplication filter
#[arg(
long,
value_name = "integer",
requires = "deduplicate",
default_value = "5"
)]
dedup_count: u64,

/// Age of records to keep in deduplication filter
/// Ex: '1h', '2d 10s', '7 days' (default)
#[arg(long, value_name = "time", value_parser=parse_duration, requires = "deduplicate", default_value = "5s")]
dedup_age: Duration,

/// Flag to create a system topic
/// System topics are for internal operations
#[arg(long, short = 's', hide = true)]
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-controlplane-metadata/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "fluvio-controlplane-metadata"
edition = "2021"
version = "0.30.1"
version = "0.30.2"
authors = ["Fluvio Contributors <[email protected]>"]
description = "Metadata definition for Fluvio control plane"
repository = "https://github.com/infinyon/fluvio"
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-controlplane-metadata/src/topic/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ deduplication:
age: 1m
filter:
transform:
uses: infinyon/[email protected]
uses: fluvio/dedup-bloom[email protected]
"#;

//when
Expand Down Expand Up @@ -373,7 +373,7 @@ compression:
},
filter: Filter {
transform: Transform {
uses: "infinyon/[email protected]".to_string(),
uses: "fluvio/dedup-bloom[email protected]".to_string(),
with: Default::default(),
},
},
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-sc/src/services/public_api/topic/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ async fn validate_topic_request<C: MetadataItem>(
sm_name.to_string(),
ErrorCode::DeduplicationSmartModuleNotLoaded,
Some(format!(
"{}\nHint: try `fluvio hub download {sm_name}` and repeat this operation",
"{}\nHint: try `fluvio hub sm download {sm_name}` and repeat this operation",
ErrorCode::DeduplicationSmartModuleNotLoaded
)),
);
Expand Down
2 changes: 1 addition & 1 deletion tests/cli/fluvio_smoke_tests/topic-basic.bats
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ setup_file() {
TOPIC_NAME_SYSTEM=$(random_string)
export TOPIC_NAME_SYSTEM

DEDUP_FILTER_NAME="dedup-filter"
DEDUP_FILTER_NAME="dedup-bloom-filter"
export DEDUP_FILTER_NAME

cat <<EOF >$TOPIC_CONFIG_PATH
Expand Down

0 comments on commit ab6efe0

Please sign in to comment.