From 974f8aba507a7e788a0dabaa8ea34d66302540ee Mon Sep 17 00:00:00 2001
From: "Matthias J. Sax"
Date: Fri, 24 Jan 2025 22:59:32 -0800
Subject: [PATCH] MINOR: cleanup KStream JavaDocs (10/N) - stream-table-left-join
---
 .../apache/kafka/streams/kstream/KStream.java | 331 +++---------------
 .../kstream/internals/AbstractStream.java     |   4 +-
 .../kstream/internals/KStreamImpl.java        |  42 +--
 .../kstream/internals/KStreamImplTest.java    |  58 +--
 .../kstream/internals/KTableImplTest.java     |  75 ++--
 5 files changed, 141 insertions(+), 369 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 304ce8bb709bf..3c90d85b5b762 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -2029,25 +2029,31 @@ KStream join(final KTable table,
                              final Joined joined);
 
     /**
-     * Join records of this stream with {@link KTable}'s records using non-windowed left equi join with default
-     * serializers and deserializers.
-     * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from this stream will produce an
-     * output record (cf. below).
-     * The join is a primary key table lookup join with join attribute {@code stream.key == table.key}.
+     * Join records of this stream with {@link KTable}'s records using non-windowed left equi-join.
+     * In contrast to an {@link #join(KTable, ValueJoiner) inner join}, all records from this stream will produce an
+     * output record (more details below).
+     * The join is a primary key table lookup join with join attribute {@code streamRecord.key == tableRecord.key}.
      * "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
-     * This is done by performing a lookup for matching records in the current (i.e., processing time) internal
-     * {@link KTable} state.
+     * This is done by performing a lookup for matching records in the internal {@link KTable} state.
     * In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and
     * will not produce any result records.
-     *

- * For each {@code KStream} record whether or not it finds a corresponding record in {@link KTable} the provided + * + *

For each {@code KStream} record, regardless of whether it finds a joining record in the {@link KTable}, the provided
     * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
-     * If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoiner}.
-     * The key of the result record is the same as for both joining input records.
-     * If an {@code KStream} input record value is {@code null} the record will not be included in the join
-     * operation and thus no output record will be added to the resulting {@code KStream}.
-     *

-     * Example:
+     * If no {@link KTable} record with matching key was found during the lookup, {@link ValueJoiner} will be called
+     * with a {@code null} value for the table record.
+     * The key of the result record is the same as for both joining input records,
+     * or the {@code KStream} input record's key for a left-join result.
+     * If you need read access to the join key, use {@link #leftJoin(KTable, ValueJoinerWithKey)}.
+     * If a {@code KStream} input record's value is {@code null}, the input record will be dropped, and no join
+     * computation is triggered.
+     * Note that {@code null} keys for {@code KStream} input records are supported (in contrast to an
+     * {@link #join(KTable, ValueJoiner) inner join}), resulting in a left-join result.
+     * If a {@link KTable} input record's key is {@code null}, the input record will be dropped, and the table state
+     * won't be updated.
+     * {@link KTable} input records with {@code null} values are considered deletes (so-called tombstones) for the table.
+     *
+     *

Example:
     * KStream | KTable | state  | result
     * <K1:A>  |        |        | <K1:ValueJoiner(A,null)>
     *         | <K1:b> | <K1:b> |
     * <K1:C>  |        | <K1:b> | <K1:ValueJoiner(C,b)>
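+     *
+     * For illustration, a minimal usage sketch (topic names, types, and serdes are illustrative assumptions, not
+     * part of this API):
+     * <pre>{@code
+     * StreamsBuilder builder = new StreamsBuilder();
+     *
+     * KStream<String, Long> clicks =
+     *     builder.stream("clicks-topic", Consumed.with(Serdes.String(), Serdes.Long()));
+     * KTable<String, String> profiles =
+     *     builder.table("profiles-topic", Consumed.with(Serdes.String(), Serdes.String()));
+     *
+     * // every click produces exactly one result, even if no profile exists yet (profile == null)
+     * KStream<String, String> enriched = clicks.leftJoin(
+     *     profiles,
+     *     (clickCount, profile) -> (profile == null ? "anonymous" : profile) + ":" + clickCount
+     * );
+     * }</pre>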
- * Both input streams (or to be more precise, their underlying source topics) need to have the same number of - * partitions. - * If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream} - * before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given - * {@link KTable}. - * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); - * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}. - * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an - * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. - * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated - * name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

-     * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
-     * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
-     * correctly on its key.
      *
-     * @param table  the {@link KTable} to be joined with this stream
-     * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
-     * @param  the value type of the table
-     * @param  the value type of the result stream
-     * @return a {@code KStream} that contains join-records for each key and values computed by the given
-     *         {@link ValueJoiner}, one output for each input {@code KStream} record
+     * By default, {@code KStream} records are processed by performing a lookup for matching records in the
+     * current (i.e., processing time) internal {@link KTable} state.
+     * This default implementation does not handle out-of-order records in either input of the join well.
+     * See {@link #leftJoin(KTable, ValueJoiner, Joined)} for how to configure a stream-table join to handle
+     * out-of-order data.
+     *
+     *
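+     * For example, out-of-order handling requires the table to be materialized with a versioned state store
+     * before it is used in the join; a minimal sketch (topic name, store name, and history retention below are
+     * illustrative assumptions):
+     * <pre>{@code
+     * KTable<String, String> table = builder.table(
+     *     "table-topic",
+     *     Materialized.as(Stores.persistentVersionedKeyValueStore("versioned-store", Duration.ofMinutes(5)))
+     * );
+     * }</pre>
+     *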

For more details about co-partitioning requirements, (auto-)repartitioning, and more, see
+     * {@link #join(KStream, ValueJoiner, JoinWindows)}.
+     *
+     * @return A {@code KStream} that contains join-records, one for each matched stream record plus one for each
+     *         non-matching stream record, with the corresponding key and a value computed by the given {@link ValueJoiner}.
+     *
+     * @see #join(KTable, ValueJoiner)
-     * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
      */
-    KStream leftJoin(final KTable table,
-                     final ValueJoiner joiner);
+    KStream leftJoin(final KTable table,
+                     final ValueJoiner joiner);
 
     /**
-     * Join records of this stream with {@link KTable}'s records using non-windowed left equi join with default
-     * serializers and deserializers.
-     * In contrast to {@link #join(KTable, ValueJoinerWithKey) inner-join}, all records from this stream will produce an
-     * output record (cf. below).
-     * The join is a primary key table lookup join with join attribute {@code stream.key == table.key}.
-     * "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
-     * This is done by performing a lookup for matching records in the current (i.e., processing time) internal
-     * {@link KTable} state.
-     * In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and
-     * will not produce any result records.
-     *

- * For each {@code KStream} record whether or not it finds a corresponding record in {@link KTable} the provided - * {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record. - * If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoinerWithKey}. - * The key of the result record is the same as for both joining input records. - * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. - * If an {@code KStream} input record value is {@code null} the record will not be included in the join - * operation and thus no output record will be added to the resulting {@code KStream}. - *

-     * Example:
-     * KStream | KTable | state  | result
-     * <K1:A>  |        |        | <K1:ValueJoinerWithKey(K1,A,null)>
-     *         | <K1:b> | <K1:b> |
-     * <K1:C>  |        | <K1:b> | <K1:ValueJoinerWithKey(K1,C,b)>
- * Both input streams (or to be more precise, their underlying source topics) need to have the same number of - * partitions. - * If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream} - * before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given - * {@link KTable}. - * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); - * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}. - * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an - * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. - * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated - * name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

- * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}. - * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned - * correctly on its key. + * See {@link #leftJoin(KTable, ValueJoiner)}. * - * @param table the {@link KTable} to be joined with this stream - * @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records - * @param the value type of the table - * @param the value type of the result stream - * @return a {@code KStream} that contains join-records for each key and values computed by the given - * {@link ValueJoinerWithKey}, one output for each input {@code KStream} record - * @see #join(KTable, ValueJoinerWithKey) - * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey) + *
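+     *
+     * A minimal sketch of the key-aware joiner ({@code stream} and {@code table} with {@code String} keys and
+     * values are illustrative assumptions):
+     * <pre>{@code
+     * KStream<String, String> joined = stream.leftJoin(
+     *     table,
+     *     (readOnlyKey, streamValue, tableValue) ->
+     *         // use the key in the result, but never modify it
+     *         readOnlyKey + ":" + streamValue + "/" + (tableValue == null ? "no-match" : tableValue)
+     * );
+     * }</pre>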

Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning. */ - KStream leftJoin(final KTable table, - final ValueJoinerWithKey joiner); + KStream leftJoin(final KTable table, + final ValueJoinerWithKey joiner); /** - * Join records of this stream with {@link KTable}'s records using non-windowed left equi join with default - * serializers and deserializers. - * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from this stream will produce an - * output record (cf. below). - * The join is a primary key table lookup join with join attribute {@code stream.key == table.key}. - * "Table lookup join" means, that results are only computed if {@code KStream} records are processed. - * This is done by performing a lookup for matching records in the current (i.e., processing time) internal - * {@link KTable} state. - * In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and - * will not produce any result records. - *

- * For each {@code KStream} record whether or not it finds a corresponding record in {@link KTable} the provided - * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. - * If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoiner}. - * The key of the result record is the same as for both joining input records. - * If an {@code KStream} input record value is {@code null} the record will not be included in the join - * operation and thus no output record will be added to the resulting {@code KStream}. - *

-     * Example:
-     * KStream | KTable | state  | result
-     * <K1:A>  |        |        | <K1:ValueJoiner(A,null)>
-     *         | <K1:b> | <K1:b> |
-     * <K1:C>  |        | <K1:b> | <K1:ValueJoiner(C,b)>
- * Both input streams (or to be more precise, their underlying source topics) need to have the same number of - * partitions. - * If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream} - * before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given - * {@link KTable}. - * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); - * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}. - * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an - * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. - * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated - * name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

-     * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
-     * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
-     * correctly on its key.
+     * Join records of this stream with {@link KTable}'s records using non-windowed left equi-join.
+     * In contrast to {@link #leftJoin(KTable, ValueJoiner)}, the additional {@link Joined} parameter allows you to
+     * specify a join grace-period to handle out-of-order data gracefully (a sketch follows below); note that the
+     * grace-period only takes effect if the used {@link KTable} is backed by a
+     * {@link org.apache.kafka.streams.state.VersionedKeyValueStore VersionedKeyValueStore}.
      *
-     * @param table  the {@link KTable} to be joined with this stream
-     * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
-     * @param joined a {@link Joined} instance that defines the serdes to
-     *               be used to serialize/deserialize inputs and outputs of the joined streams
-     * @param  the value type of the table
-     * @param  the value type of the result stream
-     * @return a {@code KStream} that contains join-records for each key and values computed by the given
-     *         {@link ValueJoiner}, one output for each input {@code KStream} record
-     * @see #join(KTable, ValueJoiner, Joined)
-     * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
+     *
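+     * A minimal sketch, assuming String-typed topics and serdes (the topic and store names, types, and the
+     * 5-minute periods below are illustrative, not part of this API):
+     * <pre>{@code
+     * KTable<String, String> table = builder.table(
+     *     "table-topic",
+     *     Materialized.as(Stores.persistentVersionedKeyValueStore("versioned-store", Duration.ofMinutes(5)))
+     * );
+     *
+     * KStream<String, String> joined = stream.leftJoin(
+     *     table,
+     *     (streamValue, tableValue) -> streamValue + "/" + tableValue,   // tableValue may be null
+     *     Joined.with(Serdes.String(), Serdes.String(), Serdes.String()) // key, stream-value, table-value serdes
+     *           .withGracePeriod(Duration.ofMinutes(5))
+     * );
+     * }</pre>
+     *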

For details about left-stream-table-join semantics, see {@link #leftJoin(KTable, ValueJoiner)}.
+     * For co-partitioning requirements, (auto-)repartitioning, and more, see {@link #join(KTable, ValueJoiner)}.
+     * For how to specify a grace-period to handle out-of-order data, see {@link #join(KTable, ValueJoiner, Joined)}.
      */
-    KStream leftJoin(final KTable table,
-                     final ValueJoiner joiner,
-                     final Joined joined);
+    KStream leftJoin(final KTable table,
+                     final ValueJoiner joiner,
+                     final Joined joined);
 
     /**
-     * Join records of this stream with {@link KTable}'s records using non-windowed left equi join with default
-     * serializers and deserializers.
-     * In contrast to {@link #join(KTable, ValueJoinerWithKey) inner-join}, all records from this stream will produce an
-     * output record (cf. below).
-     * The join is a primary key table lookup join with join attribute {@code stream.key == table.key}.
-     * "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
-     * This is done by performing a lookup for matching records in the current (i.e., processing time) internal
-     * {@link KTable} state.
-     * In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and
-     * will not produce any result records.
-     *

- * For each {@code KStream} record whether or not it finds a corresponding record in {@link KTable} the provided - * {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record. - * If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoinerWithKey}. - * The key of the result record is the same as for both joining input records. - * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. - * If an {@code KStream} input record value is {@code null} the record will not be included in the join - * operation and thus no output record will be added to the resulting {@code KStream}. - *

-     * Example:
-     * KStream | KTable | state  | result
-     * <K1:A>  |        |        | <K1:ValueJoinerWithKey(K1,A,null)>
-     *         | <K1:b> | <K1:b> |
-     * <K1:C>  |        | <K1:b> | <K1:ValueJoinerWithKey(K1,C,b)>
- * Both input streams (or to be more precise, their underlying source topics) need to have the same number of - * partitions. - * If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream} - * before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given - * {@link KTable}. - * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); - * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}. - * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an - * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. - * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated - * name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

- * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}. - * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned - * correctly on its key. + * See {@link #leftJoin(KTable, ValueJoiner, Joined)}. * - * @param table the {@link KTable} to be joined with this stream - * @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records - * @param joined a {@link Joined} instance that defines the serdes to - * be used to serialize/deserialize inputs and outputs of the joined streams - * @param the value type of the table - * @param the value type of the result stream - * @return a {@code KStream} that contains join-records for each key and values computed by the given - * {@link ValueJoinerWithKey}, one output for each input {@code KStream} record - * @see #join(KTable, ValueJoinerWithKey, Joined) - * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey) + *

Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning. */ - KStream leftJoin(final KTable table, - final ValueJoinerWithKey joiner, - final Joined joined); + KStream leftJoin(final KTable table, + final ValueJoinerWithKey joiner, + final Joined joined); /** * Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join. diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java index 2590c5d70aafc..2133a51d3c885 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java @@ -100,12 +100,12 @@ static ValueJoinerWithKey rever } static ValueMapperWithKey withKey(final ValueMapper valueMapper) { - Objects.requireNonNull(valueMapper, "valueMapper can't be null"); + Objects.requireNonNull(valueMapper, "valueMapper cannot be null"); return (readOnlyKey, value) -> valueMapper.apply(value); } static ValueJoinerWithKey toValueJoinerWithKey(final ValueJoiner valueJoiner) { - Objects.requireNonNull(valueJoiner, "joiner can't be null"); + Objects.requireNonNull(valueJoiner, "joiner cannot be null"); return (readOnlyKey, value1, value2) -> valueJoiner.apply(value1, value2); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index ee2c2b5a14d70..cb8f1986bbc90 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -965,7 +965,7 @@ public KStream join( final KTable table, final ValueJoiner joiner ) { - return join(table, toValueJoinerWithKey(joiner)); + return join(table, toValueJoinerWithKey(joiner), Joined.with(null, null, null)); } @Override @@ -982,9 +982,6 @@ public KStream join( final ValueJoiner joiner, final Joined joined ) { - Objects.requireNonNull(table, "table can't be null"); - Objects.requireNonNull(joiner, "joiner can't be null"); - Objects.requireNonNull(joined, "joined can't be null"); return join(table, toValueJoinerWithKey(joiner), joined); } @@ -994,9 +991,9 @@ public KStream join( final ValueJoinerWithKey joiner, final Joined joined ) { - Objects.requireNonNull(table, "table can't be null"); - Objects.requireNonNull(joiner, "joiner can't be null"); - Objects.requireNonNull(joined, "joined can't be null"); + Objects.requireNonNull(table, "table cannot be null"); + Objects.requireNonNull(joiner, "joiner cannot be null"); + Objects.requireNonNull(joined, "joined cannot be null"); final JoinedInternal joinedInternal = new JoinedInternal<>(joined); final String name = joinedInternal.name(); @@ -1014,34 +1011,31 @@ public KStream join( } @Override - public KStream leftJoin(final KTable table, final ValueJoiner joiner) { - return leftJoin(table, toValueJoinerWithKey(joiner)); + public KStream leftJoin(final KTable table, final ValueJoiner joiner) { + return leftJoin(table, toValueJoinerWithKey(joiner), Joined.with(null, null, null)); } @Override - public KStream leftJoin(final KTable table, final ValueJoinerWithKey joiner) { + public KStream leftJoin(final KTable table, final ValueJoinerWithKey joiner) { return leftJoin(table, joiner, Joined.with(null, null, null)); } @Override - public KStream 
leftJoin(final KTable table, - final ValueJoiner joiner, - final Joined joined) { - Objects.requireNonNull(table, "table can't be null"); - Objects.requireNonNull(joiner, "joiner can't be null"); - Objects.requireNonNull(joined, "joined can't be null"); - + public KStream leftJoin(final KTable table, + final ValueJoiner joiner, + final Joined joined) { return leftJoin(table, toValueJoinerWithKey(joiner), joined); } @Override - public KStream leftJoin(final KTable table, - final ValueJoinerWithKey joiner, - final Joined joined) { - Objects.requireNonNull(table, "table can't be null"); - Objects.requireNonNull(joiner, "joiner can't be null"); - Objects.requireNonNull(joined, "joined can't be null"); - final JoinedInternal joinedInternal = new JoinedInternal<>(joined); + public KStream leftJoin(final KTable table, + final ValueJoinerWithKey joiner, + final Joined joined) { + Objects.requireNonNull(table, "table cannot be null"); + Objects.requireNonNull(joiner, "joiner cannot be null"); + Objects.requireNonNull(joined, "joined cannot be null"); + + final JoinedInternal joinedInternal = new JoinedInternal<>(joined); final String name = joinedInternal.name(); if (repartitionRequired) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index a351a6a812c39..934a63911d156 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -230,7 +230,7 @@ public void shouldNotAllowNullMapperOnMapValues() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.mapValues((ValueMapper) null)); - assertThat(exception.getMessage(), equalTo("valueMapper can't be null")); + assertThat(exception.getMessage(), equalTo("valueMapper cannot be null")); } @Test @@ -246,7 +246,7 @@ public void shouldNotAllowNullMapperOnMapValuesWithNamed() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.mapValues((ValueMapper) null, Named.as("valueMapper"))); - assertThat(exception.getMessage(), equalTo("valueMapper can't be null")); + assertThat(exception.getMessage(), equalTo("valueMapper cannot be null")); } @Test @@ -304,7 +304,7 @@ public void shouldNotAllowNullMapperOnFlatMapValues() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.flatMapValues((ValueMapper>) null)); - assertThat(exception.getMessage(), equalTo("valueMapper can't be null")); + assertThat(exception.getMessage(), equalTo("valueMapper cannot be null")); } @Test @@ -322,7 +322,7 @@ public void shouldNotAllowNullMapperOnFlatMapValuesWithNamed() { () -> testStream.flatMapValues( (ValueMapper>) null, Named.as("flatValueMapper"))); - assertThat(exception.getMessage(), equalTo("valueMapper can't be null")); + assertThat(exception.getMessage(), equalTo("valueMapper cannot be null")); } @Test @@ -579,7 +579,7 @@ public void shouldNotAllowNullValueJoinerOnJoin() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.join(testStream, (ValueJoiner) null, JoinWindows.of(ofMillis(10)))); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @SuppressWarnings("deprecation") @@ -601,7 +601,7 @@ public void 
shouldNotAllowNullValueJoinerOnJoinWithStreamJoined() { (ValueJoiner) null, JoinWindows.of(ofMillis(10)), StreamJoined.as("name"))); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @SuppressWarnings("deprecation") @@ -678,7 +678,7 @@ public void shouldNotAllowNullValueJoinerOnLeftJoin() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.leftJoin(testStream, (ValueJoiner) null, JoinWindows.of(ofMillis(10)))); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @SuppressWarnings("deprecation") @@ -700,7 +700,7 @@ public void shouldNotAllowNullValueJoinerOnLeftJoinWithStreamJoined() { (ValueJoiner) null, JoinWindows.of(ofMillis(10)), StreamJoined.as("name"))); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @SuppressWarnings("deprecation") @@ -778,7 +778,7 @@ public void shouldNotAllowNullValueJoinerOnOuterJoin() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.outerJoin(testStream, (ValueJoiner) null, JoinWindows.of(ofMillis(10)))); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @SuppressWarnings("deprecation") @@ -800,7 +800,7 @@ public void shouldNotAllowNullValueJoinerOnOuterJoinWithStreamJoined() { (ValueJoiner) null, JoinWindows.of(ofMillis(10)), StreamJoined.as("name"))); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @SuppressWarnings("deprecation") @@ -854,7 +854,7 @@ public void shouldNotAllowNullTableOnTableJoin() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.join(null, MockValueJoiner.TOSTRING_JOINER)); - assertThat(exception.getMessage(), equalTo("table can't be null")); + assertThat(exception.getMessage(), equalTo("table cannot be null")); } @Test @@ -862,7 +862,7 @@ public void shouldNotAllowNullTableOnTableJoinWithJoiner() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.join(null, MockValueJoiner.TOSTRING_JOINER, Joined.as("name"))); - assertThat(exception.getMessage(), equalTo("table can't be null")); + assertThat(exception.getMessage(), equalTo("table cannot be null")); } @Test @@ -870,7 +870,7 @@ public void shouldNotAllowNullValueJoinerOnTableJoin() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.join(testTable, (ValueJoiner) null)); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @Test @@ -878,7 +878,7 @@ public void shouldNotAllowNullValueJoinerWithKeyOnTableJoin() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.join(testTable, (ValueJoinerWithKey) null)); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @Test @@ -886,7 +886,7 @@ public void shouldNotAllowNullValueJoinerOnTableJoinWithJoiner() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> 
testStream.join(testTable, (ValueJoiner) null, Joined.as("name"))); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @Test @@ -894,7 +894,7 @@ public void shouldNotAllowNullValueJoinerWithKeyOnTableJoinWithJoiner() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.join(testTable, (ValueJoinerWithKey) null, Joined.as("name"))); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @Test @@ -902,7 +902,7 @@ public void shouldNotAllowNullJoinedOnTableJoin() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.join(testTable, MockValueJoiner.TOSTRING_JOINER, null)); - assertThat(exception.getMessage(), equalTo("joined can't be null")); + assertThat(exception.getMessage(), equalTo("joined cannot be null")); } @Test @@ -910,7 +910,7 @@ public void shouldNotAllowNullTableOnTableLeftJoin() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.leftJoin(null, MockValueJoiner.TOSTRING_JOINER)); - assertThat(exception.getMessage(), equalTo("table can't be null")); + assertThat(exception.getMessage(), equalTo("table cannot be null")); } @Test @@ -918,7 +918,7 @@ public void shouldNotAllowNullTableOnTableLeftJoinWithJoined() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.leftJoin(null, MockValueJoiner.TOSTRING_JOINER, Joined.as("name"))); - assertThat(exception.getMessage(), equalTo("table can't be null")); + assertThat(exception.getMessage(), equalTo("table cannot be null")); } @Test @@ -926,7 +926,7 @@ public void shouldNotAllowNullValueJoinerOnTableLeftJoin() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.leftJoin(testTable, (ValueJoiner) null)); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @Test @@ -934,7 +934,7 @@ public void shouldNotAllowNullValueJoinerWithKeyOnTableLeftJoin() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.leftJoin(testTable, (ValueJoinerWithKey) null)); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @Test @@ -942,7 +942,7 @@ public void shouldNotAllowNullValueJoinerOnTableLeftJoinWithJoined() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.leftJoin(testTable, (ValueJoiner) null, Joined.as("name"))); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @Test @@ -950,7 +950,7 @@ public void shouldNotAllowNullValueJoinerWithKeyOnTableLeftJoinWithJoined() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.leftJoin(testTable, (ValueJoinerWithKey) null, Joined.as("name"))); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @Test @@ -958,7 +958,7 @@ public void shouldNotAllowNullJoinedOnTableLeftJoin() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.leftJoin(testTable, 
MockValueJoiner.TOSTRING_JOINER, null)); - assertThat(exception.getMessage(), equalTo("joined can't be null")); + assertThat(exception.getMessage(), equalTo("joined cannot be null")); } @Test @@ -1006,7 +1006,7 @@ public void shouldNotAllowNullValueJoinerOnJoinWithGlobalTable() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.join(testGlobalTable, MockMapper.selectValueMapper(), (ValueJoiner) null)); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @Test @@ -1026,7 +1026,7 @@ public void shouldNotAllowNullValueJoinerOnJoinWithGlobalTableWithNamed() { MockMapper.selectValueMapper(), (ValueJoiner) null, Named.as("name"))); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @Test @@ -1038,7 +1038,7 @@ public void shouldNotAllowNullValueJoinerWithKeyOnJoinWithGlobalTableWithNamed() MockMapper.selectValueMapper(), (ValueJoiner) null, Named.as("name"))); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @Test @@ -1086,7 +1086,7 @@ public void shouldNotAllowNullValueJoinerOnLeftJoinWithGlobalTable() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.leftJoin(testGlobalTable, MockMapper.selectValueMapper(), (ValueJoiner) null)); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @Test @@ -1106,7 +1106,7 @@ public void shouldNotAllowNullValueJoinerOnLeftJoinWithGlobalTableWithNamed() { MockMapper.selectValueMapper(), (ValueJoiner) null, Named.as("name"))); - assertThat(exception.getMessage(), equalTo("joiner can't be null")); + assertThat(exception.getMessage(), equalTo("joiner cannot be null")); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index a293625dc308c..ab37fa6c9d367 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -73,7 +73,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; -@SuppressWarnings("unchecked") @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) public class KTableImplTest { @@ -247,103 +246,103 @@ public void close() {} }; assertEquals( - ((AbstractStream) table1.filter((key, value) -> false)).keySerde(), + ((AbstractStream) table1.filter((key, value) -> false)).keySerde(), consumedInternal.keySerde()); assertEquals( - ((AbstractStream) table1.filter((key, value) -> false)).valueSerde(), + ((AbstractStream) table1.filter((key, value) -> false)).valueSerde(), consumedInternal.valueSerde()); assertEquals( - ((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, 
mySerde))).valueSerde(), mySerde); assertEquals( - ((AbstractStream) table1.filterNot((key, value) -> false)).keySerde(), + ((AbstractStream) table1.filterNot((key, value) -> false)).keySerde(), consumedInternal.keySerde()); assertEquals( - ((AbstractStream) table1.filterNot((key, value) -> false)).valueSerde(), + ((AbstractStream) table1.filterNot((key, value) -> false)).valueSerde(), consumedInternal.valueSerde()); assertEquals( - ((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals( - ((AbstractStream) table1.mapValues(mapper)).keySerde(), + ((AbstractStream) table1.mapValues(mapper)).keySerde(), consumedInternal.keySerde()); - assertNull(((AbstractStream) table1.mapValues(mapper)).valueSerde()); + assertNull(((AbstractStream) table1.mapValues(mapper)).valueSerde()); assertEquals( - ((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals( - ((AbstractStream) table1.toStream()).keySerde(), + ((AbstractStream) table1.toStream()).keySerde(), consumedInternal.keySerde()); assertEquals( - ((AbstractStream) table1.toStream()).valueSerde(), + ((AbstractStream) table1.toStream()).valueSerde(), consumedInternal.valueSerde()); - assertNull(((AbstractStream) table1.toStream(selector)).keySerde()); + assertNull(((AbstractStream) table1.toStream(selector)).keySerde()); assertEquals( - ((AbstractStream) table1.toStream(selector)).valueSerde(), + ((AbstractStream) table1.toStream(selector)).valueSerde(), consumedInternal.valueSerde()); assertEquals( - ((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).keySerde(), + ((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).keySerde(), consumedInternal.keySerde()); - assertNull(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).valueSerde()); + assertNull(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).valueSerde()); assertEquals( - ((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); - assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).valueSerde(), + assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); - assertNull(((AbstractStream) table1.groupBy(KeyValue::new)).keySerde()); - assertNull(((AbstractStream) table1.groupBy(KeyValue::new)).valueSerde()); + assertNull(((AbstractStream) table1.groupBy(KeyValue::new)).keySerde()); + assertNull(((AbstractStream) 
table1.groupBy(KeyValue::new)).valueSerde()); assertEquals( - ((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals( - ((AbstractStream) table1.join(table1, joiner)).keySerde(), + ((AbstractStream) table1.join(table1, joiner)).keySerde(), consumedInternal.keySerde()); - assertNull(((AbstractStream) table1.join(table1, joiner)).valueSerde()); + assertNull(((AbstractStream) table1.join(table1, joiner)).valueSerde()); assertEquals( - ((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals( - ((AbstractStream) table1.leftJoin(table1, joiner)).keySerde(), + ((AbstractStream) table1.leftJoin(table1, joiner)).keySerde(), consumedInternal.keySerde()); - assertNull(((AbstractStream) table1.leftJoin(table1, joiner)).valueSerde()); + assertNull(((AbstractStream) table1.leftJoin(table1, joiner)).valueSerde()); assertEquals( - ((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals( - ((AbstractStream) table1.outerJoin(table1, joiner)).keySerde(), + ((AbstractStream) table1.outerJoin(table1, joiner)).keySerde(), consumedInternal.keySerde()); - assertNull(((AbstractStream) table1.outerJoin(table1, joiner)).valueSerde()); + assertNull(((AbstractStream) table1.outerJoin(table1, joiner)).valueSerde()); assertEquals( - ((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); } @@ -588,6 +587,7 @@ public void shouldThrowNullPointerOnTransformValuesWithKeyWhenTransformerSupplie assertThrows(NullPointerException.class, () -> table.transformValues(null)); } + @SuppressWarnings("unchecked") @Test public void shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull() { final ValueTransformerWithKeySupplier valueTransformerSupplier = @@ -595,6 +595,7 @@ public void shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull assertThrows(NullPointerException.class, () -> table.transformValues(valueTransformerSupplier, (Materialized>) null)); } + @SuppressWarnings("unchecked") @Test public void shouldThrowNullPointerOnTransformValuesWithKeyWhenStoreNamesNull() { final 
ValueTransformerWithKeySupplier valueTransformerSupplier =