Skip to content

Commit

Permalink
MINOR: cleanup KStream JavaDocs (3/N) - groupBy[Key] (#18705)
Browse files Browse the repository at this point in the history
Reviewers: Alieh Saeedi <[email protected]>, Lucas Brutschy <[email protected]>
  • Loading branch information
mjsax authored Jan 31, 2025
1 parent 0d1e7e0 commit 281a3c6
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 125 deletions.
151 changes: 46 additions & 105 deletions streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -942,126 +942,67 @@ KTable<K, V> toTable(final Named named,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

/**
* Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
* and default serializers and deserializers.
* Group the records by their current key into a {@link KGroupedStream} while preserving the original values.
* {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
* The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
* original values.
* If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}
* <p>
* Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a
* later operator depends on the newly selected key.
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* (Co-)Grouping a stream on the record key is required before a windowing or aggregation operator can be applied
* to the data (cf. {@link KGroupedStream}).
* By default, the current key is used as grouping key, but a new grouping key can be set via
* {@link #groupBy(KeyValueMapper)}.
* In either case, if the grouping key is {@code null}, the record will be dropped.
*
* <p>If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
* {@link #process(ProcessorSupplier, String...)}) Kafka Streams will automatically repartition the data, i.e.,
* it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic such that
* the resulting {@link KGroupedStream} is correctly partitioned by the grouping key, before the downstream
* windowing/aggregation will be applied.
*
* <p>This internal repartition topic will be named "${applicationId}-&lt;name&gt;-repartition",
* where "applicationId" is user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
* and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
* <p>
* This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}.
* If the key type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Grouped)} instead.
* The number of partitions for the repartition topic is determined based on the upstream topics partition numbers.
* Furthermore, the topic will be created with infinite retention time and data will be automatically purged
* by Kafka Streams.
*
* <p>You can retrieve all generated internal topic names via {@link Topology#describe()}.
* To explicitly set key/value serdes or to customize the name of the repartition topic, use {@link #groupByKey(Grouped)}.
* For more control over the repartitioning, use {@link #repartition(Repartitioned)} before {@code groupByKey()}.
*
* @param keySelector a {@link KeyValueMapper} that computes a new key for grouping
* @param <KR> the key type of the result {@link KGroupedStream}
* @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
* @return A {@link KGroupedStream} that contains the grouped records of the original {@code KStream}.
*/
<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> keySelector);
KGroupedStream<K, V> groupByKey();

/**
* Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
* and {@link Serde}s as specified by {@link Grouped}.
* {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
* The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
* original values.
* If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
* <p>
* Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later
* operator depends on the newly selected key.
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "&lt;name&gt;" is either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an
* internally generated name.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
* and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
* <p>
* This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}.
* See {@link #groupByKey()}.
*
* @param keySelector a {@link KeyValueMapper} that computes a new key for grouping
* @param grouped the {@link Grouped} instance used to specify {@link org.apache.kafka.common.serialization.Serdes}
* and part of the name for a repartition topic if repartitioning is required.
* @param <KR> the key type of the result {@link KGroupedStream}
* @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
* <p>Takes an additional {@link Grouped} parameter, that allows to explicitly set key/value serdes or to customize
* the name of the potentially created internal repartition topic.
*/
<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> keySelector,
final Grouped<KR, V> grouped);
KGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped);

/**
* Group the records by their current key into a {@link KGroupedStream} while preserving the original values
* and default serializers and deserializers.
* {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
* If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
* <p>
* If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
* {@link #process(ProcessorSupplier, String...)}) an internal repartitioning topic may need to be created in
* Kafka if a later operator depends on the newly selected key.
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
* correctly on its key.
* If the last key changing operator changed the key type, it is recommended to use
* {@link #groupByKey(org.apache.kafka.streams.kstream.Grouped)} instead.
* Group the records of this {@code KStream} on a new key (in contrast to {@link #groupByKey()}).
* This operation is semantically equivalent to {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}.
*
* @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
* @see #groupBy(KeyValueMapper)
* <p>Because a new key is selected, an internal repartitioning topic will be created in Kafka.
* See {@link #groupByKey()} for more details about auto-repartitioning.
*
* @param keySelector
* a {@link KeyValueMapper} that computes a new key for grouping
*
* @param <KOut> the new key type of the result {@link KGroupedStream}
*/
KGroupedStream<K, V> groupByKey();
<KOut> KGroupedStream<KOut, V> groupBy(final KeyValueMapper<? super K, ? super V, KOut> keySelector);

