Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Add javadocs to AbstractMergedSortedCacheStoreIterator #18772

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,31 @@
import java.util.NoSuchElementException;

/**
* Merges two iterators. Assumes each of them is sorted by key
* AbstractMergedSortedCacheStoreIterator is an abstract class for merging two sorted iterators, one from a cache and
* the other from a store. It ensures the merged results maintain sorted order while resolving conflicts between cache
* and store entries.
*
* @param <K>
* @param <V>
* <p>This iterator is used for state stores in Kafka Streams, which have an (optional) caching layer that needs to be
* "merged" with the underlying state. It handles common scenarios like skipping records with cached tombstones (deleted
* entries) and preferring cache entries over store entries when conflicts arise.</p>
*
* @param <K> The type of the resulting merged key.
* @param <KS> The type of the store key.
* @param <V> The type of the resulting merged value.
* @param <VS> The type of the store value.
*/
abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V, VS> implements KeyValueIterator<K, V> {
private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
private final KeyValueIterator<KS, VS> storeIterator;
private final boolean forward;

/**
* Constructs an AbstractMergedSortedCacheStoreIterator.
*
* @param cacheIterator The iterator for the cache, assumed to be sorted by key.
* @param storeIterator The iterator for the store, assumed to be sorted by key.
* @param forward The direction of iteration. True for forward, false for reverse.
*/
AbstractMergedSortedCacheStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
final KeyValueIterator<KS, VS> storeIterator,
final boolean forward) {
Expand All @@ -41,20 +56,79 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V, VS> implements K
this.forward = forward;
}

/**
* Compares the keys from the cache and store to determine their ordering.
*
* @param cacheKey The key from the cache.
* @param storeKey The key from the store.
*
* @return A negative integer, zero, or a positive integer as the cache key is less than,
* equal to, or greater than the store key.
*/
abstract int compare(final Bytes cacheKey, final KS storeKey);

/**
* Deserializes a store key into a generic merged key type.
*
* @param key The store key to deserialize.
*
* @return The deserialized key.
*/
abstract K deserializeStoreKey(final KS key);

/**
* Deserializes a key-value pair from the store into a generic merged key-value pair.
*
* @param pair The key-value pair from the store.
*
* @return The deserialized key-value pair.
*/
abstract KeyValue<K, V> deserializeStorePair(final KeyValue<KS, VS> pair);

/**
* Deserializes a cache key into a generic merged key type.
*
* @param cacheKey The cache key to deserialize.
*
* @return The deserialized key.
*/
abstract K deserializeCacheKey(final Bytes cacheKey);

/**
* Deserializes a cache entry into a generic value type.
*
* @param cacheEntry The cache entry to deserialize.
*
* @return The deserialized value.
*/
abstract V deserializeCacheValue(final LRUCacheEntry cacheEntry);

/**
* Checks if a cache entry is a tombstone (representing a deleted value).
*
* @param nextFromCache The cache entry to check.
*
* @return True if the cache entry is a tombstone, false otherwise.
*/
private boolean isDeletedCacheEntry(final KeyValue<Bytes, LRUCacheEntry> nextFromCache) {
return nextFromCache.value.value() == null;
}

/**
* Determines if there are more entries to iterate over, resolving conflicts between cache and store entries (e.g.,
* skipping tombstones).
*
* <p>Conflict resolution scenarios:</p>
*
* <ul>
* <li><b>Cache contains a tombstone for a key:</b> Skip both the cache tombstone and the corresponding store entry (if exists).</li>
* <li><b>Cache contains a value for a key present in the store:</b> Prefer the cache value and skip the store entry.</li>
* <li><b>Cache key is unique:</b> Return the cache value.</li>
* <li><b>Store key is unique:</b> Return the store value.</li>
* </ul>
*
* @return True if there are more entries, false otherwise.
*/
@Override
public boolean hasNext() {
// skip over items deleted from cache, and corresponding store items if they have the same key
Expand Down Expand Up @@ -86,6 +160,13 @@ public boolean hasNext() {
return cacheIterator.hasNext() || storeIterator.hasNext();
}

/**
* Retrieves the next key-value pair in the merged iteration.
*
* @return The next key-value pair.
*
* @throws NoSuchElementException If there are no more elements to iterate.
*/
@Override
public KeyValue<K, V> next() {
if (!hasNext()) {
Expand All @@ -107,6 +188,15 @@ public KeyValue<K, V> next() {
return chooseNextValue(nextCacheKey, nextStoreKey, comparison);
}

/**
* Resolves which source (cache or store) to fetch the next key-value pair when a comparison is performed.
*
* @param nextCacheKey The next key from the cache.
* @param nextStoreKey The next key from the store.
* @param comparison The comparison result between the cache and store keys.
*
* @return The next key-value pair.
*/
private KeyValue<K, V> chooseNextValue(final Bytes nextCacheKey,
final KS nextStoreKey,
final int comparison) {
Expand All @@ -133,6 +223,15 @@ private KeyValue<K, V> chooseNextValue(final Bytes nextCacheKey,
}
}

/**
* Fetches the next value from the store, ensuring it matches the expected key.
*
* @param nextStoreKey The expected next key from the store.
*
* @return The next key-value pair from the store.
*
* @throws IllegalStateException If the key does not match the expected key.
*/
private KeyValue<K, V> nextStoreValue(final KS nextStoreKey) {
final KeyValue<KS, VS> next = storeIterator.next();

Expand All @@ -143,6 +242,15 @@ private KeyValue<K, V> nextStoreValue(final KS nextStoreKey) {
return deserializeStorePair(next);
}

/**
* Fetches the next value from the cache, ensuring it matches the expected key.
*
* @param nextCacheKey The expected next key from the cache.
*
* @return The next key-value pair from the cache.
*
* @throws IllegalStateException If the key does not match the expected key.
*/
private KeyValue<K, V> nextCacheValue(final Bytes nextCacheKey) {
final KeyValue<Bytes, LRUCacheEntry> next = cacheIterator.next();

Expand All @@ -153,6 +261,13 @@ private KeyValue<K, V> nextCacheValue(final Bytes nextCacheKey) {
return KeyValue.pair(deserializeCacheKey(next.key), deserializeCacheValue(next.value));
}

/**
* Peeks at the next key in the merged iteration without advancing the iterator.
*
* @return The next key in the iteration.
*
* @throws NoSuchElementException If there are no more elements to peek.
*/
@Override
public K peekNextKey() {
if (!hasNext()) {
Expand All @@ -174,6 +289,18 @@ public K peekNextKey() {
return chooseNextKey(nextCacheKey, nextStoreKey, comparison);
}

/**
* Determines the next key to return from the merged iteration based on the comparison of the cache and store keys.
* Resolves conflicts by considering the iteration direction and ensuring the merged order is maintained.
*
* @param nextCacheKey The next key from the cache.
* @param nextStoreKey The next key from the store.
* @param comparison The comparison result between the cache and store keys. A negative value indicates the cache
* key is smaller, zero indicates equality, and a positive value indicates the store key is
* smaller.
*
* @return The next key to return from the merged iteration.
*/
private K chooseNextKey(final Bytes nextCacheKey,
final KS nextStoreKey,
final int comparison) {
Expand All @@ -200,6 +327,9 @@ private K chooseNextKey(final Bytes nextCacheKey,
}
}

/**
* Closes the iterators and releases any associated resources.
*/
@Override
public void close() {
cacheIterator.close();
Expand Down