Skip to content

Commit

Permalink
refactor: move the flatten map to internal/cache
Browse files Browse the repository at this point in the history
Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Feb 10, 2025
1 parent 26ba6aa commit c0e7210
Show file tree
Hide file tree
Showing 7 changed files with 765 additions and 204 deletions.
232 changes: 28 additions & 204 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package rueidis

import (
"context"
"runtime"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/redis/rueidis/internal/cache"
)

// NewCacheStoreFn can be provided in ClientOption for using a custom CacheStore implementation
Expand Down Expand Up @@ -191,250 +192,73 @@ func (a *adapterEntry) Wait(ctx context.Context) (RedisMessage, error) {
}
}

type flatentry struct {
ovfl *flatentry
next unsafe.Pointer
prev unsafe.Pointer
cmd string
key string
val []byte
ttl int64
size int64
mark int64
mu sync.RWMutex
}

func (f *flatentry) insert(e *flatentry) {
f.size += e.size
f.ttl = e.ttl
f.mu.Lock()
e.ovfl = f.ovfl
f.ovfl = e
f.mu.Unlock()
}

func (f *flatentry) find(cmd string, ts int64) ([]byte, bool) {
if f != nil && ts >= f.ttl {
return nil, true
}
for next := f; next != nil; {
if cmd == next.cmd {
return next.val, false
}
next.mu.RLock()
ovfl := next.ovfl
next.mu.RUnlock()
next = ovfl
}
return nil, false
}

const lrBatchSize = 64
const flattEntrySize = unsafe.Sizeof(flatentry{})

type lrBatch struct {
m map[*flatentry]struct{}
}

