Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

[提问]Informer 中为什么需要引入 Resync 机制? #11

Open
lianghao208 opened this issue Aug 5, 2020 · 32 comments
Open

[提问]Informer 中为什么需要引入 Resync 机制? #11

lianghao208 opened this issue Aug 5, 2020 · 32 comments

Comments

@lianghao208
Copy link
Member

Resync 机制会将 Indexer 的本地缓存重新同步到 DeltaFIFO 队列中。一般我们会设置一个时间周期,让 Indexer 周期性地将缓存同步到队列中。直接 list/watch apiserver 就已经能拿到集群中资源对象变化的 event 了,这里引入 Resync 的作用是什么呢?去掉会有什么影响呢?

@abhty822
Copy link

abhty822 commented Aug 6, 2020

因为这些操作都是异步的 合理的sync可以提高事件消费的容错性

@lianghao208
Copy link
Member Author

因为这些操作都是异步的 合理的sync可以提高事件消费的容错性

感谢回答~ 能举个例子说说什么情况下会出现事件消费异常吗?

@abhty822
Copy link

abhty822 commented Aug 6, 2020

因为这些操作都是异步的 合理的sync可以提高事件消费的容错性

感谢回答~ 能举个例子说说什么情况下会出现事件消费异常吗?

嗯 ... 作为一个分布式的系统 排除代码之外 网络,基础设施等都有可能导致消费的异常,resync是一个很常规的操作(简单来说比如你的mq消费失败你会怎么做)

@lianghao208
Copy link
Member Author

因为这些操作都是异步的 合理的sync可以提高事件消费的容错性

感谢回答~ 能举个例子说说什么情况下会出现事件消费异常吗?

嗯 ... 作为一个分布式的系统 排除代码之外 网络,基础设施等都有可能导致消费的异常,resync是一个很常规的操作(简单来说比如你的mq消费失败你会怎么做)

这里的消费者指的是从 FIFODelta 队列中取 event 的 workQueue 和从 workQueue 取 key 的 control loop 吗?在我看来虽然是异步的操作,但是这里面消费者和消息队列是同一个服务,好像不会有网络之类的消费异常问题

@abhty822
Copy link

abhty822 commented Aug 6, 2020

因为这些操作都是异步的 合理的sync可以提高事件消费的容错性

感谢回答~ 能举个例子说说什么情况下会出现事件消费异常吗?

嗯 ... 作为一个分布式的系统 排除代码之外 网络,基础设施等都有可能导致消费的异常,resync是一个很常规的操作(简单来说比如你的mq消费失败你会怎么做)

这里的消费者指的是从 FIFODelta 队列中取 event 的 workQueue 和从 workQueue 取 key 的 control loop 吗?在我看来虽然是异步的操作,但是这里面消费者和消息队列是同一个服务,好像不会有网络之类的消费异常问题

因为这些操作都是异步的 合理的sync可以提高事件消费的容错性

感谢回答~ 能举个例子说说什么情况下会出现事件消费异常吗?

嗯 ... 作为一个分布式的系统 排除代码之外 网络,基础设施等都有可能导致消费的异常,resync是一个很常规的操作(简单来说比如你的mq消费失败你会怎么做)

这里的消费者指的是从 FIFODelta 队列中取 event 的 workQueue 和从 workQueue 取 key 的 control loop 吗?在我看来虽然是异步的操作,但是这里面消费者和消息队列是同一个服务,好像不会有网络之类的消费异常问题

以前看过的一篇文章 https://www.kubernetes.org.cn/2693.html ,可能这篇文章比我在这码字回答的更有效,实际上你说的relist也是一种resync的做法

@zhaoweiguo zhaoweiguo changed the title Informer 中为什么需要引入 Resync 机制? [提问]Informer 中为什么需要引入 Resync 机制? Aug 8, 2020
@lianghao208
Copy link
Member Author

lianghao208 commented Aug 9, 2020

一、Client-go 中的 Informer 工作流程图

