Added method HashKeyValueTable.removeIndexEntry. #8
dcaoyuan committed Mar 25, 2019
1 parent 17608e0 commit aa2d558
Showing 2 changed files with 51 additions and 20 deletions.
42 changes: 31 additions & 11 deletions kesque/src/main/scala/kesque/HashKeyValueTable.scala
@@ -210,11 +210,11 @@ final class HashKeyValueTable private[kesque] (
         var foundValue: Option[TVal] = None
         var foundOffset = Int.MinValue
         var i = mixedOffsets.length - 1 // loop backward to find the newest one
-        while (i >= 0 && foundValue.isEmpty) {
+        while (foundValue.isEmpty && i >= 0) {
           val mixedOffset = mixedOffsets(i)
           val (fileno, offset) = toFileNoAndOffset(mixedOffset)
-          val theTopic = topicsOfFileno(fileno)(col)
-          val (topicPartition, result) = db.read(theTopic, offset, fetchMaxBytes).head
+          val kafkaTopic = topicsOfFileno(fileno)(col) // kafka topic directory name
+          val (topicPartition, result) = db.read(kafkaTopic, offset, fetchMaxBytes).head
           val recs = result.info.records.records.iterator
           // NOTE: the records usually do not start from the fetch-offset,
           // the expected record may be near the tail of recs
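A note on the mixed offsets being unpacked here: toFileNoAndOffset splits a single Int into a file number and a record offset. The exact bit layout is internal to kesque, so the sketch below is only a plausible stand-in — the 30-bit split and mask are assumptions — meant to show the pack/unpack round trip.

// Standalone sketch of a "mixed offset": file number packed into the
// high bits, record offset in the low bits. Assumption: kesque's real
// layout may differ; only the pack/unpack round trip is the point.
object MixedOffsetSketch {
  private val OffsetBits = 30
  private val OffsetMask = (1 << OffsetBits) - 1

  def toMixedOffset(fileno: Int, offset: Int): Int =
    (fileno << OffsetBits) | (offset & OffsetMask)

  def toFileNoAndOffset(mixedOffset: Int): (Int, Int) =
    (mixedOffset >>> OffsetBits, mixedOffset & OffsetMask)

  def main(args: Array[String]): Unit = {
    val mixed = toMixedOffset(1, 123456)
    println(toFileNoAndOffset(mixed)) // prints (1,123456)
  }
}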
@@ -294,9 +294,9 @@ final class HashKeyValueTable private[kesque] (
     try {
       writeLock.lock()

-      val topic = topicsOfFileno(fileno)(col)
+      val kafkaTopic = topicsOfFileno(fileno)(col)
       // write simple records and create index records
-      val indexRecords = db.write(topic, records, compressionType).foldLeft(Vector[Vector[SimpleRecord]]()) {
+      val indexRecords = db.write(kafkaTopic, records, compressionType).foldLeft(Vector[Vector[SimpleRecord]]()) {
         case (indexRecords, (topicPartition, LogAppendResult(appendInfo, Some(ex)))) =>
           error(ex.getMessage, ex) // TODO
           indexRecords
@@ -305,8 +305,8 @@ final class HashKeyValueTable private[kesque] (
         case (indexRecords, (topicPartition, LogAppendResult(appendInfo, None))) =>
           if (appendInfo.numMessages > 0) {
             val firstOffset = appendInfo.firstOffset.get
             val (lastOffset, idxRecords) = tkvs.foldLeft(firstOffset, Vector[SimpleRecord]()) {
-              case ((_offset, idxRecords), TKeyVal(keyBytes, value, _, timestamp)) =>
-                val offset = _offset.toInt
+              case ((longOffset, idxRecords), TKeyVal(keyBytes, value, _, timestamp)) =>
+                val offset = longOffset.toInt
                 val key = Hash(keyBytes)
                 val hash = key.hashCode
                 val indexRecord = new SimpleRecord(intToBytes(hash), intToBytes(offset))
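The fold above encodes each index entry as a pair of 4-byte payloads: the key's hash and the record's offset. Below is a minimal sketch of that shape, assuming a big-endian intToBytes and using a generic array hash as a stand-in for kesque's Hash; both helper choices are assumptions, not verified internals.

import java.nio.ByteBuffer

// Sketch: build a (hash -> offset) index entry as two 4-byte payloads,
// the shape the SimpleRecord(intToBytes(hash), intToBytes(offset))
// call above relies on. Big-endian encoding is an assumption here.
object IndexRecordSketch {
  def intToBytes(v: Int): Array[Byte] =
    ByteBuffer.allocate(4).putInt(v).array()

  def indexEntry(keyBytes: Array[Byte], offset: Int): (Array[Byte], Array[Byte]) = {
    val hash = java.util.Arrays.hashCode(keyBytes) // stand-in for Hash(keyBytes).hashCode
    (intToBytes(hash), intToBytes(offset))
  }

  def main(args: Array[String]): Unit = {
    val (k, v) = indexEntry("some-key".getBytes("UTF-8"), 42)
    println(s"${k.length} + ${v.length} bytes per index entry") // 4 + 4 bytes
  }
}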
@@ -336,7 +336,7 @@ final class HashKeyValueTable private[kesque] (
           case (topicPartition, LogAppendResult(appendInfo, Some(ex))) =>
             error(ex.getMessage, ex) // TODO
           case (topicPartition, LogAppendResult(appendInfo, None)) =>
-            debug(s"$topic: append index records ${indexRecords.size}")
+            debug(s"$kafkaTopic: append index records ${indexRecords.size}")
         }

       indexRecords.map(_.size)
@@ -375,9 +375,9 @@ final class HashKeyValueTable private[kesque] (
         if (appendInfo.numMessages > 0) {
           val firstOffset = appendInfo.firstOffset.get
           val (lastOffset, idxRecords) = records.foldLeft(firstOffset, Vector[SimpleRecord]()) {
-            case ((offset, idxRecords), (key, _)) =>
-              val keyh = Hash(key)
-              val hash = keyh.hashCode
+            case ((offset, idxRecords), (keyBytes, _)) =>
+              val key = Hash(keyBytes)
+              val hash = key.hashCode
               val indexRecord = new SimpleRecord(intToBytes(hash), intToBytes(offset.toInt))

               // remove action always happens on file 1
@@ -410,6 +410,26 @@ final class HashKeyValueTable private[kesque] (
     }
   }

+  def removeIndexEntry(keyBytes: Array[Byte], mixedOffset: Int, topic: String) {
+    val col = topicToCol(topic)
+    val key = Hash(keyBytes)
+    val hash = key.hashCode
+    hashOffsets.get(hash, col) match {
+      case IntIntsMap.NO_VALUE => None
+      case mixedOffsets =>
+        var found = false
+        var i = 0
+        while (!found && i < mixedOffsets.length) {
+          if (mixedOffset == mixedOffsets(i)) {
+            hashOffsets.removeValue(hash, mixedOffset, col)
+            found = true
+          } else {
+            i += 1
+          }
+        }
+    }
+  }
+
   def iterateOver(fetchOffset: Long, topic: String)(op: TKeyVal => Unit) = {
     try {
       readLock.lock()
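The new removeIndexEntry scans all offsets stored under a key hash (several entries can collide there) and deletes only the one whose mixed offset matches exactly. Here is a self-contained model of that behavior, with a plain mutable map standing in for kesque's IntIntsMap — an assumption about its semantics, not its implementation.

import scala.collection.mutable

// Model of removeIndexEntry: one key hash can map to several mixed
// offsets (hash collisions, multiple versions), so removal must match
// the exact offset, not just the hash.
object RemoveIndexEntryModel {
  private val hashOffsets = mutable.Map.empty[Int, mutable.ArrayBuffer[Int]]

  def put(hash: Int, mixedOffset: Int): Unit =
    hashOffsets.getOrElseUpdate(hash, mutable.ArrayBuffer.empty[Int]) += mixedOffset

  def removeIndexEntry(hash: Int, mixedOffset: Int): Unit =
    hashOffsets.get(hash) foreach { offsets =>
      val i = offsets.indexOf(mixedOffset) // first exact match, like the while loop above
      if (i >= 0) offsets.remove(i)
      if (offsets.isEmpty) hashOffsets -= hash
    }

  def main(args: Array[String]): Unit = {
    put(42, 100); put(42, 200)  // two entries under one hash
    removeIndexEntry(42, 100)
    println(hashOffsets(42))    // ArrayBuffer(200): only the matching offset is gone
  }
}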
29 changes: 20 additions & 9 deletions khipu-eth/src/main/scala/khipu/store/KesqueCompactor.scala
@@ -91,15 +91,15 @@ object KesqueCompactor {
             Some(key, blockNumber)
           } else {
             nodeTable.read(key, topic, bypassCache = true) match {
-              case Some(TVal(bytes, offset, blockNumber)) =>
+              case Some(TVal(bytes, mixedOffset, blockNumber)) =>
                 nodeCount += 1
                 if (nodeCount % 1000 == 0) {
                   val elapsed = (System.nanoTime - start) / 1000000000
                   val speed = nodeCount / math.max(1, elapsed)
                   log.info(s"[comp] $topic $nodeCount nodes $speed/s, at $blockNumber")
                 }

-                nodeGot(TKeyVal(key, bytes, offset, blockNumber))
+                nodeGot(TKeyVal(key, bytes, mixedOffset, blockNumber))
                 Some(bytes, blockNumber)

              case None =>
@@ -120,17 +120,28 @@ object KesqueCompactor {
     def write(kv: TKeyVal) {
       buf += kv
       if (buf.size > 100) { // keep the batched size around 4096 (~ 32*100 bytes)
-        nodeTable.write(buf, topic)
+        nodeTable.writeShift(buf, topic)
+        buf foreach {
+          case TKeyVal(key, _, mixedOffset, _) =>
+            nodeTable.removeIndexEntry(key, mixedOffset, topic)
+        }
         buf.clear()
       }
     }

     def flush() {
-      nodeTable.write(buf, topic)
+      nodeTable.writeShift(buf, topic)
+      buf foreach {
+        case TKeyVal(key, _, mixedOffset, _) =>
+          nodeTable.removeIndexEntry(key, mixedOffset, topic)
+      }
       buf.clear()
     }
   }

+  /**
+   * Used for testing only
+   */
   private def initTablesBySelf() = {
     val khipuPath = new File(classOf[KesqueDataSource].getProtectionDomain.getCodeSource.getLocation.toURI).getParentFile.getParentFile
     //val configDir = new File(khipuPath, "../src/universal/conf")
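The writer above now batches records, shifts the whole batch to the target file via writeShift, and immediately removes the index entries that still point at the old locations. The generic sketch below captures just that batch-then-cleanup shape; BatchedWriter and its parameters are illustrative names, not khipu API.

import scala.collection.mutable.ArrayBuffer

// Generic batch-then-cleanup writer: accumulate items, flush the whole
// batch once it passes a threshold, then run a per-item cleanup --
// mirroring writeShift + removeIndexEntry in NodeWriter above.
final class BatchedWriter[A](threshold: Int)(flushBatch: Seq[A] => Unit)(cleanup: A => Unit) {
  private val buf = ArrayBuffer.empty[A]

  def write(a: A): Unit = {
    buf += a
    if (buf.size > threshold) flush()
  }

  def flush(): Unit = {
    flushBatch(buf.toList)  // e.g. nodeTable.writeShift(buf, topic)
    buf foreach cleanup     // e.g. nodeTable.removeIndexEntry(key, mixedOffset, topic)
    buf.clear()
  }
}

object BatchedWriterDemo {
  def main(args: Array[String]): Unit = {
    val w = new BatchedWriter[Int](threshold = 2)(b => println(s"flush $b"))(i => println(s"clean $i"))
    (1 to 3) foreach w.write // third write triggers a flush of all three
    w.flush()                // drain the remainder (here: empty)
  }
}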
@@ -172,7 +172,7 @@ object KesqueCompactor {
     val storageTable = storages.storageNodeDataSource.table
     val blockHeaderStorage = storages.blockHeaderStorage

-    val compactor = new KesqueCompactor(kesque, accountTable, storageTable, blockHeaderStorage, 6574258)
+    val compactor = new KesqueCompactor(kesque, accountTable, storageTable, blockHeaderStorage, 7225550)
     compactor.load()
   }
 }
@@ -187,11 +198,11 @@ final class KesqueCompactor(

   val log = Logging(system, this)

-  private val targetStorageTable = kesque.getTable(Array(KesqueDataSource.storage + "_comp"), 4096, CompressionType.NONE, 1024)
-  private val targetAccountTable = kesque.getTable(Array(KesqueDataSource.account + "_comp"), 4096, CompressionType.NONE, 1024)
+  private val targetStorageTable = kesque.getTable(Array(KesqueDataSource.storage), 4096, CompressionType.NONE, 1024)
+  private val targetAccountTable = kesque.getTable(Array(KesqueDataSource.account), 4096, CompressionType.NONE, 1024)

-  private val storageWriter = new NodeWriter(KesqueDataSource.storage + "_comp", targetStorageTable)
-  private val accountWriter = new NodeWriter(KesqueDataSource.account + "_comp", targetAccountTable)
+  private val storageWriter = new NodeWriter(KesqueDataSource.storage, targetStorageTable)
+  private val accountWriter = new NodeWriter(KesqueDataSource.account, targetAccountTable)

   private val storageReader = new NodeReader[UInt256](KesqueDataSource.storage, storageTable)(trie.rlpUInt256Serializer) {
     override def nodeGot(kv: TKeyVal) {
