-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbroker.go
138 lines (112 loc) · 2.64 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package ps
import (
"slices"
"sync"
)
// Broker is a pub/sub coordination point for values of type T. See the Publish,
// Subscribe, and Unsubscribe methods for more information.
//
// Every method visible in this file locks mtx before reading or writing subs.
// Because the mutex is held by value, a Broker should be used through a
// pointer (as returned by NewBroker) rather than copied.
type Broker[T any] struct {
// mtx guards subs and the stats stored on each subscriber.
mtx sync.Mutex
// subs holds the active subscriptions in the order they were added.
subs []*subscriber[T]
}
// NewBroker constructs an empty broker for values of type T. The returned
// broker has no subscribers; its zero-valued fields need no further setup.
func NewBroker[T any]() *Broker[T] {
	return new(Broker[T])
}
// Publish offers v to every subscriber currently registered with the broker.
//
// For each subscriber, the allow func is consulted first: a rejected value is
// counted as a skip. An accepted value is sent without blocking; if the
// subscriber's channel cannot take it immediately, the value is discarded and
// counted as a drop. Values are sent as-is, so be mindful of copy costs and
// aliasing semantics for T.
//
// The returned Stats tallies the outcomes of this one call across all
// subscribers; each subscriber's own running stats are updated as well.
//
// The broker lock is held for the whole call, including while allow runs, so
// an allow func must not call back into the same broker.
func (b *Broker[T]) Publish(v T) Stats {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	var total Stats
	for _, sub := range b.subs {
		// Filtered out: record the skip and move on to the next subscriber.
		if !sub.allow(v) {
			sub.stats.Skips++
			total.Skips++
			continue
		}

		// Non-blocking send: the default case fires when the channel is not
		// ready to receive, so a slow subscriber never stalls the publisher.
		select {
		case sub.c <- v:
			sub.stats.Sends++
			total.Sends++
		default:
			sub.stats.Drops++
			total.Drops++
		}
	}

	return total
}
// Subscribe registers c with the broker so that subsequently published values
// which pass allow are forwarded to it. A nil allow accepts every value.
//
// A channel can be registered at most once: if c is already subscribed, the
// broker is left unchanged and ErrAlreadySubscribed is returned.
func (b *Broker[T]) Subscribe(c chan<- T, allow func(T) bool) error {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	// The channel itself is the subscription's identity.
	for i := range b.subs {
		if b.subs[i].c == c {
			return ErrAlreadySubscribed
		}
	}

	// Substitute a pass-through filter so Publish never has to nil-check.
	if allow == nil {
		allow = func(T) bool { return true }
	}

	b.subs = append(b.subs, &subscriber[T]{c: c, allow: allow})
	return nil
}
// SubscribeAll registers c to receive every published value. It is shorthand
// for Subscribe with a nil allow func and returns whatever Subscribe returns.
func (b *Broker[T]) SubscribeAll(c chan<- T) error {
	var noFilter func(T) bool // nil: handed to Subscribe unchanged
	return b.Subscribe(c, noFilter)
}
// Unsubscribe removes the subscription identified by c and returns the stats
// it had accumulated at the moment of removal. The relative order of the
// remaining subscriptions is preserved.
//
// If c is not currently subscribed, the broker is left unchanged and the
// result is a zero Stats together with ErrNotSubscribed.
func (b *Broker[T]) Unsubscribe(c chan<- T) (Stats, error) {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	idx := slices.IndexFunc(b.subs, func(s *subscriber[T]) bool {
		return s.c == c
	})
	if idx < 0 {
		return Stats{}, ErrNotSubscribed
	}

	// Read the stats before deleting: afterwards index idx refers to the next
	// subscriber (or is out of range).
	final := b.subs[idx].stats
	b.subs = slices.Delete(b.subs, idx, idx+1)
	return final, nil
}
// Stats reports the stats recorded so far for the subscription identified by
// c. If c is not currently subscribed, it returns a zero Stats and
// ErrNotSubscribed.
func (b *Broker[T]) Stats(c chan<- T) (Stats, error) {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	// Locate the subscription first, then decide what to return.
	var match *subscriber[T]
	for _, sub := range b.subs {
		if sub.c == c {
			match = sub
			break
		}
	}

	if match == nil {
		return Stats{}, ErrNotSubscribed
	}
	return match.stats, nil
}
// ActiveSubscribers returns one Stats value per currently registered
// subscriber, in subscription order. The result is a freshly allocated,
// non-nil slice (empty when there are no subscribers).
func (b *Broker[T]) ActiveSubscribers() []Stats {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	snapshot := make([]Stats, 0, len(b.subs))
	for _, sub := range b.subs {
		snapshot = append(snapshot, sub.stats)
	}
	return snapshot
}
// subscriber is the broker's record of a single subscription.
type subscriber[T any] struct {
// c is the send-only destination channel. The broker also uses it as the
// subscription's identity when looking subscriptions up.
c chan<- T
// allow reports whether a published value should be forwarded to c.
allow func(T) bool
// stats holds the running counters for this subscription; Publish updates
// them on every call.
stats Stats
}