func NewFlattenCache(limit int) CacheStore {
f := &flatten{
flights: make(map[string]*adapterEntry),
cache: make(map[string]*flatentry),
head: &flatentry{},
tail: &flatentry{},
size: 0,
limit: int64(limit),
return &flatten{
flights: cache.NewDoubleMap[*adapterEntry](64),
cache: cache.NewLRUDoubleMap[[]byte](64, int64(limit)),
}
f.head.next = unsafe.Pointer(f.tail)
f.tail.prev = unsafe.Pointer(f.head)
f.lrup = sync.Pool{New: func() any {
b := &lrBatch{m: make(map[*flatentry]struct{}, lrBatchSize)}
runtime.SetFinalizer(b, func(b *lrBatch) {
if len(b.m) >= 0 {
f.mu.Lock()
f.llTailBatch(b)
f.mu.Unlock()
}
})
return b
}}
return f
}

type flatten struct {
flights map[string]*adapterEntry
cache map[string]*flatentry
head *flatentry
tail *flatentry
lrup sync.Pool
mark int64
size int64
limit int64
mu sync.RWMutex
}

func (f *flatten) llAdd(e *flatentry) {
e.mark = f.mark
e.prev = f.tail.prev
e.next = unsafe.Pointer(f.tail)
f.tail.prev = unsafe.Pointer(e)
(*flatentry)(e.prev).next = unsafe.Pointer(e)
}

func (f *flatten) llDel(e *flatentry) {
(*flatentry)(e.prev).next = e.next
(*flatentry)(e.next).prev = e.prev
e.mark = -1
}

func (f *flatten) llTail(e *flatentry) {
f.llDel(e)
f.llAdd(e)
}

func (f *flatten) llTailBatch(b *lrBatch) {
for e := range b.m {
if e.mark == f.mark {
f.llTail(e)
}
}
clear(b.m)
}

func (f *flatten) remove(e *flatentry) {
f.size -= e.size
f.llDel(e)
delete(f.cache, e.key)
flights *cache.DoubleMap[*adapterEntry]
cache *cache.LRUDoubleMap[[]byte]
close int32
}

func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (RedisMessage, CacheEntry) {
f.mu.RLock()
e := f.cache[key]
f.mu.RUnlock()
ts := now.UnixMilli()
if v, _ := e.find(cmd, ts); v != nil {
batch := f.lrup.Get().(*lrBatch)
batch.m[e] = struct{}{}
if len(batch.m) >= lrBatchSize {
f.mu.Lock()
f.llTailBatch(batch)
f.mu.Unlock()
}
f.lrup.Put(batch)
var ret RedisMessage
_ = ret.CacheUnmarshalView(v)
return ret, nil
if atomic.LoadInt32(&f.close) == 1 {
return RedisMessage{}, nil
}
fk := key + cmd
f.mu.RLock()
af := f.flights[fk]
f.mu.RUnlock()
if af != nil {
return RedisMessage{}, af
}
f.mu.Lock()
e = f.cache[key]
v, expired := e.find(cmd, ts)
if v != nil {
f.llTail(e)
f.mu.Unlock()
ts := now.UnixMilli()
if e, ok := f.cache.Find(key, cmd, ts); ok {
var ret RedisMessage
_ = ret.CacheUnmarshalView(v)
_ = ret.CacheUnmarshalView(e)
return ret, nil
}
defer f.mu.Unlock()
if expired {
f.remove(e)
}
if af = f.flights[fk]; af != nil {
xat := ts + ttl.Milliseconds()
if af, ok := f.flights.FindOrInsert(key, cmd, func() *adapterEntry {
return &adapterEntry{ch: make(chan struct{}), xat: xat}
}); ok {
return RedisMessage{}, af
}
if f.flights != nil {
f.flights[fk] = &adapterEntry{ch: make(chan struct{}), xat: ts + ttl.Milliseconds()}
}
return RedisMessage{}, nil
}

func (f *flatten) Update(key, cmd string, val RedisMessage) (sxat int64) {
fk := key + cmd
f.mu.RLock()
af := f.flights[fk]
f.mu.RUnlock()
if af != nil {
if af, ok := f.flights.Find(key, cmd); ok {
sxat = val.getExpireAt()
if af.xat < sxat || sxat == 0 {
sxat = af.xat
val.setExpireAt(sxat)
}
bs := val.CacheMarshal(nil)
fe := &flatentry{cmd: cmd, val: bs, ttl: sxat, size: int64(len(bs)+len(key)+len(cmd)) + int64(flattEntrySize) + 64} // 64 for 2 map entries
f.mu.Lock()
if f.flights != nil {
delete(f.flights, fk)
f.size += fe.size
for ep := f.head.next; f.size > f.limit && ep != unsafe.Pointer(f.tail); {
e := (*flatentry)(ep)
f.remove(e)
ep = e.next
}
e := f.cache[key]
if e != nil && e.cmd == cmd {
f.size -= e.size
f.llDel(e)
e = nil
}
if e == nil {
fe.key = key
f.cache[key] = fe
f.llAdd(fe)
} else {
e.insert(fe)
}
}
f.mu.Unlock()
f.cache.Insert(key, cmd, int64(len(bs)+len(key)+len(cmd))+int64(cache.LRUEntrySize)+64, sxat, bs)
f.flights.Delete(key, cmd)
af.setVal(val)
}
return sxat
}

func (f *flatten) Cancel(key, cmd string, err error) {
fk := key + cmd
f.mu.Lock()
defer f.mu.Unlock()
if af := f.flights[fk]; af != nil {
delete(f.flights, fk)
if af, ok := f.flights.Find(key, cmd); ok {
f.flights.Delete(key, cmd)
af.setErr(err)
}
}

func (f *flatten) Delete(keys []RedisMessage) {
f.mu.Lock()
defer f.mu.Unlock()
if keys == nil {
f.cache = make(map[string]*flatentry, len(f.cache))
f.head.next = unsafe.Pointer(f.tail)
f.tail.prev = unsafe.Pointer(f.head)
f.mark++
f.size = 0
f.cache.DeleteAll()
} else {
for _, k := range keys {
if e := f.cache[k.string]; e != nil {
f.remove(e)
}
f.cache.Delete(k.string)
}
}
}

func (f *flatten) Close(err error) {
f.mu.Lock()
flights := f.flights
f.flights = nil
f.cache = nil
f.tail = nil
f.head = nil
f.mark++
f.mu.Unlock()
for _, entry := range flights {
atomic.StoreInt32(&f.close, 1)
f.flights.Iterate(func(entry *adapterEntry) {
entry.setErr(err)
}
})
}
72 changes: 72 additions & 0 deletions internal/cache/chain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package cache

type node[V any] struct {
key string
next *node[V]
val V
}
type chain[V any] struct {
node[V]
}

func (h *chain[V]) find(key string) (val V, ok bool) {
if h.node.key == key {
return h.node.val, true
}
for curr := h.node.next; curr != nil; curr = curr.next {
if curr.key == key {
return curr.val, true
}
}
return val, ok
}

func (h *chain[V]) insert(key string, val V) {
if h.node.key == "" {
h.node.key = key
h.node.val = val
} else if h.node.key == key {
h.node.val = val
} else {
n := &node[V]{key: key, val: val}
n.next = h.node.next
h.node.next = n
}
}

func (h *chain[V]) empty() bool {
return h.node.next == nil && h.node.key == ""
}

func (h *chain[V]) delete(key string) bool {
var zero V
if h.node.key == key {
h.node.key = ""
h.node.val = zero
return h.node.next == nil
}

if h.node.next == nil {
return h.node.key == ""
}

if h.node.next.key == key {
h.node.next.key = ""
h.node.next.val = zero
h.node.next, h.node.next.next = h.node.next.next, nil
return h.empty()
}

prev := h.node.next
curr := h.node.next.next
for curr != nil {
if curr.key == key {
curr.key = ""
curr.val = zero
prev.next, curr.next = curr.next, nil
break
}
prev, curr = curr, curr.next
}
return h.empty()
}
Loading

0 comments on commit c0e7210

Please sign in to comment.