forked from dbaseqp/Quotient
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsse.go
128 lines (106 loc) · 2.56 KB
/
sse.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package main
import (
// "io"
"encoding/json"
"io"
"log"
"net/http"
// "os"
"github.com/gin-gonic/gin"
"github.com/pkg/errors"
)
// ClientChan is the channel on which a single connected SSE client receives
// broadcast messages. One is created per request by the ServeHTTP middleware.
type ClientChan chan gin.H
// Event is the SSE broker. It keeps the set of connected clients and fans
// every message it receives out to all of them; the channels below are
// serviced by the listen goroutine started in NewSSEServer.
type Event struct {
// Message carries events to broadcast. listen forwards each value received
// here to every registered client channel.
Message chan gin.H
// NewClients receives the channel of each newly connected client so that
// listen can register it.
NewClients chan ClientChan
// ClosedClients receives the channel of each disconnected client so that
// listen can unregister it and close the channel.
ClosedClients chan ClientChan
// TotalClients is the set of currently registered client channels. In this
// file it is read and modified only by listen.
TotalClients map[ClientChan]bool
}
// NewSSEServer creates an Event broker with unbuffered channels and an empty
// client set, starts its listen loop in a new goroutine, and returns it.
func NewSSEServer() (event *Event) {
	event = new(Event)
	event.Message = make(chan gin.H)
	event.NewClients = make(chan ClientChan)
	event.ClosedClients = make(chan ClientChan)
	event.TotalClients = make(map[ClientChan]bool)

	// The loop runs for the lifetime of the process; nothing stops it.
	go event.listen()
	return event
}
// listen is the broker's event loop. It runs forever, handling exactly one of
// three things per iteration: broadcasting a message, registering a client,
// or unregistering a client. Because TotalClients is only touched here, the
// map needs no additional locking.
func (stream *Event) listen() {
	for {
		select {
		case msg := <-stream.Message:
			// Deliver the event to every registered client in turn. Each
			// send blocks until that client receives it.
			for subscriber := range stream.TotalClients {
				subscriber <- msg
			}

		case joined := <-stream.NewClients:
			// A client connected: start tracking its channel.
			stream.TotalClients[joined] = true

		case left := <-stream.ClosedClients:
			// A client disconnected: forget its channel and close it.
			delete(stream.TotalClients, left)
			close(left)
		}
	}
}
// ServeHTTP returns a gin middleware that registers a fresh client channel
// with the broker for the duration of the request and exposes it to later
// handlers under the context key "clientChan". When the downstream handlers
// return, the channel is unregistered again.
func (stream *Event) ServeHTTP() gin.HandlerFunc {
	return func(c *gin.Context) {
		// Each request gets its own unbuffered channel.
		clientChan := make(ClientChan)

		// Register the new connection with the broker.
		stream.NewClients <- clientChan

		defer func() {
			// By now nobody reads clientChan any more, but listen may be in
			// the middle of a broadcast and blocked sending to it. If so, it
			// could never receive from ClosedClients and both sides would
			// wait on each other forever. Drain the channel until listen
			// closes it, which ends this goroutine.
			go func() {
				for range clientChan {
				}
			}()

			// Tell the broker this connection is gone.
			stream.ClosedClients <- clientChan
		}()

		c.Set("clientChan", clientChan)
		c.Next()
	}
}
// SendSSE hands data to the package-level stream for broadcast to all
// connected clients. It does nothing when stream is nil, and otherwise
// blocks until the value has been received from stream.Message.
func SendSSE(data gin.H) {
	if stream == nil {
		return
	}
	stream.Message <- data
}
// sse streams broadcast messages to the requesting client as Server-Sent
// Events named "message", each carrying the JSON encoding of the message.
//
// The request must carry valid claims (otherwise a 400 JSON error is
// returned) and must have passed through the ServeHTTP middleware, which
// stores the client's channel under the context key "clientChan". Messages
// whose "admin" field is true are delivered only to clients whose claims have
// Admin set. The stream ends when the client channel is closed or a message
// cannot be encoded.
func sse(c *gin.Context) {
	// Check the claims before setting any SSE headers: gin's JSON renderer
	// keeps an existing Content-Type, so setting text/event-stream first
	// would mislabel the error body.
	claims, err := contextGetClaims(c)
	if err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}

	c.Header("Content-Type", "text/event-stream")
	c.Header("Cache-Control", "no-cache")
	c.Header("Connection", "keep-alive")
	c.Header("Access-Control-Allow-Origin", "*")
	c.Header("Transfer-Encoding", "chunked")

	// Fetch the per-client channel installed by the middleware.
	v, ok := c.Get("clientChan")
	if !ok {
		return
	}
	clientChan, ok := v.(ClientChan)
	if !ok {
		return
	}

	// Continuously forward messages to the client.
	c.Stream(func(w io.Writer) bool {
		data, open := <-clientChan
		if !open {
			// The broker closed the channel; end the stream.
			return false
		}

		// A missing or non-boolean "admin" field is treated as admin-only,
		// so a malformed message neither panics nor reaches non-admins.
		adminOnly, isBool := data["admin"].(bool)
		if !isBool {
			adminOnly = true
		}
		if adminOnly && !claims.Admin {
			// Not for this client: skip the message but keep the stream open.
			return true
		}

		jsonString, err := json.Marshal(data)
		if err != nil {
			log.Printf("%s: %+v", errors.Wrap(err, "SSE json error").Error(), data)
			return false
		}

		c.SSEvent("message", string(jsonString))
		return true
	})
}