KAFKA-18209 Cleanup __transaction_state config logic #18201

Status: Open · wants to merge 9 commits into base: trunk
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -19,6 +19,7 @@ package kafka.coordinator.transaction
 import kafka.server.{KafkaConfig, MetadataCache, ReplicaManager}
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
 import org.apache.kafka.common.message.{DescribeTransactionsResponseData, ListTransactionsResponseData}
@@ -36,6 +37,8 @@ import java.util.concurrent.atomic.AtomicBoolean
 import scala.jdk.CollectionConverters._
 
 object TransactionCoordinator {
+  val EnforcedCompression: Compression = Compression.NONE
+  val EnforcedRequiredAcks: Short = (-1).toShort
 
   def apply(config: KafkaConfig,
             replicaManager: ReplicaManager,
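Note: with this hunk, the enforced write-path settings for __transaction_state live on the TransactionCoordinator companion object instead of TransactionLog. A minimal sketch of how the coordinator consumes them when building and appending a batch (the key/value bytes and the println are illustrative placeholders, not code from this PR):

```scala
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}

object EnforcedWriteSketch {
  // Mirror of the constants added to TransactionCoordinator above.
  val EnforcedCompression: Compression = Compression.NONE
  val EnforcedRequiredAcks: Short = (-1).toShort

  def main(args: Array[String]): Unit = {
    // Transaction-log batches are always built uncompressed...
    val records = MemoryRecords.withRecords(
      EnforcedCompression,
      new SimpleRecord(System.currentTimeMillis(), "txn-key".getBytes, "txn-value".getBytes))
    // ...and handed to ReplicaManager.appendRecords with requiredAcks = -1,
    // meaning the full in-sync replica set must acknowledge the write.
    println(s"batch bytes = ${records.sizeInBytes}, acks = $EnforcedRequiredAcks")
  }
}
```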
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
@@ -17,7 +17,6 @@
 package kafka.coordinator.transaction
 
 import java.nio.ByteBuffer
-import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.TopicPartition
@@ -37,14 +36,6 @@ import scala.jdk.CollectionConverters._
  */
 object TransactionLog {
 
-  // enforce always using
-  //   1. cleanup policy = compact
-  //   2. compression = none
-  //   3. unclean leader election = disabled
-  //   4. required acks = -1 when writing
-  val EnforcedCompression: Compression = Compression.NONE
-  val EnforcedRequiredAcks: Short = (-1).toShort
-
   /**
    * Generates the bytes for transaction log message key
    *
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -173,7 +173,7 @@ class TransactionStateManager(brokerId: Int,
       if (recordsBuilder == null) {
         recordsBuilder = MemoryRecords.builder(
           ByteBuffer.allocate(math.min(16384, maxBatchSize)),
-          TransactionLog.EnforcedCompression,
+          TransactionCoordinator.EnforcedCompression,
           TimestampType.CREATE_TIME,
           0L,
           maxBatchSize
@@ -280,7 +280,7 @@
     inReadLock(stateLock) {
       replicaManager.appendRecords(
         timeout = config.requestTimeoutMs,
-        requiredAcks = TransactionLog.EnforcedRequiredAcks,
+        requiredAcks = TransactionCoordinator.EnforcedRequiredAcks,
         internalTopicsAllowed = true,
         origin = AppendOrigin.COORDINATOR,
         entriesPerPartition = Map(transactionPartition -> tombstoneRecords),
@@ -400,10 +400,19 @@ class TransactionStateManager(brokerId: Int,
   def validateTransactionTimeoutMs(txnTimeoutMs: Int): Boolean =
     txnTimeoutMs <= config.transactionMaxTimeoutMs && txnTimeoutMs > 0
 
+  /**
+   * Enforce always using:
+   * <ul>
+   *   <li>cleanup.policy = compact</li>
+   *   <li>compression.type = uncompressed</li>
+   *   <li>unclean.leader.election.enable = false</li>
+   * </ul>
+   *
+   * @return transaction topic properties
+   */
   def transactionTopicConfigs: Properties = {
     val props = new Properties
 
-    // enforce disabled unclean leader election, no compression types, and compact cleanup policy
     props.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false")
     props.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.UNCOMPRESSED.name)
     props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
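Note: the new scaladoc describes topic-level settings, which sit alongside the batch-level Compression.NONE used above: compression.type = uncompressed tells the broker to store batches uncompressed, while Compression.NONE keeps the coordinator from compressing them in the first place. A construction-only sketch of the invariant part of these properties (broker-derived settings such as segment bytes and min ISR are omitted; the object and method names are illustrative):

```scala
import java.util.Properties
import org.apache.kafka.common.config.TopicConfig

object TransactionTopicConfigSketch {
  // The three settings pinned for __transaction_state, regardless of broker defaults.
  def invariantProps: Properties = {
    val props = new Properties
    props.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false")
    props.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "uncompressed") // BrokerCompressionType.UNCOMPRESSED.name
    props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
    props
  }
}
```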
@@ -640,7 +649,7 @@ class TransactionStateManager(brokerId: Int,
     val valueBytes = TransactionLog.valueToBytes(newMetadata, transactionVersionLevel())
     val timestamp = time.milliseconds()
 
-    val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompression, new SimpleRecord(timestamp, keyBytes, valueBytes))
+    val records = MemoryRecords.withRecords(TransactionCoordinator.EnforcedCompression, new SimpleRecord(timestamp, keyBytes, valueBytes))
     val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionFor(transactionalId))
     val recordsPerPartition = Map(topicPartition -> records)
 
@@ -782,7 +791,7 @@ class TransactionStateManager(brokerId: Int,
     if (append) {
       replicaManager.appendRecords(
         timeout = newMetadata.txnTimeoutMs.toLong,
-        requiredAcks = TransactionLog.EnforcedRequiredAcks,
+        requiredAcks = TransactionCoordinator.EnforcedRequiredAcks,
         internalTopicsAllowed = true,
         origin = AppendOrigin.COORDINATOR,
         entriesPerPartition = recordsPerPartition,
core/src/main/scala/kafka/server/KafkaApis.scala (2 changes: 1 addition & 1 deletion)
@@ -1752,7 +1752,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (controlRecords.nonEmpty) {
         replicaManager.appendRecords(
           timeout = config.requestTimeoutMs.toLong,
-          requiredAcks = -1,
+          requiredAcks = TransactionCoordinator.EnforcedRequiredAcks,
           internalTopicsAllowed = true,
           origin = AppendOrigin.COORDINATOR,
           entriesPerPartition = controlRecords,
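Note: this hunk replaces the last hard-coded -1 on a coordinator append path (the WriteTxnMarkers handling in KafkaApis) with the shared constant. For reference, requiredAcks = -1 matches the strongest client-side producer setting; a tiny illustrative sketch, not code from this PR:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

object AcksSketch {
  // On the producer side, acks = "all" is equivalent to "-1": the leader
  // waits for the full in-sync replica set before acknowledging a write.
  def producerProps: Properties = {
    val props = new Properties
    props.put(ProducerConfig.ACKS_CONFIG, "all")
    props
  }
}
```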
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -1154,7 +1154,7 @@ class TransactionStateManagerTest {
       val partitionId = transactionManager.partitionFor(transactionalId1)
       val topicPartition = new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId)
       val expectedTombstone = new SimpleRecord(time.milliseconds(), TransactionLog.keyToBytes(transactionalId1), null)
-      val expectedRecords = MemoryRecords.withRecords(TransactionLog.EnforcedCompression, expectedTombstone)
+      val expectedRecords = MemoryRecords.withRecords(TransactionCoordinator.EnforcedCompression, expectedTombstone)
       assertEquals(Set(topicPartition), appendedRecords.keySet)
       assertEquals(Seq(expectedRecords), appendedRecords(topicPartition).toSeq)
     } else {