在这里插入图片描述
Informer 中的 Reflector 通过 List/watch 从 apiserver 中获取到集群中所有资源对象的变化事件(event),将其放入 Delta FIFO 队列中(以 Key、Value 的形式保存),触发 onAdd、onUpdate、onDelete 回调将 Key 放入 WorkQueue 中。同时将 Key 更新 Indexer 本地缓存。Control Loop 从 WorkQueue 中取到 Key,从 Indexer 中获取到该 Key 的 Value,进行相应的处理。

二、Resync 机制的引入

我们在使用 SharedInformerFactory 去创建 SharedInformer 时,需要填一个 ResyncDuration 的参数

// k8s.io/client-go/informers/factory.go
// NewSharedInformerFactory constructs a new instance of sharedInformerFactory for all namespaces.
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
	return NewSharedInformerFactoryWithOptions(client, defaultResync)
}

这个参数指的是,多久从 Indexer 缓存中同步一次数据到 Delta FIFO 队列,重新走一遍流程

// k8s.io/client-go/tools/cache/delta_fifo.go
// 重新同步一次 Indexer 缓存数据到 Delta FIFO 队列中
func (f *DeltaFIFO) Resync() error {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.knownObjects == nil {
		return nil
	}
	// 遍历 indexer 中的 key,传入 syncKeyLocked 中处理
	keys := f.knownObjects.ListKeys()
	for _, k := range keys {
		if err := f.syncKeyLocked(k); err != nil {
			return err
		}
	}
	return nil
}

func (f *DeltaFIFO) syncKeyLocked(key string) error {
	obj, exists, err := f.knownObjects.GetByKey(key)
	if err != nil {
		klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
		return nil
	} else if !exists {
		klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
		return nil
	}
	// 如果发现 FIFO 队列中已经有相同 key 的 event 进来了,说明该资源对象有了新的 event,
	// 在 Indexer 中旧的缓存应该失效,因此不做 Resync 处理直接返回 nil
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	if len(f.items[id]) > 0 {
		return nil
	}
    // 重新放入 FIFO 队列中
	if err := f.queueActionLocked(Sync, obj); err != nil {
		return fmt.Errorf("couldn't queue object: %v", err)
	}
	return nil
}

为什么需要 Resync 机制呢?因为在处理 SharedInformer 事件回调时,可能存在处理失败的情况,定时的 Resync 让这些处理失败的事件有了重新 onUpdate 处理的机会。
那么经过 Resync 重新放入 Delta FIFO 队列的事件,和直接从 apiserver 中 watch 得到的事件处理起来有什么不一样呢?

// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		// 判断事件类型,看事件是通过新增、更新、替换、删除还是 Resync 重新同步产生的
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				
				isSync := false
				switch {
				case d.Type == Sync:
					// 如果是通过 Resync 重新同步得到的事件则做个标记
					isSync = true
				case d.Type == Replaced:
					...
				}
				// 如果是通过 Resync 重新同步得到的事件,则触发 onUpdate 回调
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

从上面对 Delta FIFO 的队列处理源码可看出,如果是从 Resync 重新同步到 Delta FIFO 队列的事件,会分发到 updateNotification 中触发 onUpdate 的回调

三、总结

Resync 机制的引入,定时将 Indexer 缓存事件重新同步到 Delta FIFO 队列中,在处理 SharedInformer 事件回调时,让处理失败的事件得到重新处理。并且通过入队前判断 FIFO 队列中是否已经有了更新版本的 event,来决定是否丢弃 Indexer 缓存不进行 Resync 入队。在处理 Delta FIFO 队列中的 Resync 的事件数据时,触发 onUpdate 回调来让事件重新处理。

@luffyao
Copy link

luffyao commented Aug 9, 2020

@lianghao208 Hi "在处理 SharedInformer 事件回调时,让处理失败的事件得到重新处理"。 这里应该会将处理过的事件也进行了一次update处理吧?

@lianghao208
Copy link
Member Author

