Skip to content

Commit

Permalink
add watch only api
Browse files Browse the repository at this point in the history
  • Loading branch information
wongoo committed Oct 14, 2019
1 parent 1e6cb21 commit 1e07fff
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 60 deletions.
3 changes: 3 additions & 0 deletions consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package zkclient

import "reflect"

var (
nilStruct = struct{}{}
nilValue = reflect.Value{}
)
4 changes: 2 additions & 2 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
//
//- `Codec`: value encode/decode
//- `Watcher`: loop watch control
//- `Handler`: include `ValueHandler` and `MapHandler`, set/get value, handle event, synchronize value, trigger listener
//- `Listener`: include `ValueListener` and `ChildListener`, listen value change
//- `Handler`: include `valueHandler` and `mapHandler`, set/get/delete value, handle event, synchronize value, trigger listener
//- `Listener`: include `ValueListener` and `ChildListener`, listen value updated/deleted
//
//## API
//
Expand Down
59 changes: 37 additions & 22 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,15 @@ import (
"github.com/vogo/logger"
)

type ValueHandler struct {
type valueHandler struct {
path string
value reflect.Value
typ reflect.Type
codec Codec
listener ValueListener
}

func StringValueHandler(path string, s *string, listener ValueListener) *ValueHandler {
return &ValueHandler{
path: path,
value: reflect.ValueOf(s),
typ: reflect.TypeOf(s),
codec: stringCodec,
listener: listener,
}
}

func NewValueHandler(path string, obj interface{}, codec Codec, listener ValueListener) (*ValueHandler, error) {
func newValueHandler(path string, obj interface{}, codec Codec, watchOnly bool, listener ValueListener) (*valueHandler, error) {
if path == "" {
return nil, errors.New("path required")
}
Expand All @@ -50,38 +40,53 @@ func NewValueHandler(path string, obj interface{}, codec Codec, listener ValueLi
return nil, errors.New("codec required")
}

return &ValueHandler{
if watchOnly && listener == nil {
return nil, errors.New("listener required when watch only")
}

handler := &valueHandler{
path: path,
value: reflect.ValueOf(obj),
typ: typ,
codec: codec,
listener: listener,
}, nil
}

if !watchOnly {
handler.value = reflect.ValueOf(obj)
}

return handler, nil
}

func (h *ValueHandler) Encode() ([]byte, error) {
func (h *valueHandler) Encode() ([]byte, error) {
if h.value == nilValue {
return nil, nil
}

return h.codec.Encode(h.value.Interface())
}

func (h *ValueHandler) Decode(data []byte) error {
func (h *valueHandler) Decode(data []byte) error {
v, err := h.codec.Decode(data, h.typ)
if err != nil {
return err
}

h.value.Elem().Set(v.Elem())
if h.value != nilValue {
h.value.Elem().Set(v.Elem())
}

if h.listener != nil {
go func() {
h.listener(h.path, h.value.Interface())
h.listener.Update(h.path, h.value.Interface())
}()
}

return nil
}

// SetTo set value in zookeeper
func (h *ValueHandler) SetTo(cli *Client, path string) error {
func (h *valueHandler) SetTo(cli *Client, path string) error {
bytes, err := h.Encode()
if err != nil {
return err
Expand All @@ -90,11 +95,21 @@ func (h *ValueHandler) SetTo(cli *Client, path string) error {
return cli.SetRawValue(path, bytes)
}

func (h *ValueHandler) Path() string {
func (h *valueHandler) Path() string {
return h.path
}

func (h *ValueHandler) Handle(w *Watcher, evt *zk.Event) (<-chan zk.Event, error) {
func (h *valueHandler) Handle(w *Watcher, evt *zk.Event) (<-chan zk.Event, error) {
if evt != nil && evt.Type == zk.EventNodeDeleted {
logger.Infof("zk watcher [%s] node deleted", h.path)

if h.listener != nil {
h.listener.Delete(h.path)
}

return nil, nil
}

data, _, wch, err := w.client.Conn().GetW(h.path)
if err != nil {
if err == zk.ErrNoNode {
Expand Down
71 changes: 49 additions & 22 deletions map.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@ import (
"github.com/vogo/logger"
)

type MapHandler struct {
type mapHandler struct {
path string
lock sync.Mutex
value reflect.Value
typ reflect.Type
syncChild bool
codec Codec
syncChild bool
listener ChildListener
children map[string]struct{}
}

func NewMapHandler(path string, obj interface{}, syncChild bool, codec Codec, h ChildListener) (*MapHandler, error) {
func newMapHandler(path string, obj interface{}, syncChild bool, codec Codec, watchOnly bool, listener ChildListener) (*mapHandler, error) {
if path == "" {
return nil, errors.New("path required")
}
Expand Down Expand Up @@ -55,19 +55,32 @@ func NewMapHandler(path string, obj interface{}, syncChild bool, codec Codec, h
return nil, errors.New("codec required")
}

return &MapHandler{
if watchOnly && listener == nil {
return nil, errors.New("listener required when watch only")
}

handler := &mapHandler{
path: path,
value: reflect.ValueOf(obj),
typ: valueTyp,
syncChild: syncChild,
codec: codec,
lock: sync.Mutex{},
listener: h,
listener: listener,
children: make(map[string]struct{}),
}, nil
}

if !watchOnly {
handler.value = reflect.ValueOf(obj)
}

return handler, nil
}

func (h *MapHandler) Encode(key string) ([]byte, error) {
func (h *mapHandler) Encode(key string) ([]byte, error) {
if h.value == nilValue {
return nil, io.EOF
}

v := h.value.MapIndex(reflect.ValueOf(key))
if v.IsNil() {
return nil, io.EOF
Expand All @@ -76,7 +89,7 @@ func (h *MapHandler) Encode(key string) ([]byte, error) {
return h.codec.Encode(v.Interface())
}

func (h *MapHandler) Decode(key string, data []byte) error {
func (h *mapHandler) Decode(key string, data []byte) error {
h.lock.Lock()
defer h.lock.Unlock()

Expand All @@ -85,35 +98,49 @@ func (h *MapHandler) Decode(key string, data []byte) error {
return err
}

h.value.SetMapIndex(reflect.ValueOf(key), v)
if h.value != nilValue {
h.value.SetMapIndex(reflect.ValueOf(key), v)
}

if h.listener != nil {
go func() {
h.listener(h.path, key, v.Interface())
h.listener.Update(h.path, key, v.Interface())
}()
}

return nil
}

func (h *MapHandler) Delete(key string) {
func (h *mapHandler) Delete(key string) {
h.lock.Lock()
defer h.lock.Unlock()

h.value.SetMapIndex(reflect.ValueOf(key), reflect.Value{})
if h.value != nilValue {
h.value.SetMapIndex(reflect.ValueOf(key), reflect.Value{})
}

if h.listener != nil {
go func() {
h.listener(h.path, key, nil)
h.listener.Delete(h.path, key)
}()
}
}

func (h *MapHandler) Path() string {
func (h *mapHandler) Path() string {
return h.path
}

func (h *MapHandler) Handle(w *Watcher, evt *zk.Event) (<-chan zk.Event, error) {
func (h *mapHandler) Handle(w *Watcher, evt *zk.Event) (<-chan zk.Event, error) {
if evt != nil && evt.Type == zk.EventNodeDeleted {
logger.Infof("zk watcher [%s] node deleted", h.path)

for child := range h.children {
h.Delete(child)
}

return nil, nil
}

children, _, wch, err := w.client.Conn().ChildrenW(h.path)
if err != nil {
if err == zk.ErrNoNode {
Expand Down Expand Up @@ -150,8 +177,8 @@ func (h *MapHandler) Handle(w *Watcher, evt *zk.Event) (<-chan zk.Event, error)
}

type childHandler struct {
path string
mapHandler *MapHandler
path string
handler *mapHandler
}

func (ch *childHandler) Path() string {
Expand All @@ -163,10 +190,10 @@ func (ch *childHandler) Handle(w *Watcher, evt *zk.Event) (<-chan zk.Event, erro
return nil, nil // return nil chan to exit watching
}

return ch.mapHandler.handleChild(w.client, ch.path)
return ch.handler.handleChild(w.client, ch.path)
}

func (h *MapHandler) syncWatchChild(w *Watcher, child string) {
func (h *mapHandler) syncWatchChild(w *Watcher, child string) {
childPath := h.path + "/" + child

if !h.syncChild {
Expand All @@ -177,12 +204,12 @@ func (h *MapHandler) syncWatchChild(w *Watcher, child string) {
return
}

childWatcher := w.newChildWatcher(&childHandler{path: childPath, mapHandler: h})
childWatcher := w.newChildWatcher(&childHandler{path: childPath, handler: h})
childWatcher.Watch()
}

// handleChild load map child value into packMap, and return the event chan for waiting the next event
func (h *MapHandler) handleChild(client *Client, childPath string) (<-chan zk.Event, error) {
func (h *mapHandler) handleChild(client *Client, childPath string) (<-chan zk.Event, error) {
var (
data []byte
err error
Expand Down
48 changes: 42 additions & 6 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,25 @@ func (cli *Client) Sync(path string, obj interface{}, codec Codec) (*Watcher, er

// SyncWatch synchronize value of the path to obj, and trigger listener when value change
func (cli *Client) SyncWatch(path string, obj interface{}, codec Codec, listener ValueListener) (*Watcher, error) {
handler, err := NewValueHandler(path, obj, codec, nil)
handler, err := newValueHandler(path, obj, codec, false, listener)
if err != nil {
return nil, err
}

return cli.createWatcher(handler)
}

// Watch synchronize value of the path to obj, and trigger listener when value change
func (cli *Client) Watch(path string, obj interface{}, codec Codec, listener ValueListener) (*Watcher, error) {
handler, err := newValueHandler(path, obj, codec, true, listener)
if err != nil {
return nil, err
}

return cli.createWatcher(handler)
}

func (cli *Client) createWatcher(handler EventHandler) (*Watcher, error) {
watcher, err := cli.NewWatcher(handler)

if err != nil {
Expand All @@ -38,27 +52,39 @@ func (cli *Client) SyncWatchString(path string, s *string, listener ValueListene
return cli.SyncWatch(path, s, stringCodec, listener)
}

// WatchJSON watch json value of the path to obj, and trigger listener when value change
func (cli *Client) WatchJSON(path string, obj interface{}, listener ValueListener) (*Watcher, error) {
return cli.Watch(path, obj, jsonCodec, listener)
}

// WatchJSON watch string value of the path to obj, and trigger listener when value change
func (cli *Client) WatchString(path string, s *string, listener ValueListener) (*Watcher, error) {
return cli.Watch(path, s, stringCodec, listener)
}

// SyncMap synchronize sub-path value into a map
func (cli *Client) SyncMap(path string, m interface{}, valueCodec Codec, syncChild bool) (*Watcher, error) {
return cli.SyncWatchMap(path, m, valueCodec, syncChild, nil)
}

// SyncWatchMap synchronize sub-path value into a map, and trigger listener when child value change
func (cli *Client) SyncWatchMap(path string, m interface{}, valueCodec Codec, syncChild bool, listener ChildListener) (*Watcher, error) {
mapHandler, err := NewMapHandler(path, m, syncChild, valueCodec, listener)
handler, err := newMapHandler(path, m, syncChild, valueCodec, false, listener)
if err != nil {
return nil, err
}

watcher, err := cli.NewWatcher(mapHandler)
return cli.createWatcher(handler)
}

// SyncWatchMap synchronize sub-path value into a map, and trigger listener when child value change
func (cli *Client) WatchMap(path string, m interface{}, valueCodec Codec, syncChild bool, listener ChildListener) (*Watcher, error) {
handler, err := newMapHandler(path, m, syncChild, valueCodec, true, listener)
if err != nil {
return nil, err
}

watcher.Watch()

return watcher, nil
return cli.createWatcher(handler)
}

// SyncWatchJSONMap synchronize sub-path json value into a map, and trigger listener when child value change
Expand All @@ -70,3 +96,13 @@ func (cli *Client) SyncWatchJSONMap(path string, m interface{}, syncChild bool,
func (cli *Client) SyncWatchStringMap(path string, m map[string]string, syncChild bool, listener ChildListener) (*Watcher, error) {
return cli.SyncWatchMap(path, m, stringCodec, syncChild, listener)
}

// WatchJSONMap watch sub-path json value into a map, and trigger listener when child value change
func (cli *Client) WatchJSONMap(path string, m interface{}, syncChild bool, listener ChildListener) (*Watcher, error) {
return cli.WatchMap(path, m, jsonCodec, syncChild, listener)
}

// WatchStringMap watch sub-path string value into a map, and trigger listener when child value change
func (cli *Client) WatchStringMap(path string, m map[string]string, syncChild bool, listener ChildListener) (*Watcher, error) {
return cli.WatchMap(path, m, stringCodec, syncChild, listener)
}
Loading

0 comments on commit 1e07fff

Please sign in to comment.