Skip to content

Commit

Permalink
init --version
Browse files Browse the repository at this point in the history
  • Loading branch information
TaiJuWu committed Nov 13, 2024
1 parent 668299e commit ec6aacb
Show file tree
Hide file tree
Showing 10 changed files with 382 additions and 299 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1070,8 +1070,13 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call

// Append the record to the accumulator. Note, that the actual partition may be
// calculated there and can be accessed via appendCallbacks.topicPartition.
Short acks = record.acks();
if (acks == null) {
acks = Short.parseShort(producerConfig.getString("acks"));
}

RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);
serializedValue, headers, acks, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);
assert appendCallbacks.getPartition() != RecordMetadata.UNKNOWN_PARTITION;

if (result.abortForNewBatch) {
Expand All @@ -1082,7 +1087,7 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
}
result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
serializedValue, headers, appendCallbacks, remainingWaitMs, false, nowMs, cluster);
serializedValue, headers, acks, appendCallbacks, remainingWaitMs, false, nowMs, cluster);
}

// Add the partition to the transaction (if in progress) after it has been successfully
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class ProducerRecord<K, V> {
private final K key;
private final V value;
private final Long timestamp;
private final Short acks;

/**
* Creates a record with a specified timestamp to be sent to a specified topic and partition
Expand All @@ -65,8 +66,9 @@ public class ProducerRecord<K, V> {
* @param key The key that will be included in the record
* @param value The record contents
* @param headers the headers that will be included in the record
* @param acks acks that will be included in the record
*/
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers, Short acks) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null.");
if (timestamp != null && timestamp < 0)
Expand All @@ -81,6 +83,23 @@ public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V
this.value = value;
this.timestamp = timestamp;
this.headers = new RecordHeaders(headers);
this.acks = acks;
}


/**
* Creates a record with a specified timestamp to be sent to a specified topic and partition
*
* @param topic The topic the record will be appended to
* @param partition The partition to which the record should be sent
* @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign
* the timestamp using System.currentTimeMillis().
* @param key The key that will be included in the record
* @param value The record contents
* @param headers the headers that will be included in the record
*/
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
this(topic, partition, timestamp, key, value, headers, null);
}

/**
Expand Down Expand Up @@ -185,14 +204,19 @@ public Integer partition() {
return partition;
}

public Short acks() {
return acks;
}

@Override
public String toString() {
String headers = this.headers == null ? "null" : this.headers.toString();
String key = this.key == null ? "null" : this.key.toString();
String value = this.value == null ? "null" : this.value.toString();
String timestamp = this.timestamp == null ? "null" : this.timestamp.toString();
String acks = this.acks == null ? "null" : this.acks.toString();
return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", headers=" + headers + ", key=" + key + ", value=" + value +
", timestamp=" + timestamp + ")";
", timestamp=" + timestamp + ", acks=" + acks + ")";
}

@Override
Expand All @@ -209,7 +233,8 @@ else if (!(o instanceof ProducerRecord))
Objects.equals(topic, that.topic) &&
Objects.equals(headers, that.headers) &&
Objects.equals(value, that.value) &&
Objects.equals(timestamp, that.timestamp);
Objects.equals(timestamp, that.timestamp) &&
Objects.equals(acks, that.acks);
}

@Override
Expand All @@ -220,6 +245,7 @@ public int hashCode() {
result = 31 * result + (key != null ? key.hashCode() : 0);
result = 31 * result + (value != null ? value.hashCode() : 0);
result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0);
result = 31 * result + (acks != null ? acks.hashCode() : 0);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,18 @@ private enum FinalState { ABORTED, FAILED, SUCCEEDED }
private long drainedMs;
private boolean retry;
private boolean reopened;
private short acks;

// Tracks the current-leader's epoch to which this batch would be sent, in the current to produce the batch.
private OptionalInt currentLeaderEpoch;
// Tracks the attempt in which leader was changed to currentLeaderEpoch for the 1st time.
private int attemptsWhenLeaderLastChanged;

public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs) {
this(tp, recordsBuilder, createdMs, false);
public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, short acks) {
this(tp, recordsBuilder, createdMs, false, acks);
}

public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) {
public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch, short acks) {
this.createdMs = createdMs;
this.lastAttemptMs = createdMs;
this.recordsBuilder = recordsBuilder;
Expand All @@ -103,6 +104,7 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
recordsBuilder.compression().type());
this.currentLeaderEpoch = OptionalInt.empty();
this.attemptsWhenLeaderLastChanged = 0;
this.acks = acks;
recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
}

Expand Down Expand Up @@ -386,13 +388,17 @@ private ProducerBatch createBatchOffAccumulatorForRecord(Record record, int batc
// with how normal batches are handled).
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic(), recordsBuilder.compression(),
TimestampType.CREATE_TIME, 0L);
return new ProducerBatch(topicPartition, builder, this.createdMs, true);
return new ProducerBatch(topicPartition, builder, this.createdMs, true, acks);
}

public boolean isCompressed() {
return recordsBuilder.compression().type() != CompressionType.NONE;
}

public short acks() {
return acks;
}

/**
* A callback and the associated FutureRecordMetadata argument to pass to it.
*/
Expand Down
Loading

0 comments on commit ec6aacb

Please sign in to comment.