Skip to content

Commit

Permalink
KAFKA-18644: improve generic type names for KStreamImpl and KTableImpl (
Browse files Browse the repository at this point in the history
#18722)

Reviewers: Bill Bejeck <[email protected]>, Lucas Brutschy <[email protected]>
  • Loading branch information
mjsax authored Jan 31, 2025
1 parent 184b891 commit 0d1e7e0
Show file tree
Hide file tree
Showing 21 changed files with 291 additions and 234 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3044,7 +3044,7 @@ <GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalTable,
* @see #map(KeyValueMapper)
*/
<KOut, VOut> KStream<KOut, VOut> process(
final ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
final ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier,
final String... stateStoreNames
);

Expand Down Expand Up @@ -3144,7 +3144,7 @@ <KOut, VOut> KStream<KOut, VOut> process(
* @see #processValues(FixedKeyProcessorSupplier, Named, String...)
*/
<KOut, VOut> KStream<KOut, VOut> process(
final ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
final ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier,
final Named named,
final String... stateStoreNames
);
Expand Down Expand Up @@ -3244,7 +3244,7 @@ <KOut, VOut> KStream<KOut, VOut> process(
* @see #process(ProcessorSupplier, Named, String...)
*/
<VOut> KStream<K, VOut> processValues(
final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
final String... stateStoreNames
);

Expand Down Expand Up @@ -3344,7 +3344,7 @@ <VOut> KStream<K, VOut> processValues(
* @see #process(ProcessorSupplier, Named, String...)
*/
<VOut> KStream<K, VOut> processValues(
final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
final Named named,
final String... stateStoreNames
);
Expand Down
74 changes: 38 additions & 36 deletions streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public interface KTable<K, V> {
* @return a {@code KTable} that contains only those records that satisfy the given predicate
* @see #filterNot(Predicate)
*/
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final Named named);
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
final Named named);

/**
* Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given
Expand Down Expand Up @@ -241,7 +242,8 @@ KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
* @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
* @see #filter(Predicate)
*/
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Named named);
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
final Named named);

/**
* Create a new {@code KTable} that consists all records of this {@code KTable} which do <em>not</em> satisfy the
Expand Down Expand Up @@ -1069,7 +1071,7 @@ <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super
* @param <VR> the value type of the result {@link KGroupedTable}
* @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable}
*/
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> selector);

/**
* Re-groups the records of this {@code KTable} using the provided {@link KeyValueMapper}
Expand Down Expand Up @@ -1101,7 +1103,7 @@ <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super
* @param <VR> the value type of the result {@link KGroupedTable}
* @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable}
*/
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> selector,
final Grouped<KR, VR> grouped);