@lianghao208 Hi "在处理 SharedInformer 事件回调时,让处理失败的事件得到重新处理"。 这里应该会将处理过的事件也进行了一次update处理吧?

是的,准确的说时update处理,我改一下表述,感谢指正~

@gasxia
Copy link

gasxia commented Aug 9, 2020

这个问题在《Programming Kubernetes》的第一章详细解释了一遍,我搬运一下。

主要的目的是为了不丢数据,处理 resync 机制还有边缘触发与水平获取的设计,一起来保证不丢事件、数据同步并能及时响应事件。

以下内容都摘抄自《Programming Kubernetes》

在分布式系统中,有许多操作在并行执行,事件可能会以任意顺序异步到达。 如果我们的 controller 逻辑有问题,状态机出现一些错误或外部服务故障时,就很容易丢失事件,导致我们没有处理所有事件。 因此,我们必须更深入地研究如何应对这些问题。

在图中,您可以看到可用的不同方案:

  1. 仅使用边缘驱动逻辑的示例,其中可能错过第二个的状态更改事件。
  2. 边缘触发逻辑的示例,在处理事件时始终会获取最新状态(即水平)。换句话说,逻辑是边缘触发的(edge-triggered),但是水平驱动的(level-driven)。
  3. 该示例的逻辑是边缘触发,水平驱动的,但同时还附加了定时同步的能力。
    image

考虑到仅使用单一的边缘驱动触发会产生的问题,Kubernetes controller 通常采用第 3 种方案。

@lianghao208
Copy link
Member Author

这个问题在《Programming Kubernetes》的第一章详细解释了一遍,我搬运一下。

主要的目的是为了不丢数据,处理 resync 机制还有边缘触发与水平获取的设计,一起来保证不丢事件、数据同步并能及时响应事件。

以下内容都摘抄自《Programming Kubernetes》

在分布式系统中,有许多操作在并行执行,事件可能会以任意顺序异步到达。 如果我们的 controller 逻辑有问题,状态机出现一些错误或外部服务故障时,就很容易丢失事件,导致我们没有处理所有事件。 因此,我们必须更深入地研究如何应对这些问题。
在图中,您可以看到可用的不同方案:

  1. 仅使用边缘驱动逻辑的示例,其中可能错过第二个的状态更改事件。
  2. 边缘触发逻辑的示例,在处理事件时始终会获取最新状态(即水平)。换句话说,逻辑是边缘触发的(edge-triggered),但是水平驱动的(level-driven)。
  3. 该示例的逻辑是边缘触发,水平驱动的,但同时还附加了定时同步的能力。
    image

考虑到仅使用单一的边缘驱动触发会产生的问题,Kubernetes controller 通常采用第 3 种方案。

学习了~ 感谢~

@fighterhit
Copy link

  1. 以前以为resync指的是定时从etcd拉最新的以防出错,看来我理解错了。从上面的讨论还有一些资料来看,实际是informer启动时只会从etcd list一次全量数据,后面缓存全部都只走watch来更新,resync其实也只是将本地缓存的数据再构造成事件再次同步到队列一次,这样理解对吗?
  2. 疑问:因为watch是长连接,如果网络发生波动连接断了呢,本地缓存因为网络断了那段事件肯定就丢失了一部分事件导致etcd和本地数据不一致,这种情况下如果网络稳定下来或者在网络波动间隙,定时从etcd拉取的方案还能继续保持本地缓存和etcd一致,为什么不这样做呢?

@gaorong
Copy link

