Skip to content

Commit

Permalink
docs(streams): add javadocs to AbstractMergedSortedCacheStoreIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
fonsdant committed Feb 1, 2025
1 parent 484ba83 commit df4fa92
Showing 1 changed file with 133 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,31 @@
import java.util.NoSuchElementException;

/**
* Merges two iterators. Assumes each of them is sorted by key
* AbstractMergedSortedCacheStoreIterator is an abstract class for merging two sorted iterators, one from a cache and
* the other from a store. It ensures the merged results maintain sorted order while resolving conflicts between cache
* and store entries.
*
* @param <K>
* @param <V>
* <p>This iterator is primarily used in Kafka Streams' state store layer, where state is backed by an in-memory cache
* and persistent store. It handles common scenarios like skipping tombstones (deleted entries) and preferring cache
* entries over store entries when conflicts arise.</p>
*
* @param <K> The type of the resulting merged key.
* @param <KS> The type of the store key.
* @param <V> The type of the resulting merged value.
* @param <VS> The type of the store value.
*/
abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V, VS> implements KeyValueIterator<K, V> {
private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
private final KeyValueIterator<KS, VS> storeIterator;
private final boolean forward;

/**
* Constructs an AbstractMergedSortedCacheStoreIterator.
*
* @param cacheIterator The iterator for the cache, assumed to be sorted by key.
* @param storeIterator The iterator for the store, assumed to be sorted by key.
* @param forward The direction of iteration. True for forward, false for reverse.
*/
AbstractMergedSortedCacheStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
final KeyValueIterator<KS, VS> storeIterator,
final boolean forward) {
Expand All @@ -41,20 +56,79 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V, VS> implements K
this.forward = forward;
}

/**
* Compares the keys from the cache and store to determine their ordering.
*
* @param cacheKey The key from the cache.
* @param storeKey The key from the store.
*
* @return A negative integer, zero, or a positive integer as the cache key is less than,
* equal to, or greater than the store key.
*/
abstract int compare(final Bytes cacheKey, final KS storeKey);

/**
* Deserializes a store key into a generic merged key type.
*
* @param key The store key to deserialize.
*
* @return The deserialized key.
*/
abstract K deserializeStoreKey(final KS key);

/**
* Deserializes a key-value pair from the store into a generic merged key-value pair.
*
* @param pair The key-value pair from the store.
*
* @return The deserialized key-value pair.
*/
abstract KeyValue<K, V> deserializeStorePair(final KeyValue<KS, VS> pair);

/**
* Deserializes a cache key into a generic merged key type.
*
* @param cacheKey The cache key to deserialize.
*
* @return The deserialized key.
*/
abstract K deserializeCacheKey(final Bytes cacheKey);

/**
* Deserializes a cache entry into a generic value type.
*
* @param cacheEntry The cache entry to deserialize.
*
* @return The deserialized value.
*/
abstract V deserializeCacheValue(final LRUCacheEntry cacheEntry);

/**
* Checks if a cache entry is a tombstone (representing a deleted value).
*
* @param nextFromCache The cache entry to check.
*
* @return True if the cache entry is a tombstone, false otherwise.
*/
private boolean isDeletedCacheEntry(final KeyValue<Bytes, LRUCacheEntry> nextFromCache) {
return nextFromCache.value.value() == null;
}

/**
* Determines if there are more entries to iterate over, resolving conflicts between cache and store entries (e.g.,
* skipping tombstones).
*
* <p>Conflict resolution scenarios:</p>
*
* <ul>
* <li><b>Cache contains a tombstone for a key:</b> Skip both the cache tombstone and the corresponding store entry (if exists).</li>
* <li><b>Cache contains a value for a key present in the store:</b> Prefer the cache value and skip the store entry.</li>
* <li><b>Cache key is unique:</b> Return the cache value.</li>
* <li><b>Store key is unique:</b> Return the store value.</li>
* </ul>
*
* @return True if there are more entries, false otherwise.
*/
@Override
public boolean hasNext() {
// skip over items deleted from cache, and corresponding store items if they have the same key
Expand Down Expand Up @@ -86,6 +160,13 @@ public boolean hasNext() {
return cacheIterator.hasNext() || storeIterator.hasNext();
}

/**
* Retrieves the next key-value pair in the merged iteration.
*
* @return The next key-value pair.
*
* @throws NoSuchElementException If there are no more elements to iterate.
*/
@Override
public KeyValue<K, V> next() {
if (!hasNext()) {
Expand All @@ -107,6 +188,15 @@ public KeyValue<K, V> next() {
return chooseNextValue(nextCacheKey, nextStoreKey, comparison);
}

/**
* Resolves which source (cache or store) to fetch the next key-value pair when a comparison is performed.
*
* @param nextCacheKey The next key from the cache.
* @param nextStoreKey The next key from the store.
* @param comparison The comparison result between the cache and store keys.
*
* @return The next key-value pair.
*/
private KeyValue<K, V> chooseNextValue(final Bytes nextCacheKey,
final KS nextStoreKey,
final int comparison) {
Expand All @@ -133,6 +223,15 @@ private KeyValue<K, V> chooseNextValue(final Bytes nextCacheKey,
}
}

/**
* Fetches the next value from the store, ensuring it matches the expected key.
*
* @param nextStoreKey The expected next key from the store.
*
* @return The next key-value pair from the store.
*
* @throws IllegalStateException If the key does not match the expected key.
*/
private KeyValue<K, V> nextStoreValue(final KS nextStoreKey) {
final KeyValue<KS, VS> next = storeIterator.next();

Expand All @@ -143,6 +242,15 @@ private KeyValue<K, V> nextStoreValue(final KS nextStoreKey) {
return deserializeStorePair(next);
}

/**
* Fetches the next value from the cache, ensuring it matches the expected key.
*
* @param nextCacheKey The expected next key from the cache.
*
* @return The next key-value pair from the cache.
*
* @throws IllegalStateException If the key does not match the expected key.
*/
private KeyValue<K, V> nextCacheValue(final Bytes nextCacheKey) {
final KeyValue<Bytes, LRUCacheEntry> next = cacheIterator.next();

Expand All @@ -153,6 +261,13 @@ private KeyValue<K, V> nextCacheValue(final Bytes nextCacheKey) {
return KeyValue.pair(deserializeCacheKey(next.key), deserializeCacheValue(next.value));
}

/**
* Peeks at the next key in the merged iteration without advancing the iterator.
*
* @return The next key in the iteration.
*
* @throws NoSuchElementException If there are no more elements to peek.
*/
@Override
public K peekNextKey() {
if (!hasNext()) {
Expand All @@ -174,6 +289,18 @@ public K peekNextKey() {
return chooseNextKey(nextCacheKey, nextStoreKey, comparison);
}

/**
* Determines the next key to return from the merged iteration based on the comparison of the cache and store keys.
* Resolves conflicts by considering the iteration direction and ensuring the merged order is maintained.
*
* @param nextCacheKey The next key from the cache.
* @param nextStoreKey The next key from the store.
* @param comparison The comparison result between the cache and store keys. A negative value indicates the cache
* key is smaller, zero indicates equality, and a positive value indicates the store key is
* smaller.
*
* @return The next key to return from the merged iteration.
*/
private K chooseNextKey(final Bytes nextCacheKey,
final KS nextStoreKey,
final int comparison) {
Expand All @@ -200,6 +327,9 @@ private K chooseNextKey(final Bytes nextCacheKey,
}
}

/**
* Closes the iterators and releases any associated resources.
*/
@Override
public void close() {
cacheIterator.close();
Expand Down

0 comments on commit df4fa92

Please sign in to comment.