-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathstreams.go
177 lines (157 loc) · 5.1 KB
/
streams.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
/*
Copyright 2017-2018 Mikael Berthe
Licensed under the MIT license. Please see the LICENSE file is this directory.
*/
package madon
import (
"encoding/json"
"net/url"
"strings"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
)
// StreamEvent contains a single event from the streaming API
type StreamEvent struct {
Event string // Name of the event (error, update, notification or delete)
Data interface{} // Status, Notification or status ID
Error error // Error message from the StreamListener
}
// openStream opens a stream URL and returns an http.Response
// Note that the caller should close the connection when it's done reading
// the stream.
// The stream name can be "user", "local", "public", "direct", "list" or
// "hashtag".
// When it is "hashtag", the param argument contains the hashtag.
// When it is "list", the param argument contains the list ID.
func (mc *Client) openStream(streamName, param string) (*websocket.Conn, error) {
var tag, list string
switch streamName {
case "public", "public:media", "public:local", "public:local:media", "public:remote", "public:remote:media", "user", "user:notification", "direct":
case "hashtag", "hashtag:local":
if param == "" {
return nil, ErrInvalidParameter
}
tag = param
case "list":
if param == "" {
return nil, ErrInvalidParameter
}
list = param
default:
return nil, ErrInvalidParameter
}
if !strings.HasPrefix(mc.APIBase, "http") {
return nil, errors.New("cannot create Websocket URL: unexpected API base URL")
}
// Build streaming websocket URL
u, err := url.Parse("ws" + mc.APIBase[4:] + "/v1/streaming/")
if err != nil {
return nil, errors.Wrap(err, "cannot create Websocket URL")
}
urlParams := url.Values{}
urlParams.Add("stream", streamName)
urlParams.Add("access_token", mc.UserToken.AccessToken)
if tag != "" {
urlParams.Add("tag", tag)
} else if list != "" {
urlParams.Add("list", list)
}
u.RawQuery = urlParams.Encode()
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
return c, err
}
// readStream reads from the http.Response and sends events to the events channel
// It stops when the connection is closed or when the stopCh channel is closed.
// The foroutine will close the doneCh channel when it terminates.
func (mc *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, doneCh chan bool, c *websocket.Conn) {
defer c.Close()
defer close(doneCh)
go func() {
select {
case <-stopCh:
// Close connection
c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
case <-doneCh:
// Leave
}
}()
for {
var msg struct {
Event string
Payload interface{}
}
err := c.ReadJSON(&msg)
if err != nil {
if strings.Contains(err.Error(), "close 1000 (normal)") {
break // Connection properly closed
}
e := errors.Wrap(err, "read error")
events <- StreamEvent{Event: "error", Error: e}
break
}
var obj interface{}
// Decode API object
switch msg.Event {
case "update", "status.update":
strPayload, ok := msg.Payload.(string)
if !ok {
e := errors.New("could not decode status: payload isn't a string")
events <- StreamEvent{Event: "error", Error: e}
continue
}
var s Status
if err := json.Unmarshal([]byte(strPayload), &s); err != nil {
e := errors.Wrap(err, "could not decode status")
events <- StreamEvent{Event: "error", Error: e}
continue
}
obj = s
case "notification":
strPayload, ok := msg.Payload.(string)
if !ok {
e := errors.New("could not decode notification: payload isn't a string")
events <- StreamEvent{Event: "error", Error: e}
continue
}
var notif Notification
if err := json.Unmarshal([]byte(strPayload), ¬if); err != nil {
e := errors.Wrap(err, "could not decode notification")
events <- StreamEvent{Event: "error", Error: e}
continue
}
obj = notif
case "delete":
strPayload, ok := msg.Payload.(string)
if !ok {
e := errors.New("could not decode deletion: payload isn't a string")
events <- StreamEvent{Event: "error", Error: e}
continue
}
obj = strPayload // statusID
default:
e := errors.Errorf("unhandled event '%s'", msg.Event)
events <- StreamEvent{Event: "error", Error: e}
continue
}
// Send event to the channel
events <- StreamEvent{Event: msg.Event, Data: obj}
}
}
// StreamListener listens to a stream from the Mastodon server
// The stream 'name' can be "user", "local", "public" or "hashtag".
// For 'hashtag', the hashTag argument cannot be empty.
// The events are sent to the events channel (the errors as well).
// The streaming is terminated if the 'stopCh' channel is closed.
// The 'doneCh' channel is closed if the connection is closed by the server.
// Please note that this method launches a goroutine to listen to the events.
func (mc *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool, doneCh chan bool) error {
if mc == nil {
return ErrUninitializedClient
}
conn, err := mc.openStream(name, hashTag)
if err != nil {
return err
}
go mc.readStream(events, stopCh, doneCh, conn)
return nil
}