Added Kesque.readBatch and applied it in KesqueCompactor.postAppend. Compactor seems to be working now. #8
dcaoyuan committed Mar 28, 2019
1 parent 14dfe19 commit 680c825
Showing 3 changed files with 65 additions and 22 deletions.
20 changes: 15 additions & 5 deletions kesque/src/main/scala/kesque/HashKeyValueTable.scala
@@ -123,7 +123,7 @@ final class HashKeyValueTable private[kesque] (
val start = System.nanoTime

// TODO shift indexTopics and shiftIndexTopics
- val indexTopicsOfFileno = Array(indexTopics)
+ val indexTopicsOfFileno = Array(indexTopics)
val initCounts = Array.fill[Int](indexTopicsOfFileno.length)(0)
val (_, counts) = indexTopicsOfFileno.foldLeft(0, initCounts) {
case ((fileno, counts), idxTps) =>
@@ -432,21 +432,31 @@ final class HashKeyValueTable private[kesque] (
}
}

- def iterateOver(fetchOffset: Long, topic: String)(op: TKeyVal => Unit) = {
+ def iterateOver(fromOffset: Long, topic: String)(op: TKeyVal => Unit) = {
try {
readLock.lock()

- db.iterateOver(topic, fetchOffset, fetchMaxBytes)(op)
+ db.iterateOver(topic, fromOffset, fetchMaxBytes)(op)
} finally {
readLock.unlock()
}
}

- def readOnce(fetchOffset: Long, topic: String)(op: TKeyVal => Unit) = {
+ def readOnce(fromOffset: Long, topic: String)(op: TKeyVal => Unit) = {
try {
readLock.lock()

- db.readOnce(topic, fetchOffset, fetchMaxBytes)(op)
+ db.readOnce(topic, fromOffset, fetchMaxBytes)(op)
} finally {
readLock.unlock()
}
}

+ def readBatch(topic: String, fromOffset: Long, fetchMaxBytes: Int): (Long, Array[TKeyVal]) = {
+ try {
+ readLock.lock()
+
+ db.readBatch(topic, fromOffset, fetchMaxBytes)
+ } finally {
+ readLock.unlock()
+ }
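
For orientation, a minimal caller-side sketch of the new readBatch (illustrative only; dumpBatch, the table instance, the topic string, and the 4096-byte fetch size are not part of this commit):

// Read one batch starting at fromOffset and print what was found.
// Returns the offset of the last record read so the caller can resume from lastOffset + 1.
def dumpBatch(table: HashKeyValueTable, topic: String, fromOffset: Long): Long = {
  val (lastOffset, records) = table.readBatch(topic, fromOffset, fetchMaxBytes = 4096)
  records foreach {
    case TKeyVal(_, _, offset, timestamp) =>
      println(s"$topic offset=$offset timestamp=$timestamp")
  }
  lastOffset // an empty records array means nothing beyond fromOffset was read
}
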
33 changes: 31 additions & 2 deletions kesque/src/main/scala/kesque/Kesque.scala
@@ -119,8 +119,8 @@ final class Kesque(props: Properties) {
* @param fetchOffset
* @param op: action applied on (offset, key, value)
*/
- private[kesque] def iterateOver(topic: String, fetchOffset: Long = 0L, fetchMaxBytes: Int)(op: TKeyVal => Unit) = {
- var offset = fetchOffset
+ private[kesque] def iterateOver(topic: String, fromOffset: Long = 0L, fetchMaxBytes: Int)(op: TKeyVal => Unit) = {
+ var offset = fromOffset
var nRead = 0
do {
readOnce(topic, offset, fetchMaxBytes)(op) match {
@@ -134,6 +134,7 @@
/**
* @param topic
* @param from offset
+ * @param fetchMaxBytes
* @param op: action applied on TKeyVal(key, value, offset, timestamp)
* @return (number of read, last offset read)
*/
@@ -160,6 +161,34 @@
(count, lastOffset)
}

+ /**
+ * @param topic
+ * @param from offset
+ * @param fetchMaxBytes
+ * @return (last offset read, records)
+ */
+ private[kesque] def readBatch(topic: String, fromOffset: Long, fetchMaxBytes: Int): (Long, Array[TKeyVal]) = {
+ val batch = mutable.ArrayBuffer[TKeyVal]()
+ val (topicPartition, result) = read(topic, fromOffset, fetchMaxBytes).head
+ val recs = result.info.records.records.iterator
+ var lastOffset = fromOffset
+ while (recs.hasNext) {
+ val rec = recs.next
+ val offset = rec.offset
+ if (offset >= fromOffset) {
+ val key = if (rec.hasKey) kesque.getBytes(rec.key) else null
+ val value = if (rec.hasValue) kesque.getBytes(rec.value) else null
+ val timestamp = rec.timestamp
+
+ batch += TKeyVal(key, value, offset.toInt, timestamp)
+
+ lastOffset = offset
+ }
+ }
+
+ (lastOffset, batch.toArray)
+ }

def shutdown() {
kafkaServer.shutdown()
}
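
Note the offset >= fromOffset guard in readBatch above: a fetch can return a record batch that begins before the requested offset, so records below fromOffset are skipped. As a hedged sketch (not part of this commit; iterateOverInBatches is a hypothetical name), the existing iterateOver could be expressed on top of readBatch like this:

// Page through the topic from fromOffset, applying op to every record,
// until a batch comes back empty.
private[kesque] def iterateOverInBatches(topic: String, fromOffset: Long, fetchMaxBytes: Int)(op: TKeyVal => Unit) {
  var offset = fromOffset
  var nRead = 0
  do {
    val (lastOffset, recs) = readBatch(topic, offset, fetchMaxBytes)
    recs foreach op
    nRead = recs.length
    offset = lastOffset + 1 // resume after the last record read
  } while (nRead > 0)
}
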
34 changes: 19 additions & 15 deletions khipu-eth/src/main/scala/khipu/store/KesqueCompactor.scala
@@ -132,22 +132,18 @@ object KesqueCompactor {
}

def flush() {
println(s"flush on $topic")
buf foreach {
case TKeyVal(key, _, mixedOffset, _) =>
nodeTable.removeIndexEntry(key, mixedOffset, topic)
}
println(s"flush on $topic, removed index")

val kvs = buf map {
case TKeyVal(key, value, mixedOffset, timestamp) =>
val (_, offset) = HashKeyValueTable.toFileNoAndOffset(mixedOffset)
_maxOffset = math.max(_maxOffset, offset)
TKeyVal(key, value, offset, timestamp)
}
println(s"flush on $topic, writing to $toFileNo")
nodeTable.write(kvs, topic, toFileNo)
println(s"flush on $topic, writing done")

buf.clear()
}
@@ -246,7 +242,7 @@ final class KesqueCompactor(
}

def start() {
- loadSnaphot
+ loadSnaphot()
postAppend()
gc()
}
@@ -273,10 +269,15 @@
override def run() {
log.info(s"[comp] post append storage from offset ${storageWriter.maxOffset + 1} ...")
// TODO topic from fromFileNo
- storageTable.iterateOver(storageWriter.maxOffset + 1, KesqueDataSource.storage) {
- //storageTable.iterateOver(241714020, KesqueDataSource.storage) {
- kv => storageWriter.write(kv)
- }
+ var offset = storageWriter.maxOffset + 1
+ var nRead = 0
+ do {
+ val (lastOffset, recs) = storageTable.readBatch(KesqueDataSource.storage, offset, 4096)
+ recs foreach storageWriter.write
+ nRead = recs.length
+ offset = lastOffset + 1
+ } while (nRead > 0)

storageWriter.flush()
log.info(s"[comp] post append storage done.")
}
@@ -286,12 +287,15 @@
override def run() {
log.info(s"[comp] post append account from offset ${accountWriter.maxOffset + 1} ...")
// TODO topic from fromFileNo
- accountTable.iterateOver(accountWriter.maxOffset + 1, KesqueDataSource.account) {
- //accountTable.iterateOver(109535465, KesqueDataSource.account, true) {
- kv =>
- log.info(s"[comp] account iterate on $kv")
- accountWriter.write(kv)
- }
+ var offset = accountWriter.maxOffset + 1
+ var nRead = 0
+ do {
+ val (lastOffset, recs) = accountTable.readBatch(KesqueDataSource.account, offset, 4096)
+ recs foreach accountWriter.write
+ nRead = recs.length
+ offset = lastOffset + 1
+ } while (nRead > 0)

accountWriter.flush()
log.info(s"[comp] post append account done.")
}
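
The storage and account tasks above run the same paging loop and differ only in the table, topic, and writer they use; a possible refactor, sketched for illustration only (postAppendLoop is a hypothetical helper, not part of this commit):

// Page through the topic on the given table starting at fromOffset,
// feeding every record to the supplied write function.
private def postAppendLoop(table: HashKeyValueTable, topic: String, fromOffset: Long, fetchMaxBytes: Int)(write: TKeyVal => Unit) {
  var offset = fromOffset
  var nRead = 0
  do {
    val (lastOffset, recs) = table.readBatch(topic, offset, fetchMaxBytes)
    recs foreach write
    nRead = recs.length
    offset = lastOffset + 1
  } while (nRead > 0)
}

// The two tasks would then reduce to:
//   postAppendLoop(storageTable, KesqueDataSource.storage, storageWriter.maxOffset + 1, 4096)(storageWriter.write)
//   postAppendLoop(accountTable, KesqueDataSource.account, accountWriter.maxOffset + 1, 4096)(accountWriter.write)
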
