-
Notifications
You must be signed in to change notification settings - Fork 0
/
ndlines.go
88 lines (79 loc) · 2.06 KB
/
ndlines.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package jsonstream
import (
"bufio"
"encoding/json"
"io"
)
// LineReader reads one complete JSON document from each line.
type LineReader struct {
scn *bufio.Scanner
}
// NewLineReader returns a new lines reader. Each line must be a
// valid document. This uses a bufio.Scanner to read lines.
func NewLineReader(r io.Reader) LineReader {
return LineReader{scn: bufio.NewScanner(r)}
}
// ReadRaw reads the next line of the input. The returned buffer is a
// copy of the read buffer, so subsequent read calls will not
// overwrite the buffer. If the end if the stream is reached, returns
// io.EOF. It does not validate that the line is a valid JSON document.
func (r LineReader) ReadRaw() ([]byte, error) {
if r.scn.Scan() {
o := r.scn.Bytes()
out := make([]byte, len(o))
copy(out, o)
return out, nil
}
err := r.scn.Err()
if err == nil {
return nil, io.EOF
}
return nil, err
}
// Unmarshal the next document from the input. Returns unmarshal error
// if there is any. If the end of stream is reached, returns
// io.EOF. This reader can continue reading lines even if the previous
// line had an error.
func (r LineReader) Unmarshal(out interface{}) error {
if r.scn.Scan() {
o := r.scn.Bytes()
return json.Unmarshal(o, &out)
}
err := r.scn.Err()
if err == nil {
return io.EOF
}
return err
}
// LineWriter streams one JSON document every line
type LineWriter struct {
w io.Writer
}
// NewLineWriter returns a new writer
func NewLineWriter(w io.Writer) LineWriter {
return LineWriter{w: w}
}
// WriteRaw writes data followed by newline. Any new line characterts
// in data are removed.
func (w LineWriter) WriteRaw(data []byte) error {
bw := bufio.NewWriter(w.w)
for _, c := range data {
if c != '\n' {
if err := bw.WriteByte(c); err != nil {
return err
}
}
}
if err := bw.WriteByte('\n'); err != nil {
return err
}
return bw.Flush()
}
// Marshal writes data as a JSON document followed by newline.
func (w LineWriter) Marshal(data interface{}) error {
x, err := json.Marshal(data)
if err != nil {
return err
}
return w.WriteRaw(x)
}