gaorong commented Mar 7, 2021

  1. 以前以为resync指的是定时从etcd拉最新的以防出错,看来我理解错了。从上面的讨论还有一些资料来看,实际是informer启动时只会从etcd list一次全量数据,后面缓存全部都只走watch来更新,resync其实也只是将本地缓存的数据再构造成事件再次同步到队列一次,这样理解对吗?
  2. 疑问:因为watch是长连接,如果网络发生波动连接断了呢,本地缓存因为网络断了那段事件肯定就丢失了一部分事件导致etcd和本地数据不一致,这种情况下如果网络稳定下来或者在网络波动间隙,定时从etcd拉取的方案还能继续保持本地缓存和etcd一致,为什么不这样做呢?
  1. 对的
  2. informer能保证通过list+watch不会丢失事件,如果网络抖动重新恢复后,watch会带着之前的resourceVersion号重连,resourceVersion是单调递增的,apiserver收到该请求后会将所有大于该resourceVersion的变更同步过来。 另外好像网络长期中断的话会导致informer重新初始化也就会重新list。

@fighterhit
Copy link

  1. 以前以为resync指的是定时从etcd拉最新的以防出错,看来我理解错了。从上面的讨论还有一些资料来看,实际是informer启动时只会从etcd list一次全量数据,后面缓存全部都只走watch来更新,resync其实也只是将本地缓存的数据再构造成事件再次同步到队列一次,这样理解对吗?
  2. 疑问:因为watch是长连接,如果网络发生波动连接断了呢,本地缓存因为网络断了那段事件肯定就丢失了一部分事件导致etcd和本地数据不一致,这种情况下如果网络稳定下来或者在网络波动间隙,定时从etcd拉取的方案还能继续保持本地缓存和etcd一致,为什么不这样做呢?
  1. 对的
  2. informer能保证通过list+watch不会丢失事件,如果网络抖动重新恢复后,watch会带着之前的resourceVersion号重连,resourceVersion是单调递增的,apiserver收到该请求后会将所有大于该resourceVersion的变更同步过来。 另外好像网络长期中断的话会导致informer重新初始化也就会重新list。

@gaorong 感谢解惑。对第2点的回答还有两个疑问:

如果网络抖动重新恢复后,watch会带着之前的resourceVersion号重连

从这句话字面上我理解的意思是说:如果网络只是短暂波动的话,watch 长连接会存在某种重试机制吗?那么这种重试机制是informer 自身实现的吗?或者还是借助http自身就有的某种机制?

另外好像网络长期中断的话会导致informer重新初始化也就会重新list

这句话指的是对于informer来说,整体也存在重试机制?也就是再重新执行一开始的list+watch的完整过程:先执行一次list,后面都还走watch跟原来一样,跟重启了一样吗?

@shadowdsp
Copy link

shadowdsp commented Mar 26, 2021

  1. 以前以为resync指的是定时从etcd拉最新的以防出错,看来我理解错了。从上面的讨论还有一些资料来看,实际是informer启动时只会从etcd list一次全量数据,后面缓存全部都只走watch来更新,resync其实也只是将本地缓存的数据再构造成事件再次同步到队列一次,这样理解对吗?
  2. 疑问:因为watch是长连接,如果网络发生波动连接断了呢,本地缓存因为网络断了那段事件肯定就丢失了一部分事件导致etcd和本地数据不一致,这种情况下如果网络稳定下来或者在网络波动间隙,定时从etcd拉取的方案还能继续保持本地缓存和etcd一致,为什么不这样做呢?

以前看好多 blog 都说 resync 是 k8s list,真的令人迷惑=-=。

@chuanyi-zjc
Copy link

  1. 以前以为resync指的是定时从etcd拉最新的以防出错,看来我理解错了。从上面的讨论还有一些资料来看,实际是informer启动时只会从etcd list一次全量数据,后面缓存全部都只走watch来更新,resync其实也只是将本地缓存的数据再构造成事件再次同步到队列一次,这样理解对吗?
  2. 疑问:因为watch是长连接,如果网络发生波动连接断了呢,本地缓存因为网络断了那段事件肯定就丢失了一部分事件导致etcd和本地数据不一致,这种情况下如果网络稳定下来或者在网络波动间隙,定时从etcd拉取的方案还能继续保持本地缓存和etcd一致,为什么不这样做呢?
  1. 对的
  2. informer能保证通过list+watch不会丢失事件,如果网络抖动重新恢复后,watch会带着之前的resourceVersion号重连,resourceVersion是单调递增的,apiserver收到该请求后会将所有大于该resourceVersion的变更同步过来。 另外好像网络长期中断的话会导致informer重新初始化也就会重新list。