/**
Expand Down Expand Up @@ -2109,8 +2111,8 @@ <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner);
final Function<? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
Expand All @@ -2127,8 +2129,8 @@ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final BiFunction<K, V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner);
final BiFunction<? super K, ? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join,
Expand All @@ -2149,8 +2151,8 @@ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Function<? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final TableJoined<K, KO> tableJoined);

/**
Expand All @@ -2172,8 +2174,8 @@ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final BiFunction<K, V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final BiFunction<? super K, ? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final TableJoined<K, KO> tableJoined);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
Expand All @@ -2192,8 +2194,8 @@ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Function<? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

/**
Expand All @@ -2213,8 +2215,8 @@ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final BiFunction<K, V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final BiFunction<? super K, ? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

/**
Expand All @@ -2238,8 +2240,8 @@ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Function<? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final TableJoined<K, KO> tableJoined,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

Expand All @@ -2264,8 +2266,8 @@ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final BiFunction<K, V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final BiFunction<? super K, ? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final TableJoined<K, KO> tableJoined,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

Expand All @@ -2284,8 +2286,8 @@ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
* @return a {@code KTable} that contains only those records that satisfy the given predicate
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner);
final Function<? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
Expand All @@ -2302,8 +2304,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
* @return a {@code KTable} that contains only those records that satisfy the given predicate
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final BiFunction<K, V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner);
final BiFunction<? super K, ? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join,
Expand All @@ -2323,8 +2325,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Function<? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final TableJoined<K, KO> tableJoined);

/**
Expand All @@ -2345,8 +2347,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final BiFunction<K, V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final BiFunction<? super K, ? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final TableJoined<K, KO> tableJoined);

/**
Expand All @@ -2366,8 +2368,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Function<? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

/**
Expand All @@ -2387,8 +2389,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final BiFunction<K, V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final BiFunction<? super K, ? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

/**
Expand All @@ -2412,8 +2414,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Function<? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final TableJoined<K, KO> tableJoined,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

Expand All @@ -2438,8 +2440,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final BiFunction<K, V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final BiFunction<? super K, ? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final TableJoined<K, KO> tableJoined,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,20 @@ Set<String> ensureCopartitionWith(final Collection<? extends AbstractStream<K, ?
return allSourceNodes;
}

static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) {
static <VRight, VLeft, VOut> ValueJoiner<VRight, VLeft, VOut> reverseJoiner(final ValueJoiner<VLeft, VRight, VOut> joiner) {
return (value2, value1) -> joiner.apply(value1, value2);
}

static <K, T2, T1, R> ValueJoinerWithKey<K, T2, T1, R> reverseJoinerWithKey(final ValueJoinerWithKey<K, T1, T2, R> joiner) {
static <K, VRight, VLeft, VOut> ValueJoinerWithKey<K, VRight, VLeft, VOut> reverseJoinerWithKey(final ValueJoinerWithKey<K, VLeft, VRight, VOut> joiner) {
return (key, value2, value1) -> joiner.apply(key, value1, value2);
}

static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(final ValueMapper<V, VR> valueMapper) {
static <K, V, VOut> ValueMapperWithKey<K, V, VOut> withKey(final ValueMapper<V, VOut> valueMapper) {
Objects.requireNonNull(valueMapper, "valueMapper can't be null");
return (readOnlyKey, value) -> valueMapper.apply(value);
}

static <K, V1, V2, VR> ValueJoinerWithKey<K, V1, V2, VR> toValueJoinerWithKey(final ValueJoiner<V1, V2, VR> valueJoiner) {
static <K, VLeft, VRight, VOut> ValueJoinerWithKey<K, VLeft, VRight, VOut> toValueJoinerWithKey(final ValueJoiner<VLeft, VRight, VOut> valueJoiner) {
Objects.requireNonNull(valueJoiner, "joiner can't be null");
return (readOnlyKey, value1, value2) -> valueJoiner.apply(value1, value2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@

public class ForeachProcessor<K, V> implements Processor<K, V, Void, Void> {

private final ForeachAction<K, V> action;
private final ForeachAction<? super K, ? super V> action;

public ForeachProcessor(final ForeachAction<K, V> action) {
public ForeachProcessor(final ForeachAction<? super K, ? super V> action) {
this.action = action;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,12 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
this.userProvidedRepartitionTopicName = groupedInternal.name();
}

private <VAgg> KTable<K, VAgg> doAggregate(final ProcessorSupplier<K, Change<V>, K, Change<VAgg>> aggregateSupplier,
final NamedInternal named,
final String functionName,
final MaterializedInternal<K, VAgg, KeyValueStore<Bytes, byte[]>> materialized) {

private <VAgg> KTable<K, VAgg> doAggregate(
final ProcessorSupplier<K, Change<V>, K, Change<VAgg>> aggregateSupplier,
final NamedInternal named,
final String functionName,
final MaterializedInternal<K, VAgg, KeyValueStore<Bytes, byte[]>> materialized
) {
final String sinkName = named.suffixWithOrElseGet("-sink", builder, KStreamImpl.SINK_NAME);
final String sourceName = named.suffixWithOrElseGet("-source", builder, KStreamImpl.SOURCE_NAME);
final String funcName = named.orElseGenerateWithPrefix(builder, functionName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@

class KStreamFilter<K, V> implements FixedKeyProcessorSupplier<K, V, V> {

private final Predicate<K, V> predicate;
private final Predicate<? super K, ? super V> predicate;
private final boolean filterNot;

public KStreamFilter(final Predicate<K, V> predicate, final boolean filterNot) {
public KStreamFilter(final Predicate<? super K, ? super V> predicate, final boolean filterNot) {
this.predicate = predicate;
this.filterNot = filterNot;
}
Expand Down
Loading

0 comments on commit 0d1e7e0

Please sign in to comment.