Skip to content

Commit

Permalink
Avoid writing multiple times for the same key; got the KesqueCompactor test passed. #8
Browse files Browse the repository at this point in the history
  • Loading branch information
dcaoyuan committed Mar 25, 2019
1 parent 1d772d8 commit 7147bf1
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
4 changes: 1 addition & 3 deletions kesque/src/main/scala/kesque/HashKeyValueTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,7 @@ final class HashKeyValueTable private[kesque] (
write(kvs, topic, fileno = 1)
}

def write(kvs: Iterable[TKeyVal], topic: String): Vector[Iterable[Int]] = write(kvs, topic, fileno = 0)

private def write(kvs: Iterable[TKeyVal], topic: String, fileno: Int): Vector[Iterable[Int]] = {
def write(kvs: Iterable[TKeyVal], topic: String, fileno: Int = 0): Vector[Iterable[Int]] = {
val col = topicToCol(topic)

// prepare simple records, filter no changed ones
Expand Down
26 changes: 17 additions & 9 deletions khipu-eth/src/main/scala/khipu/store/KesqueCompactor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ object KesqueCompactor {

}

final class NodeWriter(topic: String, nodeTable: HashKeyValueTable) {
final class NodeWriter(topic: String, nodeTable: HashKeyValueTable, toFileNo: Int) {
private val buf = new mutable.ArrayBuffer[TKeyVal]()

def write(kv: TKeyVal) {
Expand All @@ -125,7 +125,7 @@ object KesqueCompactor {
val (_, offset) = HashKeyValueTable.toFileNoAndOffset(mixedOffset)
TKeyVal(key, value, offset, timestamp)
}
nodeTable.writeShift(kvs, topic)
nodeTable.write(kvs, topic, toFileNo)

buf foreach {
case TKeyVal(key, _, mixedOffset, _) =>
Expand All @@ -142,7 +142,7 @@ object KesqueCompactor {
val (_, offset) = HashKeyValueTable.toFileNoAndOffset(mixedOffset)
TKeyVal(key, value, offset, timestamp)
}
nodeTable.writeShift(kvs, topic)
nodeTable.write(kvs, topic, toFileNo)

buf foreach {
case TKeyVal(key, _, mixedOffset, _) =>
Expand Down Expand Up @@ -197,7 +197,7 @@ object KesqueCompactor {
val storageTable = storages.storageNodeDataSource.table
val blockHeaderStorage = storages.blockHeaderStorage

val compactor = new KesqueCompactor(kesque, accountTable, storageTable, blockHeaderStorage, 7225555)
val compactor = new KesqueCompactor(kesque, accountTable, storageTable, blockHeaderStorage, 7225555, 0, 1)
compactor.load()
}
}
Expand All @@ -206,7 +206,9 @@ final class KesqueCompactor(
accountTable: HashKeyValueTable,
storageTable: HashKeyValueTable,
blockHeaderStorage: BlockHeaderStorage,
blockNumber: Long
blockNumber: Long,
fromFileNo: Int,
toFileNo: Int
) {
import KesqueCompactor._

Expand All @@ -215,12 +217,15 @@ final class KesqueCompactor(
private val targetStorageTable = kesque.getTable(Array(KesqueDataSource.storage), 4096, CompressionType.NONE, 1024)
private val targetAccountTable = kesque.getTable(Array(KesqueDataSource.account), 4096, CompressionType.NONE, 1024)

private val storageWriter = new NodeWriter(KesqueDataSource.storage, targetStorageTable)
private val accountWriter = new NodeWriter(KesqueDataSource.account, targetAccountTable)
private val storageWriter = new NodeWriter(KesqueDataSource.storage, targetStorageTable, toFileNo)
private val accountWriter = new NodeWriter(KesqueDataSource.account, targetAccountTable, toFileNo)

private val storageReader = new NodeReader[UInt256](KesqueDataSource.storage, storageTable)(trie.rlpUInt256Serializer) {
override def nodeGot(kv: TKeyVal) {
storageWriter.write(kv)
val (fileno, _) = HashKeyValueTable.toFileNoAndOffset(kv.offset)
if (fileno == fromFileNo) {
storageWriter.write(kv)
}
}
}

Expand All @@ -233,7 +238,10 @@ final class KesqueCompactor(
}

override def nodeGot(kv: TKeyVal) {
accountWriter.write(kv)
val (fileno, _) = HashKeyValueTable.toFileNoAndOffset(kv.offset)
if (fileno == fromFileNo) {
accountWriter.write(kv)
}
}
}

Expand Down

0 comments on commit 7147bf1

Please sign in to comment.