Skip to content

Commit

Permalink
remove some bottlenecks
Browse files Browse the repository at this point in the history
  • Loading branch information
coder543 committed Jan 10, 2019
1 parent 69bd65e commit 6a8ac6c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
22 changes: 11 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ type Client struct {
flush http.Flusher
write io.Writer
ctx context.Context
events chan *Event
events chan Event
closed bool
waiter sync.WaitGroup
lock sync.Mutex
lastFlush time.Time
lastWrite time.Time
lastFlush uint64
lastWrite uint64
}

// NewClient creates a client wrapping a response writer.
Expand All @@ -30,7 +30,7 @@ type Client struct {
// Returns nil on error.
func NewClient(w http.ResponseWriter, req *http.Request) *Client {
c := &Client{
events: make(chan *Event, 100),
events: make(chan Event, 100),
write: w,
}

Expand Down Expand Up @@ -62,24 +62,24 @@ func NewClient(w http.ResponseWriter, req *http.Request) *Client {
// This does not block until the event has been sent,
// however it could block if the event queue is full.
// Returns an error if the Client has disconnected
func (c *Client) Send(ev *Event) error {
func (c *Client) Send(ev Event) error {
if c.closed {
return io.ErrClosedPipe
}
c.events <- ev.Clone()
c.events <- ev
return nil
}

// Send queues an event to be sent to the client.
// This guarantees not block until the event has been sent.
// Returns true if blocked
// Returns an error if the Client has disconnected
func (c *Client) SendNonBlocking(ev *Event) (bool, error) {
func (c *Client) SendNonBlocking(ev Event) (bool, error) {
if c.closed {
return false, io.ErrClosedPipe
}
select {
case c.events <- ev.Clone():
case c.events <- ev:
default:
return true, nil
}
Expand Down Expand Up @@ -114,8 +114,8 @@ func (c *Client) run() {

// send the event
c.lock.Lock()
io.Copy(c.write, ev)
c.lastWrite = time.Now()
io.Copy(c.write, &ev)
c.lastWrite += 1
c.lock.Unlock()

case <-done:
Expand All @@ -137,7 +137,7 @@ func (c *Client) flusher() {
if c.closed {
break
}
if c.lastFlush.Before(c.lastWrite) {
if c.lastFlush < c.lastWrite {
c.lastFlush = c.lastWrite
c.flush.Flush()
}
Expand Down
4 changes: 2 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (s *Stream) Broadcast(e *Event) {
defer s.listLock.RUnlock()

for cli := range s.clients {
cli.Send(e)
cli.Send(*e.Clone())
}
}

Expand Down Expand Up @@ -127,7 +127,7 @@ func (s *Stream) Publish(topic string, e *Event) {

for cli, topics := range s.clients {
if topics[topic] {
cli.Send(e)
cli.Send(*e.Clone())
}
}
}
Expand Down

0 comments on commit 6a8ac6c

Please sign in to comment.