From 774bf71ca877a6c62f81f8ba793e296bdcfc14ec Mon Sep 17 00:00:00 2001 From: Caoyuan Deng Date: Wed, 27 Mar 2019 12:40:31 -0700 Subject: [PATCH] Added postAppend, gc methods for KesqueCompactor. #8 --- .../main/scala/kesque/HashKeyValueTable.scala | 51 +++++--- .../src/main/scala/kesque/HashOffsets.scala | 11 ++ kesque/src/main/scala/kesque/IntIntMap.scala | 60 ++++++++- kesque/src/main/scala/kesque/IntIntsMap.scala | 122 +++++++++++++++++- .../scala/khipu/store/KesqueCompactor.scala | 62 ++++++++- 5 files changed, 284 insertions(+), 22 deletions(-) diff --git a/kesque/src/main/scala/kesque/HashKeyValueTable.scala b/kesque/src/main/scala/kesque/HashKeyValueTable.scala index 6da199a..c85f076 100644 --- a/kesque/src/main/scala/kesque/HashKeyValueTable.scala +++ b/kesque/src/main/scala/kesque/HashKeyValueTable.scala @@ -111,7 +111,9 @@ final class HashKeyValueTable private[kesque] ( timeIndexTask ::: tasks foreach { _.join() } } - final case class LoadIndexTask(col: Int) extends Thread { + // Add 'final' will cause "The outer reference in this type test cannot be checked at run time." compile warning + // This warns about a bug in scalac (scala/bug#4440) that does not exist in Dotty (#2156) + case class LoadIndexTask(col: Int) extends Thread { override def run() { loadOffsetsOf(col) } @@ -407,22 +409,39 @@ final class HashKeyValueTable private[kesque] ( } def removeIndexEntry(keyBytes: Array[Byte], mixedOffset: Int, topic: String) { - val col = topicToCol(topic) - val key = Hash(keyBytes) - val hash = key.hashCode - hashOffsets.get(hash, col) match { - case IntIntsMap.NO_VALUE => - case mixedOffsets => - var found = false - var i = 0 - while (!found && i < mixedOffsets.length) { - if (mixedOffset == mixedOffsets(i)) { - hashOffsets.removeValue(hash, mixedOffset, col) - found = true - } else { - i += 1 + try { + writeLock.lock() + + val col = topicToCol(topic) + val key = Hash(keyBytes) + val hash = key.hashCode + hashOffsets.get(hash, col) match { + case IntIntsMap.NO_VALUE => + case mixedOffsets => + var found = false + var i = 0 + while (!found && i < mixedOffsets.length) { + if (mixedOffset == mixedOffsets(i)) { + hashOffsets.removeValue(hash, mixedOffset, col) + found = true + } else { + i += 1 + } } - } + } + } finally { + writeLock.unlock() + } + } + + def removeIndexEntries(topic: String)(cond: (Int, Int) => Boolean) { + try { + writeLock.lock() + + val col = topicToCol(topic) + hashOffsets.removeValues(col)(cond) + } finally { + writeLock.unlock() } } diff --git a/kesque/src/main/scala/kesque/HashOffsets.scala b/kesque/src/main/scala/kesque/HashOffsets.scala index 68103ef..0df16bd 100644 --- a/kesque/src/main/scala/kesque/HashOffsets.scala +++ b/kesque/src/main/scala/kesque/HashOffsets.scala @@ -185,5 +185,16 @@ final class HashOffsets(initSize: Int, nValues: Int = 1, fillFactor: Float = 0.7 } } + def removeValues(col: Int)(cond: (Int, Int) => Boolean) { + try { + readLock.lock() + + multipleValuesMap.removeValues(col)(cond) + singleValueMap.removeValues(col)(cond) + } finally { + readLock.unlock() + } + } + def size = singleValueMap.size + multipleValuesMap.size } diff --git a/kesque/src/main/scala/kesque/IntIntMap.scala b/kesque/src/main/scala/kesque/IntIntMap.scala index 9846953..1059ab3 100644 --- a/kesque/src/main/scala/kesque/IntIntMap.scala +++ b/kesque/src/main/scala/kesque/IntIntMap.scala @@ -61,6 +61,8 @@ object IntIntMap { var col = 0 while (col < 3) { + println(s"\n=== col $col ===") + var i = -max while (i <= max) { map.put(i, i + col, col) @@ -85,6 +87,24 @@ object IntIntMap { } println(s"count: $count") + // remove values under condition + map.removeValues(col) { + case (k, v) => v < 0 + } + println(s"remove all < 0") + + // check after remove all + count = 0 + map.iterateOver(col) { + case (k, v) => + //println(s"$k -> $v") + if (v < 0) { + println(s"remove all < 0 err happened") + } + count += 1 + } + println(s"count: $count") + // remove i = -max while (i <= max) { @@ -376,9 +396,11 @@ final class IntIntMap(initSize: Int, nValues: Int, fillFactor: Float = 0.75f) { val len = m_data.length - 1 - col while (ptr <= len) { val k = m_data(ptr) - if (!freeKeyProcessed && k == FREE_KEY && m_hasFreeKey(col)) { - val v = m_freeValue(col) - op(k, v) + if (k == FREE_KEY) { + if (!freeKeyProcessed && m_hasFreeKey(col)) { + val v = m_freeValue(col) + op(k, v) + } freeKeyProcessed = true } else { val v = m_data(ptr + 1 + col) @@ -386,8 +408,40 @@ final class IntIntMap(initSize: Int, nValues: Int, fillFactor: Float = 0.75f) { op(k, v) } } + ptr += 1 + nValues } } + + def removeValues(col: Int)(cond: (Int, Int) => Boolean) { + var ptr = 0 + var freeKeyProcessed = false + val len = m_data.length - 1 - col + while (ptr <= len) { + val k = m_data(ptr) + var keyRemoved = false + if (k == FREE_KEY) { + if (!freeKeyProcessed && m_hasFreeKey(col)) { + val v = m_freeValue(col) + if (cond(k, v)) { + remove(k, col) + keyRemoved = true + } + } + freeKeyProcessed = true + } else { + val v = m_data(ptr + 1 + col) + if (cond(k, v)) { + remove(k, col) + keyRemoved = true + } + } + + // if remove happened, the key may be shifted at ptr, we should re-check it + if (!keyRemoved) { + ptr += 1 + nValues + } + } + } } diff --git a/kesque/src/main/scala/kesque/IntIntsMap.scala b/kesque/src/main/scala/kesque/IntIntsMap.scala index 3e0e609..3ac99cb 100644 --- a/kesque/src/main/scala/kesque/IntIntsMap.scala +++ b/kesque/src/main/scala/kesque/IntIntsMap.scala @@ -15,7 +15,8 @@ object IntIntsMap { var col = 0 while (col < 3) { - println(s"col is $col") + println(s"\n=== col $col ===") + // put i and -i var i = -max while (i <= max) { @@ -44,6 +45,41 @@ object IntIntsMap { println(map.get(1, col).mkString(",")) println(map.get(max - 1, col).mkString(",")) + // iterate + var count = 0 + map.iterateOver(col) { + case (k, v) => + //println(s"$k -> $v") + count += 1 + } + println(s"count: $count") + + // remove values under condition + map.removeValues(col) { + case (k, v) => v < 0 + } + println(s"remove all < 0") + + // check after remove all + count = 0 + map.iterateOver(col) { + case (k, v) => + //println(s"$k -> $v") + if (v < 0) { + println(s"remove all < 0 err happened") + } + count += 1 + } + println(s"count: $count") + + i = -max + while (i <= max) { + map.put(i, i, col) + map.put(i, -i, col) + //println(s"${map.get(i, n).mkString("[", ",", "]")}") + i += 1 + } + // remove value -i from map i = -max while (i <= max) { @@ -475,4 +511,88 @@ final class IntIntsMap(initSize: Int, nValues: Int, fillFactor: Float = 0.75f) { (currentIndex + 1) % m_mask } } + + def iterateOver(col: Int)(op: (Int, V) => Unit) { + var ptr = 0 + var freeKeyProcessed = false + val len = m_keys.length - 1 + while (ptr <= len) { + val k = m_keys(ptr) + if (k == FREE_KEY) { + if (!freeKeyProcessed && m_hasFreeKey(col)) { + val v = m_freeValue(col) + var i = 0 + while (i < v.length) { + val x = v(i) + op(k, x) + i += 1 + } + } + freeKeyProcessed = true + } else { + val v = m_values(col)(ptr) + if (v != NO_VALUE) { + var i = 0 + while (i < v.length) { + val x = v(i) + op(k, x) + i += 1 + } + } + } + + ptr += 1 + } + } + + def removeValues(col: Int)(cond: (Int, Int) => Boolean) { + var ptr = 0 + var freeKeyProcessed = false + val len = m_keys.length - 1 + while (ptr <= len) { + val k = m_keys(ptr) + var keyRemoved = false + if (k == FREE_KEY) { + if (!freeKeyProcessed && m_hasFreeKey(col)) { + val v = m_freeValue(col) + var i = 0 + var count = 0 + while (i < v.length) { + val x = v(i) + if (cond(k, x)) { + removeValue(k, x, col) + count += 1 + } + i += 1 + } + if (count == v.length) { + remove(k, col) + keyRemoved = true + } + } + freeKeyProcessed = true + } else { + val v = m_values(col)(ptr) + var count = 0 + var i = 0 + while (i < v.length) { + val x = v(i) + if (cond(k, x)) { + removeValue(k, x, col) + count += 1 + } + i += 1 + } + if (count == v.length) { + remove(k, col) + keyRemoved = true + } + } + + // if remove happened, the key may be shifted at ptr, we should re-check it + if (!keyRemoved) { + ptr += 1 + } + } + } } diff --git a/khipu-eth/src/main/scala/khipu/store/KesqueCompactor.scala b/khipu-eth/src/main/scala/khipu/store/KesqueCompactor.scala index e6f172c..a995f9c 100644 --- a/khipu-eth/src/main/scala/khipu/store/KesqueCompactor.scala +++ b/khipu-eth/src/main/scala/khipu/store/KesqueCompactor.scala @@ -117,12 +117,16 @@ object KesqueCompactor { final class NodeWriter(topic: String, nodeTable: HashKeyValueTable, toFileNo: Int) { private val buf = new mutable.ArrayBuffer[TKeyVal]() + private var _maxOffset = Int.MinValue + def maxOffset = _maxOffset + def write(kv: TKeyVal) { buf += kv if (buf.size > 100) { // keep the batched size around 4096 (~ 32*100 bytes) val kvs = buf map { case TKeyVal(key, value, mixedOffset, timestamp) => val (_, offset) = HashKeyValueTable.toFileNoAndOffset(mixedOffset) + _maxOffset = math.max(_maxOffset, offset) TKeyVal(key, value, offset, timestamp) } nodeTable.write(kvs, topic, toFileNo) @@ -198,7 +202,7 @@ object KesqueCompactor { val blockHeaderStorage = storages.blockHeaderStorage val compactor = new KesqueCompactor(kesque, accountTable, storageTable, blockHeaderStorage, 7225555, 0, 1) - compactor.load() + compactor.start() } } final class KesqueCompactor( @@ -245,7 +249,13 @@ final class KesqueCompactor( } } - def load() { + def start() { + load() + postAppend() + gc() + } + + private def load() { log.info(s"[comp] loading nodes of #$blockNumber") for { hash <- blockHeaderStorage.getBlockHash(blockNumber) @@ -259,4 +269,52 @@ final class KesqueCompactor( log.info(s"[comp] all nodes loaded of #$blockNumber") } + /** + * should stop world during postAppend() + */ + private def postAppend() { + log.info(s"[comp] post append storage from offset ${storageWriter.maxOffset + 1} ...") + val storageTask = new Thread { + override def run() { + storageTable.iterateOver(storageWriter.maxOffset + 1, KesqueDataSource.storage) { + kv => storageWriter.write(kv) + } + } + } + + log.info(s"[comp] post append account from offset ${accountWriter.maxOffset + 1} ...") + val accountTask = new Thread { + override def run() { + accountTable.iterateOver(accountWriter.maxOffset + 1, KesqueDataSource.account) { + kv => accountWriter.write(kv) + } + } + } + + storageTask.start + accountTask.start + + storageTask.join + accountTask.join + log.info(s"[comp] post append done.") + } + + /** + * should stop world during gc() + */ + private def gc() { + log.info(s"[comp] gc ...") + targetStorageTable.removeIndexEntries(KesqueDataSource.storage) { + case (k, mixedOffset) => + val (fileNo, _) = HashKeyValueTable.toFileNoAndOffset(mixedOffset) + fileNo == fromFileNo + } + + targetAccountTable.removeIndexEntries(KesqueDataSource.account) { + case (k, mixedOffset) => + val (fileNo, _) = HashKeyValueTable.toFileNoAndOffset(mixedOffset) + fileNo == fromFileNo + } + log.info(s"[comp] gc done.") + } } \ No newline at end of file