这样来看的话,informer能保证event guaranteed. 不会丢失event ?

@dongfengpo0813
Copy link

dongfengpo0813 commented Sep 24, 2021

  1. 以前以为resync指的是定时从etcd拉最新的以防出错,看来我理解错了。从上面的讨论还有一些资料来看,实际是informer启动时只会从etcd list一次全量数据,后面缓存全部都只走watch来更新,resync其实也只是将本地缓存的数据再构造成事件再次同步到队列一次,这样理解对吗?
  2. 疑问:因为watch是长连接,如果网络发生波动连接断了呢,本地缓存因为网络断了那段事件肯定就丢失了一部分事件导致etcd和本地数据不一致,这种情况下如果网络稳定下来或者在网络波动间隙,定时从etcd拉取的方案还能继续保持本地缓存和etcd一致,为什么不这样做呢?
  1. 对的
  2. informer能保证通过list+watch不会丢失事件,如果网络抖动重新恢复后,watch会带着之前的resourceVersion号重连,resourceVersion是单调递增的,apiserver收到该请求后会将所有大于该resourceVersion的变更同步过来。 另外好像网络长期中断的话会导致informer重新初始化也就会重新list。

@gaorong 感谢解惑。对第2点的回答还有两个疑问:

如果网络抖动重新恢复后,watch会带着之前的resourceVersion号重连

从这句话字面上我理解的意思是说:如果网络只是短暂波动的话,watch 长连接会存在某种重试机制吗?那么这种重试机制是informer 自身实现的吗?或者还是借助http自身就有的某种机制?

另外好像网络长期中断的话会导致informer重新初始化也就会重新list

这句话指的是对于informer来说,整体也存在重试机制?也就是再重新执行一开始的list+watch的完整过程:先执行一次list,后面都还走watch跟原来一样,跟重启了一样吗?

启动之后只会list一次(或两次,出错时), 连接watch, 后续等待watch数据. 接收watch数据出错后, 会重新list & watch.
list&watch一般会被wait.Until函数封装.

@dongfengpo0813
Copy link

个人理解, resync机制就是为了定时确认当前的状态与目标状态是否一致.

@PlatformLC
Copy link

谢谢@gasxia 和 @yutian2011 的解释帮助了理解。
小弟拙见:因为informer(或者说reflector)本身是通过边缘进行同步触发的,而后真正的controller(非informer内部的controller)通过水平驱动进行协调同步。而加入resync机制就是保证的协调的最终一致性,单次触发的同步协调失败没关系,会有定时的resync心跳机制让controller去到达最终的期望状态。

@Yarollingstone
Copy link

// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		// 判断事件类型,看事件是通过新增、更新、替换、删除还是 Resync 重新同步产生的
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				
				isSync := false
				switch {
				case d.Type == Sync:
					// 如果是通过 Resync 重新同步得到的事件则做个标记
					isSync = true
				case d.Type == Replaced:
					...
				}
				// 如果是通过 Resync 重新同步得到的事件,则触发 onUpdate 回调
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

从上面对 Delta FIFO 的队列处理源码可看出,如果是从 Resync 重新同步到 Delta FIFO 队列的事件,会分发到 updateNotification 中触发 onUpdate 的回调

三、总结

Resync 机制的引入,定时将 Indexer 缓存事件重新同步到 Delta FIFO 队列中,在处理 SharedInformer 事件回调时,让处理失败的事件得到重新处理。并且通过入队前判断 FIFO 队列中是否已经有了更新版本的 event,来决定是否丢弃 Indexer 缓存不进行 Resync 入队。在处理 Delta FIFO 队列中的 Resync 的事件数据时,触发 onUpdate 回调来让事件重新处理。
@qmloong
这里是通过 onUpdate 回调来处理 Resync,但一般我们写 Controller 时 onUpdate 是否决定入 workqueue 是通过比较新老对象是否一致来处理,但这里的”新对象“本质上是还是 indexer sync 的,老对象是 indexer 里的,这俩本质上就是一个对象,所以 onUpdate 回调就会丢弃,这个 resync 也就没了用处了。。

