From 84f02538056e909880277d54012f21ae8f78c4d1 Mon Sep 17 00:00:00 2001 From: Marwan Sulaiman Date: Mon, 10 Aug 2020 22:49:05 -0400 Subject: [PATCH] sync writes --- client/client.go | 37 ++++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/client/client.go b/client/client.go index bf8738e..d8a11a9 100644 --- a/client/client.go +++ b/client/client.go @@ -36,25 +36,40 @@ func New() (*Client, error) { return nil, fmt.Errorf("error connecting to iTerm2: %v", err) } cl := &Client{ - c: c, - rpcs: make(map[int64]chan<- *api.ServerOriginatedMessage), + c: c, + rpcs: make(map[int64]chan<- *api.ServerOriginatedMessage), + writeCh: make(chan writeReq), } ctx, cancel := context.WithCancel(context.Background()) cl.cancel = cancel - go cl.worker(ctx) + go cl.readWorker(ctx) + go cl.writeWorker() return cl, nil } // Client wraps a websocket client connection to iTerm2. // Must be instantiated with NewClient. type Client struct { - c *websocket.Conn - rpcs map[int64]chan<- *api.ServerOriginatedMessage - mu sync.Mutex - cancel context.CancelFunc + c *websocket.Conn + rpcs map[int64]chan<- *api.ServerOriginatedMessage + mu sync.Mutex + cancel context.CancelFunc + writeCh chan writeReq } -func (c *Client) worker(ctx context.Context) { +type writeReq struct { + msg []byte + resp chan error +} + +func (c *Client) writeWorker() { + for req := range c.writeCh { + err := c.c.WriteMessage(websocket.BinaryMessage, req.msg) + req.resp <- err + } +} + +func (c *Client) readWorker(ctx context.Context) { for { _, msg, err := c.c.ReadMessage() if ctx.Err() != nil { @@ -93,7 +108,9 @@ func (c *Client) Call(req *api.ClientOriginatedMessage) (*api.ServerOriginatedMe if err != nil { return nil, err } - err = c.c.WriteMessage(websocket.BinaryMessage, msg) + wr := writeReq{msg: msg, resp: make(chan error, 1)} + c.writeCh <- wr + err = <-wr.resp if err != nil { return nil, fmt.Errorf("error writing to websocket: %w", err) } @@ -107,6 +124,8 @@ func (c *Client) Call(req *api.ClientOriginatedMessage) (*api.ServerOriginatedMe // Close closes the websocket connection // and frees any goroutine resources func (c *Client) Close() error { + // TODO: if a *Client.Call is in flight, this will cause it to panic + close(c.writeCh) c.cancel() return c.c.Close() }