Skip to content

Commit

Permalink
add watch status monitor api
Browse files Browse the repository at this point in the history
  • Loading branch information
wongoo committed Oct 18, 2019
1 parent bd809cd commit 82f13da
Show file tree
Hide file tree
Showing 20 changed files with 110 additions and 26 deletions.
2 changes: 1 addition & 1 deletion codec.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo
// since: 2018/12/27
//
Expand Down
2 changes: 1 addition & 1 deletion consts.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo
// since: 2018/12/27
//
Expand Down
2 changes: 1 addition & 1 deletion doc.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo
// since: 2018/12/27

Expand Down
2 changes: 1 addition & 1 deletion err.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo
// since: 2018/12/27
//
Expand Down
2 changes: 1 addition & 1 deletion examples/syncdemo.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo

package main
Expand Down
2 changes: 1 addition & 1 deletion get.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo
// since: 2018/12/27
//
Expand Down
2 changes: 1 addition & 1 deletion handler.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo
// since: 2018/12/27
//
Expand Down
2 changes: 1 addition & 1 deletion json.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo
// since: 2018/12/27
//
Expand Down
2 changes: 1 addition & 1 deletion map.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo
// since: 2018/12/27
//
Expand Down
2 changes: 1 addition & 1 deletion options.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo

package zkclient
Expand Down
2 changes: 1 addition & 1 deletion set.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo
// since: 2018/12/27
//
Expand Down
2 changes: 1 addition & 1 deletion set_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo
// since: 2019/10/12
//
Expand Down
2 changes: 1 addition & 1 deletion string.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo
// since: 2019/10/07
//
Expand Down
2 changes: 1 addition & 1 deletion sync.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo
// since: 2018/12/27
//
Expand Down
2 changes: 1 addition & 1 deletion sync_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo
// since: 2019/10/12
//
Expand Down
2 changes: 1 addition & 1 deletion util.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo
// since: 2018/12/27
//
Expand Down
30 changes: 28 additions & 2 deletions watcher.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo
// since: 2018/12/21
//
Expand All @@ -7,6 +7,8 @@ package zkclient

import (
"errors"
"sync"
"sync/atomic"

"github.com/samuel/go-zookeeper/zk"
"github.com/vogo/logger"
Expand All @@ -32,9 +34,11 @@ type ChildListener interface {

// Watcher zookeeper watcher
type Watcher struct {
sync.Mutex
client *Client
handler EventHandler
done chan struct{}
alive int32
}

// NewWatcher create new watcher
Expand All @@ -47,12 +51,30 @@ func (cli *Client) NewWatcher(handler EventHandler) (*Watcher, error) {
client: cli,
handler: handler,
done: make(chan struct{}),
alive: 0,
}, nil
}

// Done chan
func (w *Watcher) Done() <-chan struct{} {
return w.done
}

// Alive whether watcher is watching
func (w *Watcher) Alive() bool {
return atomic.LoadInt32(&w.alive) == 1
}

// Close close watch event
func (w *Watcher) Close() {
close(w.done)
w.Lock()
defer w.Unlock()

select {
case <-w.done:
default:
close(w.done)
}
}

// Watch start watch event
Expand All @@ -61,6 +83,9 @@ func (w *Watcher) Watch() {
path := w.handler.Path()
logger.Debugf("zk watcher [%s] start", path)

atomic.StoreInt32(&w.alive, 1)
defer atomic.StoreInt32(&w.alive, 0)

var (
evt *zk.Event
ch <-chan zk.Event
Expand Down Expand Up @@ -89,6 +114,7 @@ func (w *Watcher) Watch() {
select {
case <-w.client.done:
logger.Debugf("zk watcher [%s] exit for client closed", path)
w.Close()
return
case <-w.done:
logger.Debugf("zk watcher [%s] exit for watcher closed", path)
Expand Down
46 changes: 46 additions & 0 deletions watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo
// since: 2019/10/18

package zkclient

import (
"errors"
"testing"
"time"
)

func TestWatcherClose(t *testing.T) {
c1, w, err := createClientWatcher(t)
if err != nil {
return
}

time.Sleep(time.Second)

c1.Close()
w.Close()

c1, w, err = createClientWatcher(t)
if err != nil {
return
}

time.Sleep(time.Second)

w.Close()
c1.Close()
}

func createClientWatcher(t *testing.T) (*Client, *Watcher, error) {
c1 := connectLocalZK(t)
if c1 == nil {
return nil, nil, errors.New("can't conn zk")
}

path := "/test/watcher_close_users"
users := make(map[string]*user)
w, err := c1.SyncWatchJSONMap(path, users, true, &mListener{})

return c1, w, err
}
26 changes: 19 additions & 7 deletions zkclient.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo
// since: 2018/12/3
//
Expand All @@ -21,8 +21,8 @@ type AlarmTrigger func(err error)

// Client for zookeeper
type Client struct {
sync.Mutex
ClientOptions
lock sync.Mutex
servers []string
conn *zk.Conn
done chan struct{}
Expand Down Expand Up @@ -84,8 +84,8 @@ func (cli *Client) startConnMaintainer() {

// collectDeadWatchers return queued watchers, and empty the queue
func (cli *Client) collectDeadWatchers() []*Watcher {
cli.lock.Lock()
defer cli.lock.Unlock()
cli.Lock()
defer cli.Unlock()

watchers := cli.deadWatchers
cli.deadWatchers = []*Watcher{}
Expand All @@ -95,8 +95,8 @@ func (cli *Client) collectDeadWatchers() []*Watcher {

// AppendDeadWatcher add dead watcher, wait to watch again
func (cli *Client) AppendDeadWatcher(watcher *Watcher) {
cli.lock.Lock()
defer cli.lock.Unlock()
cli.Lock()
defer cli.Unlock()

logger.Debugf("zk watcher append to dead queue: %s", watcher.handler.Path())
watcher.client = cli
Expand Down Expand Up @@ -128,7 +128,19 @@ func (cli *Client) connect() error {

// Close client, NOT use Client which already calling Close()
func (cli *Client) Close() {
close(cli.done)
cli.Lock()
defer cli.Unlock()

select {
case <-cli.done:
default:
close(cli.done)
}

// notify dead watchers
for _, watcher := range cli.deadWatchers {
watcher.Close()
}

if cli.ConnAlive() {
cli.conn.Close()
Expand Down
2 changes: 1 addition & 1 deletion zkclient_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The vogo Authors. All rights reserved.
// Copyright 2018-2019 The vogo Authors. All rights reserved.
// author: wongoo
// since: 2019/10/12
//
Expand Down

0 comments on commit 82f13da

Please sign in to comment.