【kubernete源码阅读】kube-apiserver访问主流程

apiserver代码给我整迷茫了。
想要搞清楚apiserver的主流程,那么需要回答以下几个问题:

  • restful API是如何创建的?
  • 创建后,又做了哪些初始化操作?
  • API是如何启动的?

go-restful介绍

在apiserver中使用的是go-restful框架进行对外提供服务。为了更好的阅读主流程,稍微理解一下go-restful的相关概念。
项目地址:github  
更多的框架使用,请参考:example,最简单的例子,如下所示:

func main() {
    // 1. 创建一个webService服务
    ws := new(restful.WebService)
    // 2. 将handler加入到Route中
    ws.Route(ws.GET("/hello").To(hello))
    // !!! 将ws加入到restful框架的Default Container中
    restful.Add(ws)
    log.Fatal(http.ListenAndServe(":8083"nil))
}

func hello(req *restful.Request, resp *restful.Response) {
    io.WriteString(resp, "world")
}

这个代码初一看,有个非常困惑的点,在于官方的net/http库中是如何知道上面的ws有哪些路由的。在go-restful中,会创建一个DefaultContainer,在Container对象中会将ServeMux字段设置为http.DefaultServeMux。然后再restful.Add的时候调用DefaultContainer.Add()方法,将ws中的handler加入到http中去。
在看go-restful的example的时候,发现一个很有趣的现象:

func main() {
 ws := new(restful.WebService)
 ws.Route(ws.GET("/hello").To(hello))
 restful.Add(ws)
 go func() {
  log.Fatal(http.ListenAndServe(":8080"nil))
 }()
    // !!!!!!!!!!!!!!!!!!!!!!!
 container2 := restful.NewContainer()
 ws2 := new(restful.WebService)
 ws2.Route(ws2.GET("/hello").To(hello2))
 container2.Add(ws2)
 server := &http.Server{Addr: ":8081", Handler: container2}
    // !!!!!!!!!!!!!!!!!!!!!!!!
 log.Fatal(server.ListenAndServe())
}

func hello(req *restful.Request, resp *restful.Response) {
 io.WriteString(resp, "default world")
}

func hello2(req *restful.Request, resp *restful.Response) {
 io.WriteString(resp, "second world")
}

在以上代码中,实现得到效果是该服务有2个http端口向外提供服务。仔细看它的实现方式,竟然采用的不是先将Container的ServeMux复制一个http.Server对象,而是相反操作,这就很反常识。 它是如何做到的呢? 这一点跟apiserver对外api的创建非常强的关联。
在net/http库中,http.Server对象的Handler是一个接口类型:

type Handler interface {
 ServeHTTP(ResponseWriter, *Request)
}

所以这里将container直接赋值给Handler也势必实现了这个接口。


// ServeHTTP implements net/http.Handler therefore a Container can be a Handler in a http.Server
func (c *Container) ServeHTTP(httpWriter http.ResponseWriter, httpRequest *http.Request) {
 // Skip, if content encoding is disabled
 if !c.contentEncodingEnabled {
  c.ServeMux.ServeHTTP(httpWriter, httpRequest)
  return
 }
    ....
    }
}

这一点非常作用,因为Apiserver中也用到了这类的封装。

Apiserver

从主入口进入,就可以看到关键的函数:

// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
    // 创建整个ApiServer的相关配置
 kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
 if err != nil {
  return nil, err
 }

 // 创建ApiExtensionServer的配置文件
 apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
  serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
 if err != nil {
  return nil, err
 }
    // 创建404 notFound 处理器
 notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
 apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
 if err != nil {
  return nil, err
 }
    // 创建核心apiserver服务
 kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
 if err != nil {
  return nil, err
 }

 // 创建聚合服务器配置
 aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)
 if err != nil {
  return nil, err
 }
    // 创建聚合服务器
 aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
 if err != nil {
  // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
  return nil, err
 }

 return aggregatorServer, nil
}

在这里创建了3个对外服务对象以及一个404 notFound的Handler对象。这个流程可以理解:
【kubernete源码阅读】kube-apiserver访问主流程
运行时,堆栈信息:
【kubernete源码阅读】kube-apiserver访问主流程
在创建过程中,3个Server最终调用的函数为:

func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
     nonGoRestfulMux := mux.NewPathRecorderMux(name)
 if notFoundHandler != nil {
  nonGoRestfulMux.NotFoundHandler(notFoundHandler)
 }

 gorestfulContainer := restful.NewContainer()
 gorestfulContainer.ServeMux = http.NewServeMux()
 gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
 gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
  logStackOnRecover(s, panicReason, httpWriter)
 })
 gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
  serviceErrorHandler(s, serviceErr, request, response)
 })

 director := director{
  name:               name,
  goRestfulContainer: gorestfulContainer,
  nonGoRestfulMux:    nonGoRestfulMux,
 }

 return &APIServerHandler{
  FullHandlerChain:   handlerChainBuilder(director),
  GoRestfulContainer: gorestfulContainer,
  NonGoRestfulMux:    nonGoRestfulMux,
  Director:           director,
 }
}

部分参数说明:
handlerChainBuilder:从配置文件中带下来的最终为DefaultBuildHandlerChain,源码路线为:

