Skip to content

Commit

Permalink
Added postAppend, gc methods for KesqueCompactor. #8
Browse files Browse the repository at this point in the history
  • Loading branch information
dcaoyuan committed Mar 27, 2019
1 parent b73d9dd commit 774bf71
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 22 deletions.
51 changes: 35 additions & 16 deletions kesque/src/main/scala/kesque/HashKeyValueTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ final class HashKeyValueTable private[kesque] (
timeIndexTask ::: tasks foreach { _.join() }
}

final case class LoadIndexTask(col: Int) extends Thread {
// Adding 'final' here causes the compile warning "The outer reference in this type test cannot be checked at run time."
// That warning stems from a scalac bug (scala/bug#4440) that does not exist in Dotty (#2156).
case class LoadIndexTask(col: Int) extends Thread {
override def run() {
loadOffsetsOf(col)
}
Expand Down Expand Up @@ -407,22 +409,39 @@ final class HashKeyValueTable private[kesque] (
}

/**
 * Removes a single `mixedOffset` from the index entry of `keyBytes` in the column of `topic`.
 *
 * The key is hashed via `Hash(keyBytes).hashCode`; the offsets currently stored under that
 * hash are scanned and, when `mixedOffset` is among them, it is removed through
 * `hashOffsets.removeValue`. Nothing is done when the hash has no entry or the offset is
 * not present. The whole operation runs while holding `writeLock`.
 *
 * @param keyBytes    raw key bytes whose hash identifies the index entry
 * @param mixedOffset the offset value to drop from the entry
 * @param topic       topic name, mapped to its column through `topicToCol`
 */
def removeIndexEntry(keyBytes: Array[Byte], mixedOffset: Int, topic: String): Unit = {
  // Acquire before `try`: if lock() itself fails, `finally` must not try to release
  // a lock that was never held.
  writeLock.lock()
  try {
    val col = topicToCol(topic)
    val hash = Hash(keyBytes).hashCode
    hashOffsets.get(hash, col) match {
      case IntIntsMap.NO_VALUE => // nothing is indexed under this hash
      case mixedOffsets =>
        // Linear scan that stops at the first occurrence of mixedOffset.
        var i = 0
        while (i < mixedOffsets.length && mixedOffsets(i) != mixedOffset) {
          i += 1
        }
        if (i < mixedOffsets.length) {
          hashOffsets.removeValue(hash, mixedOffset, col)
        }
    }
  } finally {
    writeLock.unlock()
  }
}

/**
 * Bulk-removes index entries from the column of `topic`, delegating to
 * `hashOffsets.removeValues` while holding `writeLock`.
 *
 * @param topic topic name, mapped to its column through `topicToCol`
 * @param cond  predicate handed unchanged to `hashOffsets.removeValues`; it is expected to
 *              return true for the (key, value) pairs that should be removed
 */
def removeIndexEntries(topic: String)(cond: (Int, Int) => Boolean): Unit = {
  // Acquire before `try`: if lock() itself fails, `finally` must not try to release
  // a lock that was never held.
  writeLock.lock()
  try {
    val col = topicToCol(topic)
    hashOffsets.removeValues(col)(cond)
  } finally {
    writeLock.unlock()
  }
}

Expand Down
11 changes: 11 additions & 0 deletions kesque/src/main/scala/kesque/HashOffsets.scala
Original file line number Diff line number Diff line change
Expand Up @@ -185,5 +185,16 @@ final class HashOffsets(initSize: Int, nValues: Int = 1, fillFactor: Float = 0.7
}
}

/**
 * Removes, from column `col` of both underlying maps, the values selected by `cond`.
 *
 * `multipleValuesMap` is processed first, then `singleValueMap`; `cond` is forwarded
 * unchanged to each map's own `removeValues`.
 *
 * NOTE(review): this mutates both maps while holding only `readLock`. A write lock looks
 * like the appropriate guard for a removal — confirm against the other mutating methods
 * of this class before changing it.
 *
 * @param col  column to remove values from
 * @param cond predicate over (key, value); true means the value should be removed
 */
def removeValues(col: Int)(cond: (Int, Int) => Boolean): Unit = {
  // Acquire before `try`: if lock() itself fails, `finally` must not try to release
  // a lock that was never held.
  readLock.lock()
  try {
    multipleValuesMap.removeValues(col)(cond)
    singleValueMap.removeValues(col)(cond)
  } finally {
    readLock.unlock()
  }
}

/**
 * Combined size of the two underlying maps: `singleValueMap.size` plus
 * `multipleValuesMap.size`. Both sizes are read without taking any lock here.
 */
def size = {
  val singleCount = singleValueMap.size
  val multipleCount = multipleValuesMap.size
  singleCount + multipleCount
}
}
60 changes: 57 additions & 3 deletions kesque/src/main/scala/kesque/IntIntMap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ object IntIntMap {

var col = 0
while (col < 3) {
println(s"\n=== col $col ===")

var i = -max
while (i <= max) {
map.put(i, i + col, col)
Expand All @@ -85,6 +87,24 @@ object IntIntMap {
}
println(s"count: $count")

// remove values under condition
map.removeValues(col) {
case (k, v) => v < 0
}
println(s"remove all < 0")

// check after remove all
count = 0
map.iterateOver(col) {
case (k, v) =>
//println(s"$k -> $v")
if (v < 0) {
println(s"remove all < 0 err happened")
}
count += 1
}
println(s"count: $count")

// remove
i = -max
while (i <= max) {
Expand Down Expand Up @@ -376,18 +396,52 @@ final class IntIntMap(initSize: Int, nValues: Int, fillFactor: Float = 0.75f) {
val len = m_data.length - 1 - col
while (ptr <= len) {
val k = m_data(ptr)
if (!freeKeyProcessed && k == FREE_KEY && m_hasFreeKey(col)) {
val v = m_freeValue(col)
op(k, v)
if (k == FREE_KEY) {
if (!freeKeyProcessed && m_hasFreeKey(col)) {
val v = m_freeValue(col)
op(k, v)
}
freeKeyProcessed = true
} else {
val v = m_data(ptr + 1 + col)
if (v != NO_VALUE) {
op(k, v)
}
}

ptr += 1 + nValues
}
}

/**
 * Removes from column `col` every entry whose (key, value) pair satisfies `cond`.
 *
 * `m_data` is walked in strides of `1 + nValues` cells: the key sits at `ptr` and the
 * value for `col` at `ptr + 1 + col`. Slots whose key cell equals `FREE_KEY` are not read
 * from `m_data`; instead the separately stored free-key value (`m_hasFreeKey` /
 * `m_freeValue`) is tested, at most once per call.
 *
 * Cells holding `NO_VALUE` are skipped, exactly as `iterateOver` does. Without that guard
 * `cond` would be evaluated on absent values, and a predicate accepting `NO_VALUE` would
 * keep requesting removal of the same slot without ever advancing.
 *
 * @param col  column whose values are tested and removed
 * @param cond predicate over (key, value); true means the entry should be removed
 */
def removeValues(col: Int)(cond: (Int, Int) => Boolean): Unit = {
  var ptr = 0
  var freeKeyProcessed = false
  val len = m_data.length - 1 - col
  while (ptr <= len) {
    val k = m_data(ptr)
    var keyRemoved = false
    if (k == FREE_KEY) {
      if (!freeKeyProcessed && m_hasFreeKey(col)) {
        val v = m_freeValue(col)
        if (cond(k, v)) {
          remove(k, col)
          keyRemoved = true
        }
      }
      // The free-key value is stored once, so it is tested only at the first such slot.
      freeKeyProcessed = true
    } else {
      val v = m_data(ptr + 1 + col)
      // Skip absent cells: only real values are offered to cond.
      if (v != NO_VALUE && cond(k, v)) {
        remove(k, col)
        keyRemoved = true
      }
    }

    // If a removal happened, another key may have been shifted into ptr, so re-check it;
    // otherwise move on to the next slot.
    if (!keyRemoved) {
      ptr += 1 + nValues
    }
  }
}
}

122 changes: 121 additions & 1 deletion kesque/src/main/scala/kesque/IntIntsMap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ object IntIntsMap {

var col = 0
while (col < 3) {
println(s"col is $col")
println(s"\n=== col $col ===")

// put i and -i
var i = -max
while (i <= max) {
Expand Down Expand Up @@ -44,6 +45,41 @@ object IntIntsMap {
println(map.get(1, col).mkString(","))
println(map.get(max - 1, col).mkString(","))

// iterate
var count = 0
map.iterateOver(col) {
case (k, v) =>
//println(s"$k -> $v")
count += 1
}
println(s"count: $count")

// remove values under condition
map.removeValues(col) {
case (k, v) => v < 0
}
println(s"remove all < 0")

// check after remove all
count = 0
map.iterateOver(col) {
case (k, v) =>
//println(s"$k -> $v")
if (v < 0) {
println(s"remove all < 0 err happened")
}
count += 1
}
println(s"count: $count")

i = -max
while (i <= max) {
map.put(i, i, col)
map.put(i, -i, col)
//println(s"${map.get(i, n).mkString("[", ",", "]")}")
i += 1
}

// remove value -i from map
i = -max
while (i <= max) {
Expand Down Expand Up @@ -475,4 +511,88 @@ final class IntIntsMap(initSize: Int, nValues: Int, fillFactor: Float = 0.75f) {
(currentIndex + 1) % m_mask
}
}

/**
 * Applies `op` to every (key, value) pair stored in column `col`.
 *
 * Each slot of `m_keys` is visited once, in index order. For a slot whose key differs from
 * `FREE_KEY`, every element of `m_values(col)(slot)` is passed to `op` unless that array is
 * `NO_VALUE`. The values kept for `FREE_KEY` itself (`m_freeValue(col)`, when
 * `m_hasFreeKey(col)` is set) are emitted a single time, at the first slot whose key
 * equals `FREE_KEY`.
 *
 * @param col column to iterate over
 * @param op  callback invoked as `op(key, value)` for each stored value
 */
def iterateOver(col: Int)(op: (Int, V) => Unit) {
  val slotCount = m_keys.length
  var freeKeyVisited = false
  var slot = 0
  while (slot < slotCount) {
    val key = m_keys(slot)
    if (key != FREE_KEY) {
      val values = m_values(col)(slot)
      if (values != NO_VALUE) {
        var n = 0
        while (n < values.length) {
          op(key, values(n))
          n += 1
        }
      }
    } else {
      if (!freeKeyVisited && m_hasFreeKey(col)) {
        val values = m_freeValue(col)
        var n = 0
        while (n < values.length) {
          op(key, values(n))
          n += 1
        }
      }
      // Whether or not anything was emitted, never revisit the free-key values.
      freeKeyVisited = true
    }

    slot += 1
  }
}

/**
 * Removes from column `col` every value whose (key, value) pair satisfies `cond`.
 *
 * Each slot of `m_keys` is examined. For a slot whose key differs from `FREE_KEY`, the
 * values in `m_values(col)(ptr)` are tested one by one and matching ones are dropped via
 * `removeValue`; when every value of the key matched, `remove(k, col)` is called as well.
 * The values kept for `FREE_KEY` itself (`m_freeValue(col)`, when `m_hasFreeKey(col)` is
 * set) are handled the same way, at most once per call.
 *
 * Slots holding `NO_VALUE` are skipped, as in `iterateOver`, and a key is only treated as
 * fully removed when at least one value actually matched. Without these guards an empty
 * value array satisfies `count == v.length` trivially, which requests a removal for a key
 * that has nothing in this column and then re-checks the same slot without advancing.
 *
 * @param col  column whose values are tested and removed
 * @param cond predicate over (key, value); true means the value should be removed
 */
def removeValues(col: Int)(cond: (Int, Int) => Boolean): Unit = {
  var ptr = 0
  var freeKeyProcessed = false
  val len = m_keys.length - 1
  while (ptr <= len) {
    val k = m_keys(ptr)
    var keyRemoved = false
    if (k == FREE_KEY) {
      if (!freeKeyProcessed && m_hasFreeKey(col)) {
        val v = m_freeValue(col)
        var i = 0
        var count = 0
        while (i < v.length) {
          val x = v(i)
          if (cond(k, x)) {
            removeValue(k, x, col)
            count += 1
          }
          i += 1
        }
        // Only when something matched AND everything matched is the key itself removed.
        if (count > 0 && count == v.length) {
          remove(k, col)
          keyRemoved = true
        }
      }
      // The free-key values are stored once, so they are tested only at the first such slot.
      freeKeyProcessed = true
    } else {
      val v = m_values(col)(ptr)
      // Skip keys that hold nothing in this column.
      if (v != NO_VALUE) {
        var count = 0
        var i = 0
        while (i < v.length) {
          val x = v(i)
          if (cond(k, x)) {
            removeValue(k, x, col)
            count += 1
          }
          i += 1
        }
        // Only when something matched AND everything matched is the key itself removed.
        if (count > 0 && count == v.length) {
          remove(k, col)
          keyRemoved = true
        }
      }
    }

    // If a removal happened, another key may have been shifted into ptr, so re-check it;
    // otherwise move on to the next slot.
    if (!keyRemoved) {
      ptr += 1
    }
  }
}
}
Loading

0 comments on commit 774bf71

Please sign in to comment.