-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathstore.go
276 lines (256 loc) · 7.74 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
// Copyright (c) 2013 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
// Data store for btree, organised in two files, index-file and kv-file.
//
// index-file,
// contains head block, list of free blocks within index file,
// and btree-blocks.
//
// head,
// 512 byte sector written at the head of the file. Contains a reference to
// the root block, head-sector-size, free-list-size and block-size.
// freelist,
// contains a list of 8-byte offsets into the index file, each pointing to
// a free block.
//
// kv-file,
// contains key, value, docid bytes. They are always added in append
// only mode, and a separate read-fd fetches them in random-access. Refer to
// appendkv.go for more information.
package btree
import (
"log"
"os"
)
// Default sizing parameters relevant to both the index-file and the kv-file.
const (
	// OFFSET_SIZE is the width in bytes of one 64 bit file offset.
	OFFSET_SIZE = 8
	// SECTOR_SIZE is the disk drive sector size in bytes.
	SECTOR_SIZE = 512
	// FLIST_SIZE is the default free list size in bytes, i.e. room for
	// 1024 offsets.
	FLIST_SIZE = OFFSET_SIZE * 1024
	// BLOCK_SIZE is the default block size in bytes (64 KiB).
	BLOCK_SIZE = 64 * 1024
)
// Store ties together the configuration, the write-store and the read-only
// file descriptors through which the index-file and the kv-file are read.
type Store struct {
	Config // Embedded configuration the store was constructed with.
	wstore *WStore // Reference to write-store.
	kvRfd *os.File // Random read-only access for kv-file.
	idxRfd *os.File // Random read-only access for index-file.
}
//---- functions and receivers
// NewStore builds a `Store` object for the given configuration. The
// write-store is opened first through OpenWStore, followed by read-only
// descriptors for the index-file and then the kv-file.
func NewStore(conf Config) *Store {
	ws := OpenWStore(conf)
	idx := openRfd(conf.Idxfile)
	kv := openRfd(conf.Kvfile)
	// TODO : Check whether index file is sane, both configuration and
	// freelist.
	return &Store{
		Config: conf,
		wstore: ws,
		idxRfd: idx,
		kvRfd:  kv,
	}
}
// Close releases all resources maintained by the store: the kv-file and
// index-file read descriptors are closed, the write-store is closed, and the
// corresponding fields are reset to nil. Errors returned by the descriptor
// Close calls are ignored.
func (store *Store) Close() {
	// Read descriptors go first, kv-file before index-file.
	store.kvRfd.Close()
	store.idxRfd.Close()
	store.kvRfd, store.idxRfd = nil, nil

	store.wstore.CloseWStore()
	store.wstore = nil
}
// Destroy is the opposite of Create, it cleans up the data files. Both read
// descriptors are closed, then the write-store is closed; DestroyWStore is
// invoked only when CloseWStore reports true (per the original note, that is
// when all references to the WStore are removed).
func (store *Store) Destroy() {
	store.kvRfd.Close()
	store.idxRfd.Close()
	store.kvRfd, store.idxRfd = nil, nil

	// Close and destroy
	if ws := store.wstore; ws.CloseWStore() {
		ws.DestroyWStore()
	}
	store.wstore = nil
}
// OpStart fetches the root btree block from the index-file and returns it
// together with the multi-version bookkeeping object and the access
// timestamp. `transaction` must be true for write access. It is assumed that
// there will be only one outstanding transaction at any given time, so the
// caller has to make sure to acquire a transaction lock from the MVCC
// controller.
func (store *Store) OpStart(transaction bool) (Node, *MV, int64) {
	ws := store.wstore

	// Writers take the transaction lock before registering their access.
	if transaction {
		ws.translock <- true
	}
	ts, rootfpos := ws.access(transaction)

	var root Node
	var mv *MV
	if !transaction {
		if store.Debug {
			log.Println("Root: ", rootfpos)
		}
		root = store.FetchNCache(rootfpos)
		mv = &MV{stales: []int64{}, commits: make(map[int64]Node)}
	} else {
		// Prefer the multi-version root when mvRoot reports one (non-zero).
		mvroot := mvRoot(store)
		if mvroot == 0 {
			mvroot = rootfpos
		}
		if store.Debug {
			log.Println("MV Root: ", mvroot)
		}
		// The fetched root is recorded as stale; mutations work on its copy.
		root = store.FetchMVCache(mvroot).copyOnWrite(store)
		mv = &MV{stales: []int64{mvroot}, commits: make(map[int64]Node)}
	}

	mv.commits[root.getKnode().fpos] = root
	mv.timestamp = ts
	ws.opCounts += 1
	return root, mv, ts
}
// OpEnd is the opposite of the OpStart() API. It releases the access
// registered under `ts`; for a transaction it additionally commits `mv` and
// then gives the transaction lock back.
func (store *Store) OpEnd(transaction bool, mv *MV, ts int64) {
	ws := store.wstore
	minAccess := ws.release(ts)
	if !transaction {
		return
	}
	ws.commit(mv, minAccess, false)
	<-ws.translock
}
// FetchNCache fetches a node, identified by its file-position, from cache.
// If it is not available from cache, the node is fetched from disk and then
// cached in memory. To learn how nodes are cached, refer to cache.go
//
// It panics when `fpos` lies before the first block or is not aligned to a
// block boundary. The panic message includes the offending offset, matching
// the check in FetchMVCache.
func (store *Store) FetchNCache(fpos int64) Node {
	var node Node

	// Sanity check
	fpos_firstblock, blocksize := store.wstore.fpos_firstblock, store.Blocksize
	if fpos < fpos_firstblock || (fpos-fpos_firstblock)%blocksize != 0 {
		log.Panicln("Invalid fpos to fetch", fpos)
	}

	// Try to fetch from cache
	if store.Debug {
		log.Println("fetch", fpos)
	}
	if node = store.wstore.ncacheLookup(fpos); node == nil {
		// Cache miss: count the load, read the block and cache it.
		store.wstore.loadCounts += 1
		node = store.FetchNode(fpos)
		store.wstore.ncache(node)
	}
	if store.Debug {
		// A block that is being fetched must not be on the free list.
		store.wstore.freelist.assertNotMember(fpos)
	}
	return node
}
// FetchMVCache fetches a node, identified by its file-position, from commitQ
// or from the memory cache. If it is not available from memory it is fetched
// from disk.
// NOTE: multi-version fetches are only used from index mutations and they
// eventually end up in commitQ under a new file-position. They are not cached
// into memory until the snapshot is flushed into the disk.
func (store *Store) FetchMVCache(fpos int64) Node {
	ws := store.wstore

	// Sanity check
	if first := ws.fpos_firstblock; fpos < first || (fpos-first)%store.Blocksize != 0 {
		log.Panicln("Invalid fpos to fetch", fpos)
	}

	// Lookup order: commitQ, then memory cache, then disk.
	var node Node
	node = ws.ccacheLookup(fpos)
	if node == nil {
		node = ws.ncacheLookup(fpos)
	}
	if node == nil {
		ws.MVloadCounts += 1
		node = store.FetchNode(fpos)
	}

	if store.Debug {
		ws.freelist.assertNotMember(fpos)
	}
	return node
}
// FetchNode reads the pristine block at `fpos` from the index-file and makes
// a knode or an inode out of it, depending on whether the decoded block is a
// leaf. It panics with the error text if the read fails.
func (store *Store) FetchNode(fpos int64) Node {
	buf := make([]byte, store.Blocksize)
	_, err := store.idxRfd.ReadAt(buf, fpos)
	if err != nil {
		panic(err.Error())
	}

	blk := (&block{}).newBlock(0, store.maxKeys())
	blk.gobDecode(buf)

	kn := knode{block: *blk, fpos: fpos}
	if blk.isLeaf() {
		return &kn
	}
	return &inode{knode: kn}
}
// maxKeys reports the maximum number of keys that are stored in a btree
// block, as recorded in the write-store's head. Per the original note the
// value is expected to be an even number, adjusted for the additional value
// entry; that is not enforced here.
func (store *Store) maxKeys() int {
	limit := store.wstore.head.maxkeys
	return int(limit)
}
// calculateMaxKeys derives a key count from `blocksize` using integer
// arithmetic: 16 bytes are subtracted and the remainder is divided by 24.
// NOTE(review): 16 and 24 look like the fixed per-block overhead and the
// per-key cost in bytes - confirm against the block encoding.
func calculateMaxKeys(blocksize int64) int64 {
	const (
		overhead = 16
		perKey   = 24
	)
	usable := blocksize - overhead
	return usable / perKey
}
// calculateMaxKeys_gob searches for a key count whose gob-encoded leaf block
// stays within `blocksize`. Candidate blocks are filled with a large int64
// value in every key, docid and value slot. The candidate count grows by a
// doubling step while the encoding fits; on overflow with a large step the
// search backs off and restarts with the smallest step, and on overflow with
// a small step it returns the candidate minus two, lowered to an even number.
func calculateMaxKeys_gob(blocksize int64) int64 {
	filler := int64(9223372036854775807 - 1)
	count := int64(float64(blocksize-14) / (10.1875 * 3)) // initial estimate
	step := int64(2)
	for {
		blk := (&block{leaf: TRUE}).newBlock(int(count), int(count))
		for k := int64(0); k < count; k++ {
			blk.ks[k], blk.ds[k], blk.vs[k] = filler, filler, filler
		}

		if int64(len(blk.gobEncode())) <= blocksize {
			// Still fits: move ahead and double the stride.
			count += step
			step *= 2
			continue
		}

		if step > 4 {
			// Overshot with a big stride: undo the last jump and retry
			// with the smallest stride.
			count -= step / 2
			step = 2
			continue
		}

		limit := count - 2
		if limit%2 != 0 { // fix max as even value.
			limit--
		}
		return limit
	}
}
//---- local functions
// openWfd opens `file` with the supplied flags and permission bits and
// returns the resulting descriptor. It panics with the error text when the
// file cannot be opened.
func openWfd(file string, flag int, perm os.FileMode) *os.File {
	wfd, err := os.OpenFile(file, flag, perm)
	if err != nil {
		panic(err.Error())
	}
	return wfd
}
// openRfd opens `file` for reading and returns the resulting descriptor. It
// panics with the error text when the file cannot be opened.
func openRfd(file string) *os.File {
	rfd, err := os.Open(file)
	if err != nil {
		panic(err.Error())
	}
	return rfd
}
// is_configSane reports whether the sector size, free list size and block
// size configured on the store all match the values held by its write-store.
func is_configSane(store *Store) bool {
	ws := store.wstore
	return store.Sectorsize == ws.Sectorsize &&
		store.Flistsize == ws.Flistsize &&
		store.Blocksize == ws.Blocksize
}
//func BlockCalculate(store *Store) {
// fi, _ := store.idxRfd.Stat()
// size := fi.Size()
// count := (size - 1024 - (store.Flistsize * 2)) / store.Blocksize
//}