diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 77a251b1363b5..8d6c1e33ab764 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -942,126 +942,67 @@ KTable toTable(final Named named, final Materialized> materialized); /** - * Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper} - * and default serializers and deserializers. + * Group the records by their current key into a {@link KGroupedStream} while preserving the original values. * {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}. - * Grouping a stream on the record key is required before an aggregation operator can be applied to the data - * (cf. {@link KGroupedStream}). - * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the - * original values. - * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream} - *

- * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a - * later operator depends on the newly selected key. - * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * (Co-)Grouping a stream on the record key is required before a windowing or aggregation operator can be applied + * to the data (cf. {@link KGroupedStream}). + * By default, the current key is used as grouping key, but a new grouping key can be set via + * {@link #groupBy(KeyValueMapper)}. + * In either case, if the grouping key is {@code null}, the record will be dropped. + * + *

If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, + * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or + * {@link #process(ProcessorSupplier, String...)}) Kafka Streams will automatically repartition the data, i.e., + * it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic such that + * the resulting {@link KGroupedStream} is correctly partitioned by the grouping key, before the downstream + * windowing/aggregation will be applied. + * + *

This internal repartition topic will be named "${applicationId}-<name>-repartition", + * where "applicationId" is user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

- * All data of this stream will be redistributed through the repartitioning topic by writing all records to it, - * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key. - *

- * This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}. - * If the key type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Grouped)} instead. + * The number of partitions for the repartition topic is determined based on the upstream topics partition numbers. + * Furthermore, the topic will be created with infinite retention time and data will be automatically purged + * by Kafka Streams. + * + *

You can retrieve all generated internal topic names via {@link Topology#describe()}. + * To explicitly set key/value serdes or to customize the name of the repartition topic, use {@link #groupByKey(Grouped)}. + * For more control over the repartitioning, use {@link #repartition(Repartitioned)} before {@code groupByKey()}. * - * @param keySelector a {@link KeyValueMapper} that computes a new key for grouping - * @param the key type of the result {@link KGroupedStream} - * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream} + * @return A {@link KGroupedStream} that contains the grouped records of the original {@code KStream}. */ - KGroupedStream groupBy(final KeyValueMapper keySelector); + KGroupedStream groupByKey(); /** - * Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper} - * and {@link Serde}s as specified by {@link Grouped}. - * {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}. - * Grouping a stream on the record key is required before an aggregation operator can be applied to the data - * (cf. {@link KGroupedStream}). - * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the - * original values. - * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}. - *

- * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later - * operator depends on the newly selected key. - * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, - * "<name>" is either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an - * internally generated name. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

- * All data of this stream will be redistributed through the repartitioning topic by writing all records to it, - * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key. - *

- * This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}. + * See {@link #groupByKey()}. * - * @param keySelector a {@link KeyValueMapper} that computes a new key for grouping - * @param grouped the {@link Grouped} instance used to specify {@link org.apache.kafka.common.serialization.Serdes} - * and part of the name for a repartition topic if repartitioning is required. - * @param the key type of the result {@link KGroupedStream} - * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream} + *

Takes an additional {@link Grouped} parameter, that allows to explicitly set key/value serdes or to customize + * the name of the potentially created internal repartition topic. */ - KGroupedStream groupBy(final KeyValueMapper keySelector, - final Grouped grouped); + KGroupedStream groupByKey(final Grouped grouped); /** - * Group the records by their current key into a {@link KGroupedStream} while preserving the original values - * and default serializers and deserializers. - * {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}. - * Grouping a stream on the record key is required before an aggregation operator can be applied to the data - * (cf. {@link KGroupedStream}). - * If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}. - *

- * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, - * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or - * {@link #process(ProcessorSupplier, String...)}) an internal repartitioning topic may need to be created in - * Kafka if a later operator depends on the newly selected key. - * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, - * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

- * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned - * correctly on its key. - * If the last key changing operator changed the key type, it is recommended to use - * {@link #groupByKey(org.apache.kafka.streams.kstream.Grouped)} instead. + * Group the records of this {@code KStream} on a new key (in contrast to {@link #groupByKey()}). + * This operation is semantically equivalent to {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}. * - * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream} - * @see #groupBy(KeyValueMapper) + *

Because a new key is selected, an internal repartitioning topic will be created in Kafka. + * See {@link #groupByKey()} for more details about auto-repartitioning. + * + * @param keySelector + * a {@link KeyValueMapper} that computes a new key for grouping + * + * @param the new key type of the result {@link KGroupedStream} */ - KGroupedStream groupByKey(); + KGroupedStream groupBy(final KeyValueMapper keySelector); /** - * Group the records by their current key into a {@link KGroupedStream} while preserving the original values - * and using the serializers as defined by {@link Grouped}. - * {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}. - * Grouping a stream on the record key is required before an aggregation operator can be applied to the data - * (cf. {@link KGroupedStream}). - * If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}. - *

- * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, - * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or - * {@link #process(ProcessorSupplier, String...)}) an internal repartitioning topic may need to be created in - * Kafka if a later operator depends on the newly selected key. - * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, - * <name> is either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally - * generated name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

- * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned - * correctly on its key. + * See {@link #groupBy(KeyValueMapper)}. * - * @param grouped the {@link Grouped} instance used to specify {@link Serdes} - * and part of the name for a repartition topic if repartitioning is required. - * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream} - * @see #groupBy(KeyValueMapper) + *

Takes an additional {@link Grouped} parameter, that allows to explicitly set key/value serdes or to customize + * the name of the created internal repartition topic. */ - KGroupedStream groupByKey(final Grouped grouped); + KGroupedStream groupBy(final KeyValueMapper keySelector, + final Grouped grouped); /** * Join records of this stream with another {@code KStream}'s records using windowed inner equi join with default @@ -2283,7 +2224,7 @@ KStream join(final KTable table, * {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record. * The key of the result record is the same as for both joining input records. * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. - * + * * If an {@code KStream} input record key or value is {@code null} the record will not be included in the join * operation and thus no output record will be added to the resulting {@code KStream}. *

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 931c94c1a4313..35ef36bb81613 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -682,48 +682,48 @@ public KTable toTable(final Named named, } @Override - public KGroupedStream groupBy(final KeyValueMapper keySelector) { - return groupBy(keySelector, Grouped.with(null, valueSerde)); + public KGroupedStream groupByKey() { + return groupByKey(Grouped.with(keySerde, valueSerde)); } @Override - public KGroupedStream groupBy(final KeyValueMapper keySelector, - final Grouped grouped) { - Objects.requireNonNull(keySelector, "keySelector can't be null"); + public KGroupedStream groupByKey(final Grouped grouped) { Objects.requireNonNull(grouped, "grouped can't be null"); - final GroupedInternal groupedInternal = new GroupedInternal<>(grouped); - final ProcessorGraphNode selectKeyMapNode = internalSelectKey(keySelector, new NamedInternal(groupedInternal.name())); - selectKeyMapNode.keyChangingOperation(true); - - builder.addGraphNode(graphNode, selectKeyMapNode); + final GroupedInternal groupedInternal = new GroupedInternal<>(grouped); return new KGroupedStreamImpl<>( - selectKeyMapNode.nodeName(), + name, subTopologySourceNodes, groupedInternal, - true, - selectKeyMapNode, + repartitionRequired, + graphNode, builder); } @Override - public KGroupedStream groupByKey() { - return groupByKey(Grouped.with(keySerde, valueSerde)); + public KGroupedStream groupBy(final KeyValueMapper keySelector) { + return groupBy(keySelector, Grouped.with(null, valueSerde)); } @Override - public KGroupedStream groupByKey(final Grouped grouped) { + public KGroupedStream groupBy(final KeyValueMapper keySelector, + final Grouped grouped) { + Objects.requireNonNull(keySelector, "keySelector can't be null"); Objects.requireNonNull(grouped, "grouped can't be null"); - final GroupedInternal groupedInternal = new GroupedInternal<>(grouped); + final GroupedInternal groupedInternal = new GroupedInternal<>(grouped); + final ProcessorGraphNode selectKeyMapNode = internalSelectKey(keySelector, new NamedInternal(groupedInternal.name())); + selectKeyMapNode.keyChangingOperation(true); + + builder.addGraphNode(graphNode, selectKeyMapNode); return new KGroupedStreamImpl<>( - name, + selectKeyMapNode.nodeName(), subTopologySourceNodes, groupedInternal, - repartitionRequired, - graphNode, + true, + selectKeyMapNode, builder); }