Skip to content

Commit

Permalink
MINOR: update Kafka Streams Topology JavaDocs (#18778)
Browse files Browse the repository at this point in the history
Reviewers: Bill Bejeck <[email protected]>
  • Loading branch information
mjsax authored Feb 6, 2025
1 parent 8be2a8e commit 9774635
Show file tree
Hide file tree
Showing 9 changed files with 555 additions and 844 deletions.
1,095 changes: 424 additions & 671 deletions streams/src/main/java/org/apache/kafka/streams/Topology.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.function.Supplier;

import static java.lang.String.format;
Expand Down Expand Up @@ -80,9 +81,11 @@ public static String prepareMillisCheckFailMsgPrefix(final Object value, final S
/**
* @throws IllegalArgumentException if the same instance is obtained each time
*/
public static void checkSupplier(final Supplier<?> supplier) {
if (supplier.get() == supplier.get()) {
final String supplierClass = supplier.getClass().getName();
public static void checkSupplier(final Supplier<?> processorSupplier) {
Objects.requireNonNull(processorSupplier, "processorSupplier cannot be null");

if (processorSupplier.get() == processorSupplier.get()) {
final String supplierClass = processorSupplier.getClass().getName();
throw new IllegalArgumentException(String.format("%s generates single reference." +
" %s#get() must return a new object each time it is called.", supplierClass, supplierClass));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1237,10 +1237,9 @@ public <KOut, VOut> KStream<KOut, VOut> process(
final Named named,
final String... stateStoreNames
) {
Objects.requireNonNull(processorSupplier, "processorSupplier can't be null");
ApiUtils.checkSupplier(processorSupplier);
Objects.requireNonNull(named, "named can't be null");
Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
ApiUtils.checkSupplier(processorSupplier);
for (final String stateStoreName : stateStoreNames) {
Objects.requireNonNull(stateStoreName, "stateStoreNames can't be null");
}
Expand Down Expand Up @@ -1282,10 +1281,9 @@ public <VOut> KStream<K, VOut> processValues(
final Named named,
final String... stateStoreNames
) {
Objects.requireNonNull(processorSupplier, "processorSupplier can't be null");
ApiUtils.checkSupplier(processorSupplier);
Objects.requireNonNull(named, "named can't be null");
Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
ApiUtils.checkSupplier(processorSupplier);
for (final String stateStoreName : stateStoreNames) {
Objects.requireNonNull(stateStoreName, "stateStoreNames can't be null");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,16 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
consumedInternal().valueDeserializer(),
topicName);

processorParameters.addProcessorTo(topologyBuilder, new String[] {sourceName});
processorParameters.addProcessorTo(topologyBuilder, sourceName);

// if the KTableSource should not be materialized, stores will be null or empty
final KTableSource<K, V> tableSource = (KTableSource<K, V>) processorParameters.processorSupplier();
if (tableSource.stores() != null) {
if (shouldReuseSourceTopicForChangelog) {
// TODO: rewrite this part to use Topology.addReadOnlyStateStore() instead
// should allow to move off using `InternalTopologyBuilder` in favor of the public `Topology` API
tableSource.stores().forEach(store -> {
// connect the source topic as (read-only) changelog topic for fault-tolerance
store.withLoggingDisabled();
topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName);
});
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ public class SinkNode<KIn, VIn> extends ProcessorNode<KIn, VIn, Void, Void> {

private Serializer<KIn> keySerializer;
private Serializer<VIn> valSerializer;
private final TopicNameExtractor<KIn, VIn> topicExtractor;
private final TopicNameExtractor<? super KIn, ? super VIn> topicExtractor;
private final StreamPartitioner<? super KIn, ? super VIn> partitioner;

private InternalProcessorContext<Void, Void> context;

SinkNode(final String name,
final TopicNameExtractor<KIn, VIn> topicExtractor,
final TopicNameExtractor<? super KIn, ? super VIn> topicExtractor,
final Serializer<KIn> keySerializer,
final Serializer<VIn> valSerializer,
final StreamPartitioner<? super KIn, ? super VIn> partitioner) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,8 @@ public void process(final Record<Object, Object> record) { }
}
}

@Deprecated // testing old PAPI
@Test
public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
when(globalStoreBuilder.name()).thenReturn("anyName");
assertThrows(TopologyException.class, () -> topology.addGlobalStore(
globalStoreBuilder,
"sameName",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,15 +507,15 @@ public void shouldNotAllowNullSelectorOnGroupByWithGrouped() {
public void shouldNotAllowNullGroupedOnGroupBy() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.groupBy((k, v) -> k, (Grouped<String, String>) null));
() -> testStream.groupBy((k, v) -> k, null));
assertThat(exception.getMessage(), equalTo("grouped can't be null"));
}

@Test
public void shouldNotAllowNullGroupedOnGroupByKey() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.groupByKey((Grouped<String, String>) null));
() -> testStream.groupByKey(null));
assertThat(exception.getMessage(), equalTo("grouped can't be null"));
}

Expand Down Expand Up @@ -646,7 +646,7 @@ public void shouldNotAllowNullStreamJoinedOnJoin() {
testStream,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(ofMillis(10)),
(StreamJoined<String, String, String>) null));
null));
assertThat(exception.getMessage(), equalTo("streamJoined can't be null"));
}

Expand Down Expand Up @@ -746,7 +746,7 @@ public void shouldNotAllowNullStreamJoinedOnLeftJoin() {
testStream,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(ofMillis(10)),
(StreamJoined<String, String, String>) null));
null));
assertThat(exception.getMessage(), equalTo("streamJoined can't be null"));
}

