Skip to content

Commit

Permalink
Removed updatePost methods. #8
Browse files Browse the repository at this point in the history
  • Loading branch information
dcaoyuan committed Mar 3, 2019
1 parent 23559b2 commit d19c5d1
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 49 deletions.
49 changes: 29 additions & 20 deletions kesque/src/main/scala/kesque/HashKeyValueTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,37 +63,47 @@ final class HashKeyValueTable private[kesque] (
}

private def indexTopic(topic: String) = topic + "_idx"
private def postTopic(topic: String) = topic + "~"
private def shiftTopic(topic: String) = topic + "~"

private val postTopics = topics map postTopic
private val indexTopics = topics map indexTopic
private val postIndexTopics = postTopics map indexTopic
private val shiftTopics = topics map shiftTopic
private val shiftIndexTopics = shiftTopics map indexTopic

private val topicsOfFileno = Array(topics, postTopics)
private val indexTopicsOfFileno = Array(indexTopics, postIndexTopics)
private var topicsOfFileno = Array(topics, shiftTopics)
private var indexTopicsOfFileno = Array(indexTopics, shiftIndexTopics)

private val lock = new ReentrantReadWriteLock()
private val readLock = lock.readLock
private val writeLock = lock.writeLock

private class LoadIndexesTask(col: Int, topic: String) extends Thread {
override def run() {
loadOffsets(col)
}
}

loadIndexes()

def shift() {
val x0 = topicsOfFileno(0)
val x1 = topicsOfFileno(1)
topicsOfFileno = Array(x1, x0)

val y0 = indexTopicsOfFileno(0)
val y1 = indexTopicsOfFileno(1)
indexTopicsOfFileno = Array(y1, y0)

// TODO then delete the old first topic
}

private def loadIndexes() {
var tasks = List[Thread]()
var n = 0
while (n < topics.length) {
val topic = topics(n)
caches(n) = new FIFOCache[Hash, (TVal, Int)](cacheSize)
var col = 0
while (col < topics.length) {
val topic = topics(col)
caches(col) = new FIFOCache[Hash, (TVal, Int)](cacheSize)

tasks = new LoadIndexesTask(n, topic) :: tasks
tasks = (new Thread() {
override def run() {
loadOffsetsOf(col)
}
}) :: tasks

n += 1
col += 1
}

val timeIndexTask = if (withTimeToKey) {
Expand All @@ -110,7 +120,7 @@ final class HashKeyValueTable private[kesque] (
timeIndexTask ::: tasks foreach { _.join() }
}

private def loadOffsets(col: Int) {
private def loadOffsetsOf(col: Int) {
info(s"Loading index of ${topics(col)}")
val start = System.nanoTime

Expand Down Expand Up @@ -231,8 +241,7 @@ final class HashKeyValueTable private[kesque] (
}
}

def writePost(kvs: Iterable[TKeyVal], topic: String) = write(kvs, topic, fileno = 1)
def writeSnap(kvs: Iterable[TKeyVal], topic: String) = write(kvs, topic, fileno = 0)
def writeShift(kvs: Iterable[TKeyVal], topic: String) = write(kvs, topic, fileno = 1)
def write(kvs: Iterable[TKeyVal], topic: String): Vector[Iterable[Int]] = write(kvs, topic, fileno = 0)
private def write(kvs: Iterable[TKeyVal], topic: String, fileno: Int): Vector[Iterable[Int]] = {
val col = topicToCol(topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,6 @@ final class KesqueDataSource(val table: HashKeyValueTable, val topic: String)(im
this
}

override def updatePost(toRemove: Set[Hash], toUpsert: Map[Hash, TVal]): KesqueDataSource = {
// TODO what's the meaning of remove a node? sometimes causes node not found
//table.remove(toRemove.map(_.bytes).toList)
table.writePost(toUpsert.map { case (key, value) => TKeyVal(key.bytes, value.value, _currWritingBlockNumber) }, topic)
this
}

def cacheHitRate = table.cacheHitRate(topic)
def cacheReadCount = table.cacheReadCount(topic)
def resetCacheHitRate() = table.resetCacheHitRate(topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,6 @@ final class NodeTableStorage(source: KesqueDataSource)(implicit system: ActorSys
this
}

override def updatePost(toRemove: Set[Hash], toUpsert: Map[Hash, Array[Byte]]): NodeTableStorage = {
//toRemove foreach CachedNodeStorage.remove // TODO remove from repositoty when necessary (pruning)
//toUpsert foreach { case (key, value) => nodeTable.put(key, () => Future(value)) }
source.updatePost(toRemove, toUpsert map { case (key, value) => key -> TVal(value, -1L) })
this
}

def setWritingBlockNumber(writingBlockNumber: Long) = source.setWritingBlockNumber(writingBlockNumber)

override def tableName = source.topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ final class MerklePatriciaTrie[K, V] private (
case (k, Deleted(_)) => k -> None
case (k, Updated(v)) => k -> Some(v)
}
nodeStorage.updatePost(changes)
nodeStorage.update(changes)
this
}

Expand Down
14 changes: 0 additions & 14 deletions khipu-eth/src/main/scala/khipu/util/SimpleMap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,6 @@ trait SimpleMap[K, V, T <: SimpleMap[K, V, T]] {
update(toRemove, toUpsert)
}

/**
* Since the remove may still have to be saved to reposity, we'll let same key
* in both toRemove and toUpsert
*/
final def updatePost(changes: Iterable[(K, Option[V])]): T = {
val (toRemove, toUpsert) = changes.foldLeft((Set[K](), Map[K, V]())) {
case ((toRemove, toUpsert), (k, None)) => (toRemove + k, toUpsert)
case ((toRemove, toUpsert), (k, Some(v))) => (toRemove, toUpsert + (k -> v))
}
updatePost(toRemove, toUpsert)
}

def updatePost(toRemove: Set[K], toUpsert: Map[K, V]): T = update(toRemove, toUpsert)

/**
* This function updates the KeyValueStore by deleting, updating and inserting new (key-value) pairs.
*
Expand Down

0 comments on commit d19c5d1

Please sign in to comment.