Use large fetch.max.bytes during post append. #8
dcaoyuan committed Sep 21, 2019
1 parent ac2eaf4 commit c3e1152
Showing 3 changed files with 41 additions and 19 deletions.
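For orientation, the core of this commit is the post-append read loop: it used to ask readBatch for records with a fixed argument of 4096 and now passes a ~50 MB byte cap (FETCH_MAX_BYTES_IN_BACTH). Below is a minimal, self-contained Scala sketch of that loop pattern, not the project's code; the TKeyVal shape, the readBatch stub and its (lastOffset, records) return value, and the topic name are illustrative assumptions drawn from the diffs that follow.

object PostAppendSketch {
  // Illustrative record shape; the real TKeyVal lives in the khipu codebase.
  final case class TKeyVal(key: Array[Byte], value: Array[Byte], offset: Long)

  // 52428800 bytes (50M), matching the constant this commit introduces.
  val FETCH_MAX_BYTES_IN_BACTH = 50 * 1024 * 1024

  // Stand-in for the Kesque table/data source readBatch seen in the diff:
  // returns the last offset read plus the records. This stub always reports
  // an exhausted log so the loop below terminates immediately.
  def readBatch(topic: String, fromOffset: Long, fetchMaxBytes: Int): (Long, Array[TKeyVal]) =
    (fromOffset - 1, Array.empty[TKeyVal])

  def main(args: Array[String]): Unit = {
    var offset = 0L // in the diff: writer.maxOffset + 1
    var nRead = 0
    do {
      val (lastOffset, recs) = readBatch("account", offset, FETCH_MAX_BYTES_IN_BACTH)
      recs foreach { kv => () } // in the diff: recs foreach writer.write
      nRead = recs.length
      offset = lastOffset + 1
    } while (nRead > 0)
  }
}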
8 changes: 5 additions & 3 deletions khipu-eth/src/main/scala/khipu/storage/KesqueCompactor.scala
@@ -34,6 +34,8 @@ object KesqueCompactor {
import system.dispatcher
lazy val serviceBoard = ServiceBoard(system)
lazy val dbConfig = serviceBoard.dbConfig

private val FETCH_MAX_BYTES_IN_BACTH = 50 * 1024 * 1024 // 52428800, 50M

implicit val logSource: LogSource[AnyRef] = new LogSource[AnyRef] {
def genString(o: AnyRef): String = o.getClass.getName
@@ -275,8 +277,8 @@ final class KesqueCompactor(
var offset = storageWriter.maxOffset + 1
var nRead = 0
do {
val (lastOffset, recs) = accountTable.readBatch(DbConfig.account, offset, 4096)
recs foreach accountWriter.write
val (lastOffset, recs) = storageTable.readBatch(DbConfig.account, offset, FETCH_MAX_BYTES_IN_BACTH)
recs foreach storageWriter.write
nRead = recs.length
offset = lastOffset + 1
} while (nRead > 0)
@@ -293,7 +295,7 @@ final class KesqueCompactor(
var offset = accountWriter.maxOffset + 1
var nRead = 0
do {
val (lastOffset, recs) = accountTable.readBatch(DbConfig.account, offset, 4096)
val (lastOffset, recs) = accountTable.readBatch(DbConfig.account, offset, FETCH_MAX_BYTES_IN_BACTH)
recs foreach accountWriter.write
nRead = recs.length
offset = lastOffset + 1
52 changes: 36 additions & 16 deletions khipu-eth/src/main/scala/khipu/storage/KesqueNodeCompactor.scala
@@ -32,6 +32,8 @@ object KesqueNodeCompactor {
implicit lazy val system = ActorSystem("khipu")
import system.dispatcher

private val FETCH_MAX_BYTES_IN_BACTH = 50 * 1024 * 1024 // 52428800, 50M

implicit val logSource: LogSource[AnyRef] = new LogSource[AnyRef] {
def genString(o: AnyRef): String = o.getClass.getName
override def getClazz(o: AnyRef): Class[_] = o.getClass
@@ -118,16 +120,18 @@ object KesqueNodeCompactor {
final class NodeWriter(topic: String, nodeDataSource: KesqueNodeDataSource) {
private val buf = new mutable.ArrayBuffer[TKeyVal]()

private var _maxOffset = Long.MinValue
private var _maxOffset = -1L // kafka offset starts from 0
def maxOffset = _maxOffset

/**
* Should flush() after all kv are written.
*/
def write(kv: TKeyVal) {
buf += kv
if (buf.size > 100) { // keep the batched size around 4096 (~ 32*100 bytes)
flush()
if (kv ne null) {
buf += kv
if (buf.size > 100) { // keep the batched size around 4096 (~ 32*100 bytes)
flush()
}
}
}
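Two details in this hunk are worth a hedged note: the empty-writer sentinel moves from Long.MinValue to -1L so that maxOffset + 1 lands on 0, the first Kafka offset, and write now simply ignores null records instead of buffering them. A tiny REPL-style sketch of the offset arithmetic (variable names are illustrative):

// Post-append reads start at maxOffset + 1, and Kafka offsets begin at 0,
// so -1L is the natural "nothing written yet" value.
val emptyWriterMaxOffset = -1L
val firstOffsetToRead = emptyWriterMaxOffset + 1
assert(firstOffsetToRead == 0L) // Long.MinValue + 1 would be a huge negative offset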

@@ -223,9 +227,9 @@ final class KesqueNodeCompactor(
}

def start() {
loadSnaphot()
//loadSnaphot()
stopWorld(() => true)
//postAppend()
postAppend()
}

private def loadSnaphot() {
@@ -252,39 +256,53 @@
* should stop world during postAppend()
*/
private def postAppend() {
val start = System.nanoTime

val storageTask = new Thread {
override def run() {
log.info(s"[comp] post append storage from offset ${storageWriter.maxOffset + 1} ...")
// TODO topic from fromFileNo
println(s"[comp] storage post append from offset ${storageWriter.maxOffset + 1} ...")
val start = System.nanoTime
var offset = storageWriter.maxOffset + 1
var nRead = 0
var count = 0
do {
val (lastOffset, recs) = accountDataSource.readBatch(DbConfig.account, offset, 4096)
recs foreach accountWriter.write
val (lastOffset, recs) = storageDataSource.readBatch(DbConfig.account, offset, FETCH_MAX_BYTES_IN_BACTH)
recs foreach storageWriter.write
nRead = recs.length
offset = lastOffset + 1

count += nRead
val elapsed = (System.nanoTime - start) / 1000000000
val speed = count / math.max(1, elapsed)
println(s"[comp] storage nodes $count $speed/s, at #$blockNumber, table size ${storageDataSource.count}")
} while (nRead > 0)

storageWriter.flush()
log.info(s"[comp] post append storage done.")
println(s"[comp] storage post append done.")
}
}

val accountTask = new Thread {
override def run() {
log.info(s"[comp] post append account from offset ${accountWriter.maxOffset + 1} ...")
// TODO topic from fromFileNo
println(s"[comp] account post append from offset ${accountWriter.maxOffset + 1} ...")
val start = System.nanoTime
var offset = accountWriter.maxOffset + 1
var nRead = 0
var count = 0
do {
val (lastOffset, recs) = accountDataSource.readBatch(DbConfig.account, offset, 4096)
val (lastOffset, recs) = accountDataSource.readBatch(DbConfig.account, offset, FETCH_MAX_BYTES_IN_BACTH)
recs foreach accountWriter.write
nRead = recs.length
offset = lastOffset + 1

count += nRead
val elapsed = (System.nanoTime - start) / 1000000000
val speed = count / math.max(1, elapsed)
println(s"[comp] account nodes $count $speed/s, at #$blockNumber, table size ${accountDataSource.count}")
} while (nRead > 0)

accountWriter.flush()
log.info(s"[comp] post append account done.")
println(s"[comp] account post append done.")
}
}

@@ -293,6 +311,8 @@ final class KesqueNodeCompactor(

storageTask.join
accountTask.join
log.info(s"[comp] post append done.")

val elapsed = (System.nanoTime - start) / 1000000000
println(s"[comp] post append done in ${elapsed}s.")
}
}
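Both post-append tasks report progress with the same running-throughput line; here is a short REPL-style sketch of that calculation with a made-up record count. The math.max(1, elapsed) term avoids dividing by zero during the first second, and because elapsed is whole seconds the printed speed is a coarse running average rather than an instantaneous rate.

// Throughput line as used in the post-append tasks above, with a made-up count.
val start = System.nanoTime
val count = 1200000L                                 // records copied so far
val elapsed = (System.nanoTime - start) / 1000000000 // whole seconds since start
val speed = count / math.max(1, elapsed)             // guard against division by zero
println(s"[comp] copied $count nodes, ~$speed/s")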