/**
* Group the records by their current key into a {@link KGroupedStream} while preserving the original values
* and using the serializers as defined by {@link Grouped}.
* {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
* If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
* <p>
* If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
* {@link #process(ProcessorSupplier, String...)}) an internal repartitioning topic may need to be created in
* Kafka if a later operator depends on the newly selected key.
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* &lt;name&gt; is either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally
* generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
* correctly on its key.
* See {@link #groupBy(KeyValueMapper)}.
*
* @param grouped the {@link Grouped} instance used to specify {@link Serdes}
* and part of the name for a repartition topic if repartitioning is required.
* @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
* @see #groupBy(KeyValueMapper)
* <p>Takes an additional {@link Grouped} parameter, that allows to explicitly set key/value serdes or to customize
* the name of the created internal repartition topic.
*/
KGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped);
<KOut> KGroupedStream<KOut, V> groupBy(final KeyValueMapper<? super K, ? super V, KOut> keySelector,
final Grouped<KOut, V> grouped);

/**
* Join records of this stream with another {@code KStream}'s records using windowed inner equi join with default
Expand Down Expand Up @@ -2283,7 +2224,7 @@ <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
* {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
*
*
* If an {@code KStream} input record key or value is {@code null} the record will not be included in the join
* operation and thus no output record will be added to the resulting {@code KStream}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,48 +682,48 @@ public KTable<K, V> toTable(final Named named,
}

@Override
public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> keySelector) {
return groupBy(keySelector, Grouped.with(null, valueSerde));
public KGroupedStream<K, V> groupByKey() {
return groupByKey(Grouped.with(keySerde, valueSerde));
}

@Override
public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> keySelector,
final Grouped<KR, V> grouped) {
Objects.requireNonNull(keySelector, "keySelector can't be null");
public KGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped) {
Objects.requireNonNull(grouped, "grouped can't be null");

final GroupedInternal<KR, V> groupedInternal = new GroupedInternal<>(grouped);
final ProcessorGraphNode<K, V> selectKeyMapNode = internalSelectKey(keySelector, new NamedInternal(groupedInternal.name()));
selectKeyMapNode.keyChangingOperation(true);

builder.addGraphNode(graphNode, selectKeyMapNode);
final GroupedInternal<K, V> groupedInternal = new GroupedInternal<>(grouped);

return new KGroupedStreamImpl<>(
selectKeyMapNode.nodeName(),
name,
subTopologySourceNodes,
groupedInternal,
true,
selectKeyMapNode,
repartitionRequired,
graphNode,
builder);
}

@Override
public KGroupedStream<K, V> groupByKey() {
return groupByKey(Grouped.with(keySerde, valueSerde));
public <KOUT> KGroupedStream<KOUT, V> groupBy(final KeyValueMapper<? super K, ? super V, KOUT> keySelector) {
return groupBy(keySelector, Grouped.with(null, valueSerde));
}

@Override
public KGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped) {
public <KOut> KGroupedStream<KOut, V> groupBy(final KeyValueMapper<? super K, ? super V, KOut> keySelector,
final Grouped<KOut, V> grouped) {
Objects.requireNonNull(keySelector, "keySelector can't be null");
Objects.requireNonNull(grouped, "grouped can't be null");

final GroupedInternal<K, V> groupedInternal = new GroupedInternal<>(grouped);
final GroupedInternal<KOut, V> groupedInternal = new GroupedInternal<>(grouped);
final ProcessorGraphNode<K, V> selectKeyMapNode = internalSelectKey(keySelector, new NamedInternal(groupedInternal.name()));
selectKeyMapNode.keyChangingOperation(true);

builder.addGraphNode(graphNode, selectKeyMapNode);

return new KGroupedStreamImpl<>(
name,
selectKeyMapNode.nodeName(),
subTopologySourceNodes,
groupedInternal,
repartitionRequired,
graphNode,
true,
selectKeyMapNode,
builder);
}

Expand Down

0 comments on commit 281a3c6

Please sign in to comment.