-
Notifications
You must be signed in to change notification settings - Fork 0
/
lp.go
122 lines (113 loc) · 2.81 KB
/
lp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package jsonstream
import (
"bufio"
"encoding/json"
"fmt"
"io"
"strconv"
"unicode"
)
// LenPrefixedReader reads a stream of documents in which every
// document is preceded by its byte length written in decimal. The
// input looks like this:
//
//	18{"some":"thing\n"}55{"may":{"include":"nested","objects":["and","arrays"]}}
//
// The length prefix ends at the first non-digit byte, so a document
// must not itself start with a digit.
//
// This uses a bufio.Scanner to read the stream one byte at a time.
type LenPrefixedReader struct {
	scn *bufio.Scanner
	// err points at the sticky error slot. The methods use value
	// receivers, so the error has to live behind a pointer to survive
	// the copy made on every call; all copies of a reader share it.
	err *error
}

// NewLenPrefixedReader returns a new reader that consumes r.
func NewLenPrefixedReader(r io.Reader) LenPrefixedReader {
	ret := LenPrefixedReader{scn: bufio.NewScanner(r), err: new(error)}
	ret.scn.Split(bufio.ScanBytes)
	return ret
}

// fail records err as the sticky error of the stream and returns it.
func (r LenPrefixedReader) fail(err error) error {
	if r.err != nil {
		*r.err = err
	}
	return err
}

// ReadRaw reads the next raw document. The returned buffer is a
// freshly allocated slice, so subsequent read calls will not
// overwrite it.
//
// If the stream ends cleanly between documents, ReadRaw returns
// io.EOF. If the stream ends inside a length prefix or inside a
// document, it returns io.ErrUnexpectedEOF. A missing, unparsable or
// zero length prefix is an error as well. Every error other than
// io.EOF puts the stream into an error state: all subsequent calls
// return that same error.
func (r LenPrefixedReader) ReadRaw() ([]byte, error) {
	if r.err != nil && *r.err != nil {
		return nil, *r.err
	}

	// Upper bound for the initial allocation, so that a bogus huge
	// length prefix cannot force a huge allocation up front.
	const maxPrealloc = 64 << 10

	// Phase 1: collect the decimal length prefix. The first non-digit
	// byte terminates the prefix and is already the first byte of the
	// document.
	digits := make([]byte, 0, 10)
	var first byte
	haveFirst := false
	for r.scn.Scan() {
		c := r.scn.Bytes()[0]
		if !unicode.IsDigit(rune(c)) {
			first, haveFirst = c, true
			break
		}
		digits = append(digits, c)
	}
	if !haveFirst {
		if err := r.scn.Err(); err != nil {
			return nil, r.fail(err)
		}
		if len(digits) > 0 {
			// Input stopped in the middle of a length prefix.
			return nil, r.fail(io.ErrUnexpectedEOF)
		}
		return nil, io.EOF
	}

	size, err := strconv.Atoi(string(digits))
	if err != nil {
		return nil, r.fail(err)
	}
	if size == 0 {
		return nil, r.fail(fmt.Errorf("jsonstream: zero-length document"))
	}

	// Phase 2: read the remaining size-1 bytes of the document.
	prealloc := size
	if prealloc > maxPrealloc {
		prealloc = maxPrealloc
	}
	out := make([]byte, 0, prealloc)
	out = append(out, first)
	for len(out) < size {
		if !r.scn.Scan() {
			if err := r.scn.Err(); err != nil {
				return nil, r.fail(err)
			}
			// Input stopped in the middle of a document.
			return nil, r.fail(io.ErrUnexpectedEOF)
		}
		out = append(out, r.scn.Bytes()[0])
	}
	return out, nil
}

// Unmarshal reads the next document from the input and unmarshals it
// into out, which must be a non-nil pointer as required by
// json.Unmarshal. Returns the read error or the unmarshal error if
// there is any. If the end of stream is reached, returns io.EOF.
func (r LenPrefixedReader) Unmarshal(out interface{}) error {
	raw, err := r.ReadRaw()
	if err != nil {
		return err
	}
	// Pass out itself, not &out: with &out a nil or non-pointer target
	// would be decoded into the local variable and silently succeed.
	return json.Unmarshal(raw, out)
}
// LenPrefixedWriter streams JSON documents by writing the decimal
// byte length of each document immediately before the document
// itself, with no separator in between.
type LenPrefixedWriter struct {
	w io.Writer
}

// NewLenPrefixedWriter returns a new writer that emits to w.
func NewLenPrefixedWriter(w io.Writer) LenPrefixedWriter {
	return LenPrefixedWriter{w: w}
}

// WriteRaw writes the length of data and then data. It assumes that
// the 'data' is a valid JSON document but does not verify it.
//
// Returns an error, without writing anything, if len(data)==0 or if
// data starts with an ASCII digit: the length prefix has no
// terminator, so a leading digit would be read back as part of the
// length and corrupt the stream. Otherwise returns the first error
// reported by the underlying writer, if any.
func (w LenPrefixedWriter) WriteRaw(data []byte) error {
	if len(data) == 0 {
		return fmt.Errorf("Empty doc")
	}
	if c := data[0]; c >= '0' && c <= '9' {
		return fmt.Errorf("document starts with digit %q, length prefix would be ambiguous", c)
	}
	var scratch [20]byte // large enough for any int64 in decimal
	prefix := strconv.AppendInt(scratch[:0], int64(len(data)), 10)
	if _, err := w.w.Write(prefix); err != nil {
		return err
	}
	_, err := w.w.Write(data)
	return err
}

// Marshal encodes data with json.Marshal and writes the result as a
// length-prefixed document to the output. Returns the marshal error
// or the error from WriteRaw, if any.
func (w LenPrefixedWriter) Marshal(data interface{}) error {
	encoded, err := json.Marshal(data)
	if err != nil {
		return err
	}
	return w.WriteRaw(encoded)
}