KAFKA-17182: Consumer fetch sessions are evicted too quickly with AsyncKafkaConsumer #18795

Open · wants to merge 97 commits into trunk
Conversation

@kirktrue (Collaborator) commented Feb 4, 2025:

This change reduces fetch session cache evictions on the broker for the KafkaConsumer by altering the logic that determines which partitions it includes in fetch requests.

Background

Consumer implementations fetch data from the cluster and temporarily buffer it in memory until the user next calls Consumer.poll(). When a fetch request is being generated, partitions that already have buffered data are not included in the fetch request.

The ClassicKafkaConsumer performs much of its fetch logic and network I/O in the application thread. On poll(), if there is any locally-buffered data, the ClassicKafkaConsumer does not fetch any new data and simply returns the buffered data to the user from poll().

The AsyncKafkaConsumer, on the other hand, splits its logic and network I/O between two threads, which introduces a potential race condition during fetch. The AsyncKafkaConsumer also checks for buffered data on its application thread; if it finds none, it signals the background thread to create a fetch request. However, it is possible for the background thread to receive and buffer data from a previous fetch before the fetch request logic starts. When that happens, the background thread skips the newly buffered partitions as it creates the fetch request, with the unintended result that those partitions are added to the fetch request's "to remove" set. This signals the broker to remove those partitions from its internal cache.

This issue is technically possible in the ClassicKafkaConsumer too, since the heartbeat thread performs network I/O in addition to the application thread. However, because of the frequency at which the AsyncKafkaConsumer's background thread runs, it is ~100x more likely to happen.
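
To make that failure mode concrete, here is a minimal, self-contained sketch of the buffered-partition skip described above. The class, method, and topic names are illustrative only and are not the actual client internals.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Minimal sketch of the buffered-partition skip described above.
 * Names are illustrative, not the real client code.
 */
public class BufferedPartitionSkipSketch {

    record TopicPartition(String topic, int partition) { }

    /**
     * Partitions to include in the next fetch request: everything fetchable
     * except partitions whose data is already sitting in the fetch buffer.
     */
    static Set<TopicPartition> partitionsToFetch(Set<TopicPartition> fetchable,
                                                 Set<TopicPartition> buffered) {
        Set<TopicPartition> result = new HashSet<>(fetchable);
        // In an incremental fetch session, a partition that was in the previous
        // request but is omitted from this one ends up in the request's
        // "to remove" set, telling the broker to drop it from its session cache.
        result.removeAll(buffered);
        return result;
    }

    public static void main(String[] args) {
        Set<TopicPartition> fetchable = Set.of(
                new TopicPartition("events", 0),
                new TopicPartition("events", 1));
        // If the background thread buffered events-1 just before building the
        // request, events-1 silently falls out of the fetch session.
        Set<TopicPartition> buffered = Set.of(new TopicPartition("events", 1));
        System.out.println(partitionsToFetch(fetchable, buffered)); // only events-0 remains
    }
}
```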

Options

The core decision is: what should the background thread do if it is asked to create a fetch request and discovers that there is buffered data? There were multiple proposals to address this issue in the AsyncKafkaConsumer. Among them were:

  1. The background thread should omit buffered partitions from the fetch request (the existing behavior)
  2. The background thread should skip the fetch request generation entirely if there are any buffered partitions
  3. The background thread should include buffered partitions in the fetch request, but use a small “max bytes” value
  4. The background thread should skip fetching from the nodes that have buffered partitions

Option 4 won out. The change is localized to AbstractFetch, where the basic idea is to skip fetch requests to a given node if that node is the leader for any buffered partition. Because no fetch request is sent to that node, the fetch session won't have any "holes" where the buffered partitions should be.
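
Below is a minimal sketch of that idea. The names (bufferedNodes, fetchRequests, the record types) are illustrative stand-ins, not the actual AbstractFetch code; as the review discussion further down notes, only buffered partitions that are still fetchable should cause their leader to be skipped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

/**
 * Sketch of option 4: skip fetch requests to nodes that lead buffered data.
 * Illustrative names only, not the actual AbstractFetch implementation.
 */
public class SkipBufferedNodesSketch {

    record TopicPartition(String topic, int partition) { }
    record Node(int id) { }

    /** Leaders of partitions that still have buffered data and are still fetchable. */
    static Set<Node> bufferedNodes(Set<TopicPartition> buffered,
                                   Predicate<TopicPartition> isFetchable,
                                   Map<TopicPartition, Node> leaderFor) {
        Set<Node> nodes = new HashSet<>();
        for (TopicPartition tp : buffered) {
            // A paused or unassigned buffered partition should not block fetching
            // from its node (the isFetchable() point raised in the review below).
            if (isFetchable.test(tp) && leaderFor.containsKey(tp))
                nodes.add(leaderFor.get(tp));
        }
        return nodes;
    }

    /** Group partitions by leader, skipping any node that leads buffered data. */
    static Map<Node, List<TopicPartition>> fetchRequests(Set<TopicPartition> assigned,
                                                         Set<TopicPartition> buffered,
                                                         Predicate<TopicPartition> isFetchable,
                                                         Map<TopicPartition, Node> leaderFor) {
        Set<Node> skip = bufferedNodes(buffered, isFetchable, leaderFor);
        Map<Node, List<TopicPartition>> requests = new HashMap<>();
        for (TopicPartition tp : assigned) {
            if (!isFetchable.test(tp) || buffered.contains(tp))
                continue; // buffered or non-fetchable partitions are never requested
            Node leader = leaderFor.get(tp);
            // Sending no request at all to a skipped node means its fetch session
            // never sees "holes" where the buffered partitions should be.
            if (leader == null || skip.contains(leader))
                continue;
            requests.computeIfAbsent(leader, n -> new ArrayList<>()).add(tp);
        }
        return requests;
    }

    public static void main(String[] args) {
        Map<TopicPartition, Node> leaderFor = Map.of(
                new TopicPartition("events", 0), new Node(1),
                new TopicPartition("events", 1), new Node(2));
        Set<TopicPartition> buffered = Set.of(new TopicPartition("events", 1));
        // events-1 is buffered, so node 2 is skipped entirely; node 1 still gets a request.
        System.out.println(fetchRequests(leaderFor.keySet(), buffered, tp -> true, leaderFor));
    }
}
```

Skipping the whole node, rather than sending a request with the buffered partitions missing, is what keeps the incremental fetch session intact for those partitions.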

Testing

Eviction rate testing

Here are the results of our internal stress testing:

  • ClassicKafkaConsumer: after the initial spike during test start-up, the average rate settles down to ~0.14 evictions/second
  • AsyncKafkaConsumer (w/o fix): after start-up, the evictions still settle down, but at ~1.48 evictions/second they are roughly 10x higher than the ClassicKafkaConsumer
  • AsyncKafkaConsumer (w/ fix): the eviction rate is now much closer to the ClassicKafkaConsumer at ~0.22 evictions/second

EndToEndLatency testing

The bundled EndToEndLatency test runner was executed on a single machine using Docker. The apache/kafka:latest Docker image was used with either the cluster/combined/plaintext/docker-compose.yml or the single-node/plaintext/docker-compose.yml Docker Compose configuration file, depending on the test. The Docker containers were recreated from scratch before each test.

A single topic was created with 30 partitions and a replication factor of either 1 or 3, depending on whether the setup was single- or multi-node.

Each test run used the following argument values:

  • Message count: 100000
  • acks: 1
  • Message size: 128 bytes

A configuration file which contained a single configuration value of group.protocol=<$group_protocol> was also provided to the test, where $group_protocol was either CLASSIC or CONSUMER.
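
For example, the CONSUMER runs used a client configuration file along these lines (the file name is arbitrary):

```properties
# Passed to EndToEndLatency as the client configuration file.
# The CLASSIC runs used group.protocol=CLASSIC instead.
group.protocol=CONSUMER
```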

Test results

Test 1—CLASSIC group protocol, cluster size: 3 nodes, replication factor: 3

| Metric | trunk (ms) | PR (ms) |
|---|---|---|
| Average latency | 1.4901 | 1.4871 |
| 50th percentile | 1 | 1 |
| 99th percentile | 3 | 3 |
| 99.9th percentile | 6 | 6 |

Test 2—CONSUMER group protocol, cluster size: 3 nodes, replication factor: 3

| Metric | trunk (ms) | PR (ms) |
|---|---|---|
| Average latency | 1.4704 | 1.4807 |
| 50th percentile | 1 | 1 |
| 99th percentile | 3 | 3 |
| 99.9th percentile | 6 | 7 |

Test 3—CLASSIC group protocol, cluster size: 1 node, replication factor: 1

| Metric | trunk (ms) | PR (ms) |
|---|---|---|
| Average latency | 1.0777 | 1.0193 |
| 50th percentile | 1 | 1 |
| 99th percentile | 2 | 2 |
| 99.9th percentile | 5 | 4 |

Test 4—CONSUMER group protocol, cluster size: 1 node, replication factor: 1

| Metric | trunk (ms) | PR (ms) |
|---|---|---|
| Average latency | 1.0937 | 1.0503 |
| 50th percentile | 1 | 1 |
| 99th percentile | 2 | 2 |
| 99.9th percentile | 4 | 4 |

Conclusion

These tests did not reveal any significant differences between the current fetcher logic on trunk and the one proposed in this PR. Additional test runs using larger message counts and/or larger message sizes did not affect the results.

… the new consumer

Updated the FetchRequestManager to only create and enqueue fetch requests when signaled to do so by a FetchEvent.
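
Conceptually, that signaling flow looks something like the sketch below; the class, queue, and method names are illustrative and are not the real FetchRequestManager/FetchEvent APIs.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Conceptual sketch of "fetch on signal"; names are illustrative, not the real consumer internals. */
public class FetchSignalSketch {

    /** Marker event from the application thread asking for a new fetch request. */
    record FetchSignal() { }

    private final BlockingQueue<FetchSignal> events = new LinkedBlockingQueue<>();

    /** Application thread: only signal the background thread when nothing is buffered locally. */
    void maybeSignalFetch(boolean hasBufferedData) {
        if (!hasBufferedData)
            events.offer(new FetchSignal());
    }

    /** Background thread: create and enqueue fetch requests only when signaled. */
    void backgroundPoll() {
        if (events.poll() != null) {
            // ... build and enqueue fetch requests here ...
        }
    }
}
```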
@kirktrue requested a review from Copilot · February 4, 2025 00:21


Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.

Comments suppressed due to low confidence (1)

clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:215

  • [nitpick] The parameter name 'numNodes' could be more descriptive. Consider renaming it to 'numberOfNodes'.
private void assignFromUser(Set<TopicPartition> partitions, int numNodes) {
@kirktrue (Collaborator, Author) left a comment:

The two sub-comments highlight the key differences between #17700 and this PR.

@kirktrue (Collaborator, Author) commented Feb 4, 2025:

@jeffkbkim @junrao @lianetm—this is the second attempt at fixing the fetch session eviction bug (#17700). I've highlighted the differences between the two PRs. It really boils down to the addition of a check for paused partitions. I've added the check, comments, and another relevant unit test.

I ran the StandbyTaskEOSMultiRebalanceIntegrationTest integration test over 260 times without failure (before this change, it failed within 5 tries).

cc @mjsax

@junrao (Contributor) left a comment:

@kirktrue : Thanks for the PR. A couple of comments.

// data to be returned.
//
// See FetchCollector.collectFetch().
if (subscriptions.isPaused(partition))
junrao (Contributor):
Should we just use subscriptions.isFetchable? Intuitively, an un-assigned/revoking partition shouldn't block the fetching of other partitions.

kirktrue (Collaborator, Author):
Done. isFetchable() replaces both the isAssigned() and isPaused() calls. I did a little more refactoring and moved that whole loop into a new bufferedNodes() method.

//
// - tp0 was collected and thus not in the fetch buffer
// - tp1, while still in the fetch buffer, is paused and its node should be ignored
assertEquals(1, sendFetches());
junrao (Contributor):
Is there an easy way to verify the partitions in the fetch request?

kirktrue (Collaborator, Author):
Yes, good suggestion. I've updated all the tests to ensure the partitions and nodes in the requests are as expected. I also added three more tests for other cases isFetchable() should catch.

Labels: Blocker (this pull request is identified as solving a blocker for a release), clients, consumer, ctr (Consumer Threading Refactor, KIP-848), KIP-848 (The Next Generation of the Consumer Rebalance Protocol)