Kubernetes Informer 认知

Informer是Client-go中的一个核心工具包。Informer 的机制,降低了 Kubernetes 各个组件跟 Etcd 与 Kubernetes API Server 的通信压力。本文描述为什么降低了压力。

1、Informer 机制架构设计

Kubernetes Informer 认知
informer

注意:这张图分为两部分,黄色图标是开发者需要自行开发的部分,而其它的部分是 client-go 已经提供的,直接使用即可。即理解informer原理时,只需看图中上部分即可。

  • Reflector:用于 Watch 指定的 Kubernetes 资源,当 watch 的资源发生变化时,触发变更的事件,比如 Added,Updated 和 Deleted 事件,并将资源对象存放到本地缓存 DeltaFIFO;
  • DeltaFIFO:拆开理解,FIFO 就是一个队列,拥有队列基本方法(ADD,UPDATE,DELETE,LIST,POP,CLOSE 等),Delta 是一个资源对象存储,保存存储对象的消费类型,比如 Added,Updated,Deleted,Sync 等;
  • Indexer:Client-go 用来存储资源对象并自带索引功能的本地存储,Reflector 从 DeltaFIFO 中将消费出来的资源对象存储到 Indexer,Indexer 与 Etcd 集群中的数据完全保持一致。从而 client-go 可以本地读取,减少 Kubernetes API 和 Etcd 集群的压力。

2、informers 过程

  • Informer在初始化的时,先调用Kubernetes List API获得某种resource的全部Object,缓存在内存中;
  • Informer调用Watch API去watch这种resource,去维护这份缓存;
  • Informer通过Kubernetes Watch API监听某种resource下的所有事件,触发回调函数。

2.1、回调函数实例

  • (即ResourceEventHandler实例)只需实现OnAdd(obj interface{})OnUpdate(oldObj, newObj interface{}) 和OnDelete(obj interface{}) 三个方法。回调函数实例是不包括在informer里面的,需要用户开发。

2.2、二级缓存

属于Informer的底层缓存机制,这两级缓存分别是DeltaFIFO和LocalStore。这两级缓存的用途各不相同。但Informer内部的这两级缓存之间存在resync机制。

  • DeltaFIFO用来存储Watch API返回的各种事件
  • LocalStore只会被Lister的List/Get方法访问 。

3、图中整体过程

已某pod资源为例说明,提前list资源在内存中,后续watch资源变化,有变化执行对应的函数,所以没有单独和apiserver交互。当为K8s本身资源时,没有以上的倒数第三步,当自定义资源时,一般会在倒数第三步先入队列,然后在Processor函数中获取队列,处理自定义资源,其中可能包括(sts、cm和service创建等操作)

  • Informer 在初始化时,Reflector 会先 List API 获得所有的 Pod
  • Reflect 拿到全部 Pod 后,会将全部 Pod 放到 Store 中
  • 如果有人调用 Lister 的 List/Get 方法获取 Pod, 那么 Lister 会直接从 Store 中拿数据
  • Informer 初始化完成之后,Reflector 开始 Watch Pod,监听 Pod 相关 的所有事件;如果此时 pod_1 被删除,那么 Reflector 会监听到这个事件
  • Reflector 将 pod_1 被删除 的这个事件发送到 DeltaFIFO
  • DeltaFIFO 首先会将这个事件存储在自己的数据结构中(实际上是一个 queue),然后会直接操作 Store 中的数据,删除 Store 中的 pod_1
  • DeltaFIFO 再 Pop 这个事件到 Controller 中
  • 事件通过ResourceEventHandlers OnDelete函数入队列中(当自定义资源时)
  • Controller 收到这个事件,会触发 Processor 的回调函数
  • LocalStore 会周期性地把所有的 Pod 信息重新放到 DeltaFIFO 中

4、informers使用方法

informer可以在现有的k8s资源使用,也可以在自定义资源使用,一般是在operator开发中对自定义和现有的资源一起使用。流程大致:

4.1、定义一个client

  • 可参考上篇文章

4.2、声明一个informer

  • 其中使用的package包为:”k8s.io/client-go/informers”,
  • 如果是自定义的资源需要自己实现,也可借助工具code-generator的脚本生成,目前kubebuilder和operator-sdk是不支持。
  • NewSharedInformerFactoryWithOptions和NewSharedInformerFactory都是创建共享 Informer 工厂的函数,但WithOptions可以更多的自定义配置
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
return NewSharedInformerFactoryWithOptions(client, defaultResync)
}