--> CreateKubeAPIServerConfig: cmd/kube-apiserver/app/server.go:237
--> buildGenericConfig(): cmd/kube-apiserver/app/server.go:245
--> genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs): vendor/k8s.io/apiserver/pkg/server/config.go:323
--> BuildHandlerChainFunc: DefaultBuildHandlerChain, : vendor/k8s.io/apiserver/pkg/server/config.go:333

notFoundHandler:这里就是上图的包含关系,在该层没有匹配到对应的handler,往下层继续寻找。
结构体对象说明:
director

type director struct {
    // 名称
 name               string
    // 当前服务提供的api接口
 goRestfulContainer *restful.Container
    // 没有找到向下传递的地址
 nonGoRestfulMux    *mux.PathRecorderMux
}

func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}

结构体实现了ServeHTTP接口,那么其也可以作为Handler传递给HTTP。而从上面代码来看APIServer最终使用的handler方法为:FullHandlerChain,即使用handlerChainBuilder封装的director。
而handlerChainBuilder为DefaultBuildHandlerChain,接着往下看看。

DefaultBuildHandlerChain

源码如下:

func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
 handler := filterlatency.TrackCompleted(apiHandler)
 handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
 handler = filterlatency.TrackStarted(handler, "authorization")

 if c.FlowControl != nil {
  requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount)
  handler = filterlatency.TrackCompleted(handler)
  handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
  handler = filterlatency.TrackStarted(handler, "priorityandfairness")
 } else {
  handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
 }

 handler = filterlatency.TrackCompleted(handler)
 handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
 handler = filterlatency.TrackStarted(handler, "impersonation")

 handler = filterlatency.TrackCompleted(handler)
 handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
 handler = filterlatency.TrackStarted(handler, "audit")

 failedHandler := genericapifilters.Unauthorized(c.Serializer)
 failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)

 failedHandler = filterlatency.TrackCompleted(failedHandler)
 handler = filterlatency.TrackCompleted(handler)
 handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
 handler = filterlatency.TrackStarted(handler, "authentication")

 handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nilnilnil"true")

 // WithTimeoutForNonLongRunningRequests will call the rest of the request handling in a go-routine with the
 // context with deadline. The go-routine can keep running, while the timeout logic will return a timeout to the client.
 handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)

 handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
  c.LongRunningFunc, c.Serializer, c.RequestTimeout)
 handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
 if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
  handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
 }
 handler = genericapifilters.WithAuditAnnotations(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
 handler = genericapifilters.WithWarningRecorder(handler)
 handler = genericapifilters.WithCacheControl(handler)
 handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
 if c.ShutdownSendRetryAfter {
  handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
 }
 handler = genericfilters.WithHTTPLogging(handler)
 if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
  handler = genericapifilters.WithTracing(handler, c.TracerProvider)
 }
 handler = genericapifilters.WithLatencyTrackers(handler)
 handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
 handler = genericapifilters.WithRequestReceivedTimestamp(handler)
 handler = genericapifilters.WithMuxAndDiscoveryComplete(handler, c.lifecycleSignals.MuxAndDiscoveryComplete.Signaled())
 handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
 handler = genericapifilters.WithAuditID(handler)
 return handler
}

这个代码很长,但是代码逻辑都很相似,即公共的filter链。采用的是栈调用的方式,先初始化的最后调用,即在http请求进入时,最先调用的方法为WithAuditID()。

apiserver 启动

目前源码已经知晓了,api handler的创建(参考上一章),filter的创建,apiserver的创建, 最终就是apisever的启动位置;

--> prepared.Run(stopCh): vendor/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go:413
--> func (s preparedGenericAPIServer) Run(stopCh <-chan struct{})vendor/k8s.io/apiserver/pkg/server/genericapiserver.go:394
--> stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout): vendor/k8s.io/apiserver/pkg/server/genericapiserver.go:455
--> stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.Serve(s.Handler, shutdownTimeout, internalStopCh): vendor/k8s.io/apiserver/pkg/server/genericapiserver.go:522
--> RunServer(): vendor/k8s.io/apiserver/pkg/server/secure_serving.go:220

在以上函数执行完成后,[:]:6443接口即对外提供服务; 这里需要注意一个对象:

secureServer := &http.Server{
  Addr:           s.Listener.Addr().String(),
  Handler:        handler,
  MaxHeaderBytes: 1 << 20,
  TLSConfig:      tlsConfig,

  IdleTimeout:       90 * time.Second, // matches http.DefaultTransport keep-alive timeout
  ReadHeaderTimeout: 32 * time.Second, // just shy of requestTimeoutUpperBound
 }

在这里将handler赋值给http.server,而这个handler对象具体是什么呢? kube-aggregator,如下图:
【kubernete源码阅读】kube-apiserver访问主流程
从上图也能看到最先调用的handler为withAudit,与我们的分析一致。

以上。

小结

apiserver的启动过程,以及主体逻辑已经分析清楚,后续将通过问题来查看apiserver是如何处理的。


原文始发于微信公众号(小唐云原生):【kubernete源码阅读】kube-apiserver访问主流程

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/288939.html

(0)
土豆大侠的头像土豆大侠

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!