-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathgo_tx.go
72 lines (66 loc) · 1.66 KB
/
go_tx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package gofast
import "fmt"
import "runtime/debug"
import "sync/atomic"
func (t *Transport) doTx() {
defer func() {
if r := recover(); r != nil {
errorf("doTx() panic: %v\n", r)
errorf("\n%s", getStackTrace(2, debug.Stack()))
go t.Close()
}
}()
batch := make([]*txproto, 0, 64)
tcpwriteBuf := make([]byte, t.batchsize*t.buffersize)
drainbuffers := func() {
atomic.AddUint64(&t.nFlushes, 1)
var err error
m, n := 0, 0
// consolidate.
for _, arg := range batch {
if len(arg.packet) > 0 {
//fmt.Println(hexstring(arg.packet))
n += copy(tcpwriteBuf[n:], arg.packet)
atomic.AddUint64(&t.nTx, 1)
}
}
// send.
if n > 0 {
//TODO: Issue #2, remove or prevent value escape to heap
//fmsg := "%v doTx() socket write %v:%v\n"
//debugf(fmsg, t.logprefix, n, tcpwriteBuf[:n])
m, err = t.conn.Write(tcpwriteBuf[:n])
if m != n {
err = fmt.Errorf("wrote only %d, expected %d", m, n)
}
}
atomic.AddUint64(&t.nTxbyte, uint64(m))
// unblock the callers.
for _, arg := range batch {
arg.n, arg.err = len(arg.packet), err
if arg.async {
arg.packet = arg.packet[:cap(arg.packet)]
t.pTxcmd <- arg
} else {
arg.respch <- arg
}
}
//TODO: Issue #2, remove or prevent value escape to heap
//debugf("%v drained %v packets\n", t.logprefix, len(batch))
batch = batch[:0] // reset the batch
}
infof("%v doTx(batch:%v) started ...\n", t.logprefix, t.batchsize)
loop:
for {
select {
case arg := <-t.txch:
batch = append(batch, arg)
if arg.flush || uint64(len(batch)) >= t.batchsize {
drainbuffers()
}
case <-t.killch:
break loop
}
}
infof("%v doTx() ... stopped\n", t.logprefix)
}