apiserver代码给我整迷茫了。
想要搞清楚apiserver
的主流程,那么需要回答以下几个问题:
-
restful API是如何创建的? -
创建后,又做了哪些初始化操作? -
API是如何启动的?
go-restful介绍
在apiserver中使用的是go-restful
框架进行对外提供服务。为了更好的阅读主流程,稍微理解一下go-restful
的相关概念。
项目地址:github
更多的框架使用,请参考:example,最简单的例子,如下所示:
func main() {
// 1. 创建一个webService服务
ws := new(restful.WebService)
// 2. 将handler加入到Route中
ws.Route(ws.GET("/hello").To(hello))
// !!! 将ws加入到restful框架的Default Container中
restful.Add(ws)
log.Fatal(http.ListenAndServe(":8083", nil))
}
func hello(req *restful.Request, resp *restful.Response) {
io.WriteString(resp, "world")
}
这个代码初一看,有个非常困惑的点,在于官方的net/http库中是如何知道上面的ws有哪些路由的。在go-restful
中,会创建一个DefaultContainer
,在Container对象中会将ServeMux字段设置为http.DefaultServeMux。然后再restful.Add的时候调用DefaultContainer.Add()
方法,将ws中的handler加入到http中去。
在看go-restful的example的时候,发现一个很有趣的现象:
func main() {
ws := new(restful.WebService)
ws.Route(ws.GET("/hello").To(hello))
restful.Add(ws)
go func() {
log.Fatal(http.ListenAndServe(":8080", nil))
}()
// !!!!!!!!!!!!!!!!!!!!!!!
container2 := restful.NewContainer()
ws2 := new(restful.WebService)
ws2.Route(ws2.GET("/hello").To(hello2))
container2.Add(ws2)
server := &http.Server{Addr: ":8081", Handler: container2}
// !!!!!!!!!!!!!!!!!!!!!!!!
log.Fatal(server.ListenAndServe())
}
func hello(req *restful.Request, resp *restful.Response) {
io.WriteString(resp, "default world")
}
func hello2(req *restful.Request, resp *restful.Response) {
io.WriteString(resp, "second world")
}
在以上代码中,实现得到效果是该服务有2个http端口向外提供服务。仔细看它的实现方式,竟然采用的不是先将Container的ServeMux复制一个http.Server对象,而是相反操作,这就很反常识。 它是如何做到的呢? 这一点跟apiserver
对外api的创建非常强的关联。
在net/http库中,http.Server对象的Handler是一个接口类型:
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}
所以这里将container直接赋值给Handler也势必实现了这个接口。
// ServeHTTP implements net/http.Handler therefore a Container can be a Handler in a http.Server
func (c *Container) ServeHTTP(httpWriter http.ResponseWriter, httpRequest *http.Request) {
// Skip, if content encoding is disabled
if !c.contentEncodingEnabled {
c.ServeMux.ServeHTTP(httpWriter, httpRequest)
return
}
....
}
}
这一点非常作用,因为Apiserver
中也用到了这类的封装。
Apiserver
从主入口进入,就可以看到关键的函数:
// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
// 创建整个ApiServer的相关配置
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
if err != nil {
return nil, err
}
// 创建ApiExtensionServer的配置文件
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
if err != nil {
return nil, err
}
// 创建404 notFound 处理器
notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
if err != nil {
return nil, err
}
// 创建核心apiserver服务
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
if err != nil {
return nil, err
}
// 创建聚合服务器配置
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
// 创建聚合服务器
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err
}
return aggregatorServer, nil
}
在这里创建了3个对外服务对象以及一个404 notFound的Handler对象。这个流程可以理解:
运行时,堆栈信息:
在创建过程中,3个Server最终调用的函数为:
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
nonGoRestfulMux := mux.NewPathRecorderMux(name)
if notFoundHandler != nil {
nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}
gorestfulContainer := restful.NewContainer()
gorestfulContainer.ServeMux = http.NewServeMux()
gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
return &APIServerHandler{
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}
部分参数说明:
handlerChainBuilder:从配置文件中带下来的最终为DefaultBuildHandlerChain,源码路线为:
--> CreateKubeAPIServerConfig: cmd/kube-apiserver/app/server.go:237
--> buildGenericConfig(): cmd/kube-apiserver/app/server.go:245
--> genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs): vendor/k8s.io/apiserver/pkg/server/config.go:323
--> BuildHandlerChainFunc: DefaultBuildHandlerChain, : vendor/k8s.io/apiserver/pkg/server/config.go:333
notFoundHandler:这里就是上图的包含关系,在该层没有匹配到对应的handler,往下层继续寻找。
结构体对象说明:
director
type director struct {
// 名称
name string
// 当前服务提供的api接口
goRestfulContainer *restful.Container
// 没有找到向下传递的地址
nonGoRestfulMux *mux.PathRecorderMux
}
func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
结构体实现了ServeHTTP接口,那么其也可以作为Handler传递给HTTP。而从上面代码来看APIServer最终使用的handler方法为:FullHandlerChain,即使用handlerChainBuilder封装的director。
而handlerChainBuilder为DefaultBuildHandlerChain,接着往下看看。
DefaultBuildHandlerChain
源码如下:
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := filterlatency.TrackCompleted(apiHandler)
handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
handler = filterlatency.TrackStarted(handler, "authorization")
if c.FlowControl != nil {
requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount)
handler = filterlatency.TrackCompleted(handler)
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
handler = filterlatency.TrackStarted(handler, "priorityandfairness")
} else {
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
}
handler = filterlatency.TrackCompleted(handler)
handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
handler = filterlatency.TrackStarted(handler, "impersonation")
handler = filterlatency.TrackCompleted(handler)
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
handler = filterlatency.TrackStarted(handler, "audit")
failedHandler := genericapifilters.Unauthorized(c.Serializer)
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
failedHandler = filterlatency.TrackCompleted(failedHandler)
handler = filterlatency.TrackCompleted(handler)
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
handler = filterlatency.TrackStarted(handler, "authentication")
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
// WithTimeoutForNonLongRunningRequests will call the rest of the request handling in a go-routine with the
// context with deadline. The go-routine can keep running, while the timeout logic will return a timeout to the client.
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)
handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
c.LongRunningFunc, c.Serializer, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
}
handler = genericapifilters.WithAuditAnnotations(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
handler = genericapifilters.WithWarningRecorder(handler)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
if c.ShutdownSendRetryAfter {
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
}
handler = genericfilters.WithHTTPLogging(handler)
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
handler = genericapifilters.WithTracing(handler, c.TracerProvider)
}
handler = genericapifilters.WithLatencyTrackers(handler)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
handler = genericapifilters.WithRequestReceivedTimestamp(handler)
handler = genericapifilters.WithMuxAndDiscoveryComplete(handler, c.lifecycleSignals.MuxAndDiscoveryComplete.Signaled())
handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
handler = genericapifilters.WithAuditID(handler)
return handler
}
这个代码很长,但是代码逻辑都很相似,即公共的filter链。采用的是栈调用的方式,先初始化的最后调用,即在http请求进入时,最先调用的方法为WithAuditID()。
apiserver 启动
目前源码已经知晓了,api handler的创建(参考上一章),filter的创建,apiserver的创建, 最终就是apisever的启动位置;
--> prepared.Run(stopCh): vendor/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go:413
--> func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}): vendor/k8s.io/apiserver/pkg/server/genericapiserver.go:394
--> stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout): vendor/k8s.io/apiserver/pkg/server/genericapiserver.go:455
--> stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.Serve(s.Handler, shutdownTimeout, internalStopCh): vendor/k8s.io/apiserver/pkg/server/genericapiserver.go:522
--> RunServer(): vendor/k8s.io/apiserver/pkg/server/secure_serving.go:220
在以上函数执行完成后,[:]:6443接口即对外提供服务; 这里需要注意一个对象:
secureServer := &http.Server{
Addr: s.Listener.Addr().String(),
Handler: handler,
MaxHeaderBytes: 1 << 20,
TLSConfig: tlsConfig,
IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout
ReadHeaderTimeout: 32 * time.Second, // just shy of requestTimeoutUpperBound
}
在这里将handler赋值给http.server,而这个handler对象具体是什么呢? kube-aggregator,如下图:
从上图也能看到最先调用的handler为withAudit,与我们的分析一致。
以上。
小结
apiserver
的启动过程,以及主体逻辑已经分析清楚,后续将通过问题来查看apiserver是如何处理的。
原文始发于微信公众号(小唐云原生):【kubernete源码阅读】kube-apiserver访问主流程
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/288939.html