Expand Down Expand Up @@ -845,7 +845,7 @@ public void shouldNotAllowNullStreamJoinedOnOuterJoin() {
testStream,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(ofMillis(10)),
(StreamJoined<String, String, String>) null));
null));
assertThat(exception.getMessage(), equalTo("streamJoined can't be null"));
}

Expand Down Expand Up @@ -1595,7 +1595,7 @@ public void shouldNotAllowNullProcessSupplierOnProcess() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
}

@Test
Expand All @@ -1604,7 +1604,7 @@ public void shouldNotAllowNullProcessSupplierOnProcessWithStores() {
NullPointerException.class,
() -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null,
"storeName"));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
}

@Test
Expand All @@ -1613,7 +1613,7 @@ public void shouldNotAllowNullProcessSupplierOnProcessWithNamed() {
NullPointerException.class,
() -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null,
Named.as("processor")));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
}

@Test
Expand All @@ -1622,7 +1622,7 @@ public void shouldNotAllowNullProcessSupplierOnProcessWithNamedAndStores() {
NullPointerException.class,
() -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null,
Named.as("processor"), "stateStore"));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
}

@Test
Expand Down Expand Up @@ -1678,7 +1678,7 @@ public void shouldNotAllowNullProcessValuesSupplierOnProcess() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.processValues((FixedKeyProcessorSupplier<? super String, ? super String, Void>) null));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
}

@Test
Expand All @@ -1687,7 +1687,7 @@ public void shouldNotAllowNullProcessSupplierOnProcessValuesWithStores() {
NullPointerException.class,
() -> testStream.processValues((FixedKeyProcessorSupplier<? super String, ? super String, Void>) null,
"storeName"));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
}

@Test
Expand All @@ -1696,7 +1696,7 @@ public void shouldNotAllowNullProcessSupplierOnProcessValuesWithNamed() {
NullPointerException.class,
() -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null,
Named.as("processor")));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
}

@Test
Expand All @@ -1705,7 +1705,7 @@ public void shouldNotAllowNullProcessSupplierOnProcessValuesWithNamedAndStores()
NullPointerException.class,
() -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null,
Named.as("processor"), "stateStore"));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ public void shouldNotAllowToAddGlobalStoresWithSameName() {
new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled();

builder.addGlobalStore(
"global-store",
"global-source",
null,
null,
null,
Expand All @@ -562,11 +562,11 @@ public void shouldNotAllowToAddGlobalStoresWithSameName() {
final TopologyException exception = assertThrows(
TopologyException.class,
() -> builder.addGlobalStore(
"global-store-2",
"global-source-2",
null,
null,
null,
"global-topic",
"global-topic-2",
"global-processor-2",
new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(secondGlobalBuilder)),
false
Expand Down

0 comments on commit 9774635

Please sign in to comment.