@shikingram
Copy link

  1. 以前以为resync指的是定时从etcd拉最新的以防出错,看来我理解错了。从上面的讨论还有一些资料来看,实际是informer启动时只会从etcd list一次全量数据,后面缓存全部都只走watch来更新,resync其实也只是将本地缓存的数据再构造成事件再次同步到队列一次,这样理解对吗?
  2. 疑问:因为watch是长连接,如果网络发生波动连接断了呢,本地缓存因为网络断了那段事件肯定就丢失了一部分事件导致etcd和本地数据不一致,这种情况下如果网络稳定下来或者在网络波动间隙,定时从etcd拉取的方案还能继续保持本地缓存和etcd一致,为什么不这样做呢?
  1. 对的
  2. informer能保证通过list+watch不会丢失事件,如果网络抖动重新恢复后,watch会带着之前的resourceVersion号重连,resourceVersion是单调递增的,apiserver收到该请求后会将所有大于该resourceVersion的变更同步过来。 另外好像网络长期中断的话会导致informer重新初始化也就会重新list。

@gaorong 感谢解惑。对第2点的回答还有两个疑问:

如果网络抖动重新恢复后,watch会带着之前的resourceVersion号重连

从这句话字面上我理解的意思是说:如果网络只是短暂波动的话,watch 长连接会存在某种重试机制吗?那么这种重试机制是informer 自身实现的吗?或者还是借助http自身就有的某种机制?

另外好像网络长期中断的话会导致informer重新初始化也就会重新list

这句话指的是对于informer来说,整体也存在重试机制?也就是再重新执行一开始的list+watch的完整过程:先执行一次list,后面都还走watch跟原来一样,跟重启了一样吗?

启动之后只会list一次(或两次,出错时), 连接watch, 后续等待watch数据. 接收watch数据出错后, 会重新list & watch. list&watch一般会被wait.Until函数封装.

我想问下,如何让他不list,或者我只关心我需要的数据,不需要全量数据,毕竟数据量太大,全量list占用cpu和内存会较高

@zhangzerui20
Copy link

zhangzerui20 commented Feb 23, 2022

我理解 reSync 机制有两个作用。

  1. 用来补偿 controller 代码中的 bug ,如果 controller 执行过程中出了问题,导致某个事件的协调没有完成,reSync 让 controller 可以再执行一次。我认为这点不是特别重要,sdk 给用户提供了可以重试的队列,可以解决非 bug(网络问题之类的),如果是代码 bug ,即使 reSync 还是会报错。
  2. 用来解决外部系统协调的问题,比如说你实现的 controller 会把 k8s 的某个资源同步到外部系统上,如果你的用户直接在外部系统修改了这个资源,就会导致外部系统和 k8s 资源不一致。这种场景,如果外部系统没有提供一些 watch 的方法,reSync 机制是一个比较好的选择,可以周期性的对账,保证一致。

关于推荐怎么配置这个时间,我理解就是看你是用来保证第一点,还是第二点。如果是第一点,可以把时间配置的长一些,比如几个小时。如果是第二点,就是看你的系统要求,比如一分钟,几十秒,都是可以的,这里也要考虑 controller 的协调里做的是什么,会不会对外部系统造成很大压力。

@loheagn
Copy link

loheagn commented Feb 26, 2023