func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client: client,
namespace: v1.NamespaceAll,
defaultResync: defaultResync,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),
}

// Apply all options
for _, opt := range options {
factory = opt(factory)
}

return factory
}

4.3、和controller关联

  • controller定义informer 资源的 Lister()
用于从缓存中获取特定资源列表的接口  serviceLister: kubeInformerFactory.Core().V1().Services().Lister()       
方法获取对应的 Informer 的同步状态函数 serviceListerSynced: kubeInformerFactory.Core().V1().Services().Informer().HasSynced
  • controller中添加informer事件,对应ResourceEventHandlerFuncs回调函数

4.4、启动informer

具体代码示例,分为k8s资源和自定义资源:

// Create Informers
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
kubeClient,
kubeInformerFactoryResyncPeriod,
kubeinformers.WithNamespace("test"),
)
InformerFactory := informers.NewSharedInformerFactoryWithOptions(
Client,
InformerFactoryResyncPeriod,
informers.WithNamespace("test"),
)
// Create Controller
Controller := NewController(
InformerFactory,
kubeInformerFactory,
)

// NewController creates instance of Controller
func NewController(
InformerFactory informers.SharedInformerFactory,
kubeInformerFactory kubeinformers.SharedInformerFactory,
) *Controller {
// Create Controller instance
controller := &Controller{
Lister: InformerFactory.Lin().V1().Installations().Lister(),
ListerSynced: InformerFactory.Lin().V1().Installations().Informer().HasSynced,
serviceLister: kubeInformerFactory.Core().V1().Services().Lister(),
serviceListerSynced: kubeInformerFactory.Core().V1().Services().Informer().HasSynced,
}

controller.addEventHandlers(InformerFactory, kubeInformerFactory)

return controller
}


func (c *Controller) addEventHandlersService(
kubeInformerFactory kubeinformers.SharedInformerFactory,
) {
kubeInformerFactory.Core().V1().Services().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
service := obj.(*core.Service)
if !c.isTrackedObject(&service.ObjectMeta) {
return
}
log.V(3).M(service).Info("serviceInformer.AddFunc")
},
UpdateFunc: func(old, new interface{}) {
oldService := old.(*core.Service)
if !c.isTrackedObject(&oldService.ObjectMeta) {
return
}
log.V(3).M(oldService).Info("serviceInformer.UpdateFunc")
},
DeleteFunc: func(obj interface{}) {
service, ok := obj.(*core.Service)
if !ok {
log.Info("service delete is failed. obj: %v", obj)
return
}
if !c.isTrackedObject(&service.ObjectMeta) {
return
}
log.V(3).M(service).Info("serviceInformer.DeleteFunc")
},
})
}

func (c *Controller) addEventHandlersCHI(
InformerFactory informers.SharedInformerFactory,
) {
InformerFactory.Lin().V1().Installations().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ch := obj.(*v1.Installation)
if !Config().IsWatchedNamespace(ch.Namespace) {
return
}
log.V(2).M(chi).Info("Informer.AddFunc")
c.enqueueObject(NewReconcile(reconcileAdd, nil, ch))
},
UpdateFunc: func(old, new interface{}) {
oldCh := old.(*v1.Installation)
newCh := new.(*v1.Installation)
if !Config().IsWatchedNamespace(newCh.Namespace) {
return
}
log.V(2).M(newCh).Info("Informer.UpdateFunc")
c.enqueueObject(NewReconcile(reconcileUpdate, oldCh, newCh))
},
DeleteFunc: func(obj interface{}) {
ch, ok := obj.(*v1.Installation)
if !ok {
log.Info("Installation delete is failed. obj: %v", obj)
return
}
if !Config().IsWatchedNamespace(ch.Namespace) {
return
}
log.V(2).M(chi).Info("Informer.DeleteFunc")
c.enqueueObject(NewReconcile(reconcileDelete, ch, nil))
},
})
}

if err := w.processItem(ctx, item); err != nil {
// Item not processed
// this code cannot return an error and needs to indicate error has been ignored
utilruntime.HandleError(err)
}

// Start Informers
kubeInformerFactory.Start(ctx.Done())
chopInformerFactory.Start(ctx.Done())

// Start Controller
Controller.Run(ctx)


原文始发于微信公众号(云原生内经):Kubernetes Informer 认知

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/167914.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!