From 6a8ac6cf11aab5f7d60eb8ffc4eaf6b38053c9e4 Mon Sep 17 00:00:00 2001 From: Josh Leverette Date: Wed, 9 Jan 2019 22:55:47 -0800 Subject: [PATCH] remove some bottlenecks --- client.go | 22 +++++++++++----------- stream.go | 4 ++-- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/client.go b/client.go index 2515579..06e4d2c 100644 --- a/client.go +++ b/client.go @@ -14,12 +14,12 @@ type Client struct { flush http.Flusher write io.Writer ctx context.Context - events chan *Event + events chan Event closed bool waiter sync.WaitGroup lock sync.Mutex - lastFlush time.Time - lastWrite time.Time + lastFlush uint64 + lastWrite uint64 } // NewClient creates a client wrapping a response writer. @@ -30,7 +30,7 @@ type Client struct { // Returns nil on error. func NewClient(w http.ResponseWriter, req *http.Request) *Client { c := &Client{ - events: make(chan *Event, 100), + events: make(chan Event, 100), write: w, } @@ -62,11 +62,11 @@ func NewClient(w http.ResponseWriter, req *http.Request) *Client { // This does not block until the event has been sent, // however it could block if the event queue is full. // Returns an error if the Client has disconnected -func (c *Client) Send(ev *Event) error { +func (c *Client) Send(ev Event) error { if c.closed { return io.ErrClosedPipe } - c.events <- ev.Clone() + c.events <- ev return nil } @@ -74,12 +74,12 @@ func (c *Client) Send(ev *Event) error { // This guarantees not block until the event has been sent. // Returns true if blocked // Returns an error if the Client has disconnected -func (c *Client) SendNonBlocking(ev *Event) (bool, error) { +func (c *Client) SendNonBlocking(ev Event) (bool, error) { if c.closed { return false, io.ErrClosedPipe } select { - case c.events <- ev.Clone(): + case c.events <- ev: default: return true, nil } @@ -114,8 +114,8 @@ func (c *Client) run() { // send the event c.lock.Lock() - io.Copy(c.write, ev) - c.lastWrite = time.Now() + io.Copy(c.write, &ev) + c.lastWrite += 1 c.lock.Unlock() case <-done: @@ -137,7 +137,7 @@ func (c *Client) flusher() { if c.closed { break } - if c.lastFlush.Before(c.lastWrite) { + if c.lastFlush < c.lastWrite { c.lastFlush = c.lastWrite c.flush.Flush() } diff --git a/stream.go b/stream.go index c63b8b0..3ce78d7 100644 --- a/stream.go +++ b/stream.go @@ -85,7 +85,7 @@ func (s *Stream) Broadcast(e *Event) { defer s.listLock.RUnlock() for cli := range s.clients { - cli.Send(e) + cli.Send(*e.Clone()) } } @@ -127,7 +127,7 @@ func (s *Stream) Publish(topic string, e *Event) { for cli, topics := range s.clients { if topics[topic] { - cli.Send(e) + cli.Send(*e.Clone()) } } }