我理解 reSync 机制有两个作用。

  1. 用来补偿 controller 代码中的 bug ,如果 controller 执行过程中出了问题,导致某个事件的协调没有完成,reSync 让 controller 可以再执行一次。我认为这点不是特别重要,sdk 给用户提供了可以重试的队列,可以解决非 bug(网络问题之类的),如果是代码 bug ,即使 reSync 还是会报错。
  2. 用来解决外部系统协调的问题,比如说你实现的 controller 会把 k8s 的某个资源同步到外部系统上,如果你的用户直接在外部系统修改了这个资源,就会导致外部系统和 k8s 资源不一致。这种场景,如果外部系统没有提供一些 watch 的方法,reSync 机制是一个比较好的选择,可以周期性的对账,保证一致。

关于推荐怎么配置这个时间,我理解就是看你是用来保证第一点,还是第二点。如果是第一点,可以把时间配置的长一些,比如几个小时。如果是第二点,就是看你的系统要求,比如一分钟,几十秒,都是可以的,这里也要考虑 controller 的协调里做的是什么,会不会对外部系统造成很大压力。

关于第2点谈一下个人愚见哈。

我觉得楼主讨论的 resync 机制针对的应该是“系统内部”“错过”事件的情况,即,一个资源的变动事件没有被informer接收到,自然也就会被controller错过。resync的存在可以定期让用户自己实现的controller整个过一遍所有资源,从而弥补之前“错过”的事件(因为本质上我们并不是特别关心“发生了什么事件”,而更关心“现在资源的期望状态到底是啥”,而事件的接收更多的起到一个通知的作用,而每次执行controller的reconcile逻辑时都要获取一下资源的最新状态)。

而你这里第2点说的情况,应该是“系统外部”导致的不一致情况。这种应该在用户自定义的controller中的reconcile函数中处理,比如返回值指定requeue参数等。

虽然上述两种情况的结果是相同的(即,本质上都是定期重新reconcile一下资源),但动机不太一样。

@callmevincent
Copy link

callmevincent commented Feb 26, 2023 via email

@lightnine
Copy link

我看了client-go 1.26版本代码,代码如下(tools/cache/Reflector.go/Run):

// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

这里的BackoffUntil应该会定期的执行ListAndWatch方法,看起来list函数会定期的从kube-apiserver获取数据

@fighterhit
Copy link

我看了client-go 1.26版本代码,代码如下(tools/cache/Reflector.go/Run):

// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

这里的BackoffUntil应该会定期的执行ListAndWatch方法,看起来list函数会定期的从kube-apiserver获取数据

多久一次?可能默认为了保证数据一致性?

@novahe
Copy link

novahe commented Apr 1, 2023

我看了client-go 1.26版本代码,代码如下(tools/cache/Reflector.go/Run):

// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

这里的BackoffUntil应该会定期的执行ListAndWatch方法,看起来list函数会定期的从kube-apiserver获取数据

@lightnine
不会一直backoff,r.ListAndWatch(stopCh)函数是一个阻塞方法(里面是一个for循环),只会在由于错误导致函数退出for循环后才会重新执行

@callmevincent
Copy link

callmevincent commented Apr 1, 2023 via email

1 similar comment
@callmevincent
Copy link

callmevincent commented May 2, 2023 via email

@ScXin
Copy link

ScXin commented Jun 30, 2023

image

I find the author's answer from the book-programming Kubernetes

@callmevincent
Copy link

callmevincent commented Jun 30, 2023 via email

@KingsonKai
Copy link

Resync 机制会将 Indexer 的本地缓存重新同步到 DeltaFIFO 队列中。一般我们会设置一个时间周期,让 Indexer 周期性地将缓存同步到队列中。直接 list/watch apiserver 就已经能拿到集群中资源对象变化的 event 了,这里引入 Resync 的作用是什么呢?去掉会有什么影响呢?

“让 Indexer 周期性地将缓存同步到队列中”,请问一下这里的意思是把local store的事件列表重新和DeltaFIFO中的事件进行对比吗?本地缓存是对象的值,而DeltaFIFO存储的是事件。我不太明白为什么二者存储的东西不一样,但是能同步?

@callmevincent
Copy link

callmevincent commented Oct 16, 2023 via email

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

No branches or pull requests