Kubernetes Source Code Analysis (Part 1): The kubelet Flow
Kubernetes Component Analysis: kubelet
1. What is kubelet?
kubelet is the component responsible for everything that runs on a worker node; it acts as the node agent on each node. Every node starts a kubelet process to handle the tasks the Master delivers to that node. kubelet manages Pods and their containers according to PodSpec descriptions (a PodSpec is the YAML or JSON object that describes a pod).
2. Where kubelet sits in Kubernetes
3. kubelet's main components
Configuration sync
Feature gates (FeatureGate)
Container management (Container Manager)
PLEG
CAdvisor&oomWatcher
GCManager
CPUManager
ProberManager
EvictionManager
StatusManager
VolumeManager
PluginManager
ContainerManager
CRI
kubelet communicates with the runtime server over gRPC and invokes the CRI interfaces to consume the services the runtime exposes, thereby managing the underlying RunC.
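The CRI surface described above can be pictured as a Go interface. The sketch below is a heavily trimmed, hypothetical version: the method names RunPodSandbox, CreateContainer, and StartContainer do exist in the real CRI, but the signatures here are simplified stand-ins (the real ones take protobuf request/response types from k8s.io/cri-api), and fakeRuntime/startPod are invented for illustration only.

```go
package main

import "fmt"

// RuntimeService is a trimmed, simplified sketch of the CRI runtime
// interface; the real definitions live in the CRI protobuf
// (k8s.io/cri-api) and carry full request/response message types.
type RuntimeService interface {
	RunPodSandbox(config string) (sandboxID string, err error)
	CreateContainer(sandboxID, image string) (containerID string, err error)
	StartContainer(containerID string) error
}

// fakeRuntime stands in for a gRPC client talking to a CRI server
// such as containerd or CRI-O.
type fakeRuntime struct {
	started []string
}

func (f *fakeRuntime) RunPodSandbox(config string) (string, error) {
	return "sandbox-1", nil
}

func (f *fakeRuntime) CreateContainer(sandboxID, image string) (string, error) {
	return "ctr-" + image, nil
}

func (f *fakeRuntime) StartContainer(id string) error {
	f.started = append(f.started, id)
	return nil
}

// startPod shows the call sequence kubelet-side code drives through CRI:
// sandbox first, then create and start the container inside it.
func startPod(rt RuntimeService, image string) (string, error) {
	sb, err := rt.RunPodSandbox("pod-config")
	if err != nil {
		return "", err
	}
	ctr, err := rt.CreateContainer(sb, image)
	if err != nil {
		return "", err
	}
	return ctr, rt.StartContainer(ctr)
}

func main() {
	rt := &fakeRuntime{}
	id, err := startPod(rt, "nginx")
	fmt.Println(id, err)
}
```

Programming against the interface rather than a concrete client is what lets kubelet swap runtimes behind the same CRI contract.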
4. Source code analysis
The kubelet entry point is in cmd/kubelet/kubelet.go:
func main() {
	rand.Seed(time.Now().UnixNano())

	command := app.NewKubeletCommand()
	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}
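The command object returned by NewKubeletCommand comes from cobra, which is a third-party library. As a stdlib-only illustration of the same parse-then-execute shape, here is a hypothetical miniature using the standard flag package; the kubeletFlags struct and flag names below are invented for the sketch, not kubelet's real set.

```go
package main

import (
	"flag"
	"fmt"
)

// kubeletFlags mimics, in miniature, the idea the article describes:
// values parsed from the command line before the server runs.
// These fields and flag names are illustrative only.
type kubeletFlags struct {
	HostnameOverride string
	HealthzPort      int
}

// parseFlags plays the role of the argument-handling half of
// NewKubeletCommand: declare flags, parse args, return a config struct.
func parseFlags(args []string) (*kubeletFlags, error) {
	fs := flag.NewFlagSet("kubelet-sketch", flag.ContinueOnError)
	kf := &kubeletFlags{}
	fs.StringVar(&kf.HostnameOverride, "hostname-override", "", "node name override")
	fs.IntVar(&kf.HealthzPort, "healthz-port", 10248, "healthz port")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	return kf, nil
}

func main() {
	kf, err := parseFlags([]string{"--hostname-override=node-1"})
	fmt.Println(kf.HostnameOverride, kf.HealthzPort, err)
}
```

cobra adds subcommands, help generation, and flag grouping on top of this basic parse-then-run pattern.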
NewKubeletCommand builds a *cobra.Command with default parameters. cobra is a Go package that provides the interfaces for handling command-line argument input. Besides performing various checks on the input arguments and processing configuration, NewKubeletCommand produces two important configuration structures, KubeletServer and kubeletDeps, as shown below:
// construct a KubeletServer from kubeletFlags and kubeletConfig
kubeletServer := &options.KubeletServer{
	KubeletFlags:         *kubeletFlags,
	KubeletConfiguration: *kubeletConfig,
}

// use kubeletServer to construct the default KubeletDeps
kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
if err != nil {
	klog.ErrorS(err, "Failed to construct kubelet dependencies")
	os.Exit(1)
}
KubeletServer mainly wraps KubeletFlags and KubeletConfiguration. KubeletFlags holds configuration that essentially never changes during the node's lifetime and is not shared with other nodes, such as the hostname; in general, no new flags or configuration fields should be added to it. KubeletConfiguration holds configuration that can be shared across nodes, such as ClusterDomain.
The other important configuration structure is kubeletDeps. The official comment on Dependencies explains:
Dependencies is a bin for things we might consider "injected dependencies" -- objects constructed at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping these objects while we figure out a more comprehensive dependency injection story for the Kubelet.
kubeletDeps carries the various collaborators kubelet depends on: the authentication interfaces, KubeClient, PodConfig, KubeletConfigController, RemoteRuntimeService, RemoteImageService, ContainerManager, and so on.
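The "injected dependencies" idea from the official comment can be sketched in a few lines. Everything below (Clock, Recorder, the two-field Dependencies) is hypothetical and far smaller than the real kubelet.Dependencies; it only shows the pattern of constructing collaborators at runtime and handing them in, rather than building them inside the main object.

```go
package main

import "fmt"

// Clock and Recorder are stand-ins for the kinds of interfaces the real
// kubelet.Dependencies carries (KubeClient, Recorder, ContainerManager, ...).
type Clock interface{ Now() string }
type Recorder interface{ Event(msg string) }

type fixedClock struct{ t string }

func (c fixedClock) Now() string { return c.t }

type listRecorder struct{ events []string }

func (r *listRecorder) Event(msg string) { r.events = append(r.events, msg) }

// Dependencies groups runtime-constructed collaborators so they can be
// injected into the main object, which makes them easy to replace in tests.
type Dependencies struct {
	Clock    Clock
	Recorder Recorder
}

type Kubelet struct{ deps Dependencies }

func (k *Kubelet) Heartbeat() {
	k.deps.Recorder.Event("heartbeat at " + k.deps.Clock.Now())
}

func main() {
	rec := &listRecorder{}
	k := &Kubelet{deps: Dependencies{Clock: fixedClock{"t0"}, Recorder: rec}}
	k.Heartbeat()
	fmt.Println(rec.events)
}
```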
After the configuration is initialized from the command line or a configuration file, the corresponding Run method executes:
if err := Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate); err != nil {
	klog.ErrorS(err, "Failed to run kubelet")
	os.Exit(1)
}
Stepping into it:
func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
	logOption := &logs.Options{Config: s.Logging}
	logOption.Apply()
	// To help debugging, immediately log version
	klog.InfoS("Kubelet version", "kubeletVersion", version.Get())
	if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {
		return fmt.Errorf("failed OS init: %w", err)
	}
	if err := run(ctx, s, kubeDeps, featureGate); err != nil {
		return fmt.Errorf("failed to run Kubelet: %w", err)
	}
	return nil
}
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
	// Set global feature gates based on the value on the initial KubeletServer
	err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
	if err != nil {
		return err
	}
	// validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
	if err := options.ValidateKubeletServer(s); err != nil {
		return err
	}
	// ...
The run function creates the various services; a few important parts are highlighted here. First, run builds the authentication and authorization service from the context:
if kubeDeps.Auth == nil {
	auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
	if err != nil {
		return err
	}
	kubeDeps.Auth = auth
	runAuthenticatorCAReload(ctx.Done())
}
A node health endpoint (the healthz server):
if s.HealthzPort > 0 {
	mux := http.NewServeMux()
	healthz.InstallHandler(mux)
	go wait.Until(func() {
		err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
		if err != nil {
			klog.ErrorS(err, "Failed to start healthz server")
		}
	}, 5*time.Second, wait.NeverStop)
}
An optional configuration sync service:
kubeDeps.KubeletConfigController.StartSync
A daemon readiness notification to systemd:
go daemon.SdNotify(false, "READY=1")
Next comes basic initialization of the container runtime service. This initialization checks whether the runtime type is docker to decide whether Dockershim needs to be started; for the RemoteRuntime type it starts RemoteRuntimeService and RemoteImageService. These services establish the remote runtime connection over gRPC and then drive the runtime through the CRI interfaces. Once the runtime service has been pre-initialized successfully, RunKubelet executes:
err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
	kubeDeps, &s.ContainerRuntimeOptions,
	s.ContainerRuntime,
	s.RuntimeCgroups,
	s.RemoteRuntimeEndpoint,
	s.RemoteImageEndpoint,
	s.NonMasqueradeCIDR)
if err != nil {
	return err
}

if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
	return err
}
An excerpt of the RunKubelet method is shown below:
k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
	kubeDeps,
	&kubeServer.ContainerRuntimeOptions,
	kubeServer.ContainerRuntime,
	hostname,
	hostnameOverridden,
	nodeName,
	nodeIPs,
	kubeServer.ProviderID,
	kubeServer.CloudProvider,
	kubeServer.CertDirectory,
	kubeServer.RootDirectory,
	kubeServer.ImageCredentialProviderConfigFile,
	kubeServer.ImageCredentialProviderBinDir,
	kubeServer.RegisterNode,
	kubeServer.RegisterWithTaints,
	kubeServer.AllowedUnsafeSysctls,
	kubeServer.ExperimentalMounterPath,
	kubeServer.KernelMemcgNotification,
	kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
	kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
	kubeServer.MinimumGCAge,
	kubeServer.MaxPerPodContainerCount,
	kubeServer.MaxContainerCount,
	kubeServer.MasterServiceNamespace,
	kubeServer.RegisterSchedulable,
	kubeServer.KeepTerminatedPodVolumes,
	kubeServer.NodeLabels,
	kubeServer.SeccompProfileRoot,
	kubeServer.NodeStatusMaxImages,
	kubeServer.KubeletFlags.SeccompDefault || kubeServer.KubeletConfiguration.SeccompDefault,
)

// process pods and exit.
if runOnce {
	if _, err := k.RunOnce(podCfg.Updates()); err != nil {
		return fmt.Errorf("runonce failed: %w", err)
	}
	klog.InfoS("Started kubelet as runonce")
} else {
	startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
	klog.InfoS("Started kubelet")
}
The excerpt above uses createAndInitKubelet to create an instance k that satisfies the kubelet.Bootstrap interface, and then starts the kubelet service proper via startKubelet. Following createAndInitKubelet further, the kubelet.Bootstrap value is in fact a Kubelet structure created by kubelet.NewMainKubelet, where the whole initialization flow of Kubelet can be seen. The Kubelet is constructed as follows:
klet := &Kubelet{
	hostname:                       hostname,
	hostnameOverridden:             hostnameOverridden,
	nodeName:                       nodeName,
	kubeClient:                     kubeDeps.KubeClient,
	heartbeatClient:                kubeDeps.HeartbeatClient,
	onRepeatedHeartbeatFailure:     kubeDeps.OnHeartbeatFailure,
	rootDirectory:                  rootDirectory,
	resyncInterval:                 kubeCfg.SyncFrequency.Duration,
	sourcesReady:                   config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
	registerNode:                   registerNode,
	registerWithTaints:             registerWithTaints,
	registerSchedulable:            registerSchedulable,
	dnsConfigurer:                  dns.NewConfigurer(kubeDeps.Recorder, nodeRef, nodeIPs, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig),
	serviceLister:                  serviceLister,
	serviceHasSynced:               serviceHasSynced,
	nodeLister:                     nodeLister,
	nodeHasSynced:                  nodeHasSynced,
	masterServiceNamespace:         masterServiceNamespace,
	streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
	recorder:                       kubeDeps.Recorder,
	cadvisor:                       kubeDeps.CAdvisorInterface,
	cloud:                          kubeDeps.Cloud,
	externalCloudProvider:          cloudprovider.IsExternal(cloudProvider),
	providerID:                     providerID,
	nodeRef:                        nodeRef,
	nodeLabels:                     nodeLabels,
	nodeStatusUpdateFrequency:      kubeCfg.NodeStatusUpdateFrequency.Duration,
	nodeStatusReportFrequency:      kubeCfg.NodeStatusReportFrequency.Duration,
	os:                             kubeDeps.OSInterface,
	oomWatcher:                     oomWatcher,
	cgroupsPerQOS:                  kubeCfg.CgroupsPerQOS,
	cgroupRoot:                     kubeCfg.CgroupRoot,
	mounter:                        kubeDeps.Mounter,
	hostutil:                       kubeDeps.HostUtil,
	subpather:                      kubeDeps.Subpather,
	maxPods:                        int(kubeCfg.MaxPods),
	podsPerCore:                    int(kubeCfg.PodsPerCore),
	syncLoopMonitor:                atomic.Value{},
	daemonEndpoints:                daemonEndpoints,
	containerManager:               kubeDeps.ContainerManager,
	containerRuntimeName:           containerRuntime,
	nodeIPs:                        nodeIPs,
	nodeIPValidator:                validateNodeIP,
	clock:                          clock.RealClock{},
	enableControllerAttachDetach:   kubeCfg.EnableControllerAttachDetach,
	makeIPTablesUtilChains:         kubeCfg.MakeIPTablesUtilChains,
	iptablesMasqueradeBit:          int(kubeCfg.IPTablesMasqueradeBit),
	iptablesDropBit:                int(kubeCfg.IPTablesDropBit),
	experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
	keepTerminatedPodVolumes:       keepTerminatedPodVolumes,
	nodeStatusMaxImages:            nodeStatusMaxImages,
	lastContainerStartedTime:       newTimeCache(),
}
Handling of ConfigMaps and Secrets:
klet.secretManager = secretManager
klet.configMapManager = configMapManager
The kubelet's pod manager:
klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient, secretManager, configMapManager)
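The job of a basic pod manager can be sketched as a pair of indexes. The type below is hypothetical and far smaller than kubepod.NewBasicPodManager (no mirror pods, secrets, or configmaps); it only shows the indexing idea, using kubelet's name_namespace full-name convention.

```go
package main

import "fmt"

// pod is a stripped-down stand-in for v1.Pod.
type pod struct {
	UID       string
	Namespace string
	Name      string
}

// basicPodManager sketches the core of kubelet's pod manager: keep the
// current set of pods indexed for O(1) lookup by UID and by full name.
type basicPodManager struct {
	byUID      map[string]*pod
	byFullName map[string]*pod
}

func newBasicPodManager() *basicPodManager {
	return &basicPodManager{
		byUID:      map[string]*pod{},
		byFullName: map[string]*pod{},
	}
}

// fullName follows kubelet's "name_namespace" pod full-name convention.
func fullName(p *pod) string { return p.Name + "_" + p.Namespace }

func (pm *basicPodManager) SetPod(p *pod) {
	pm.byUID[p.UID] = p
	pm.byFullName[fullName(p)] = p
}

func (pm *basicPodManager) GetPodByUID(uid string) (*pod, bool) {
	p, ok := pm.byUID[uid]
	return p, ok
}

func (pm *basicPodManager) DeletePod(p *pod) {
	delete(pm.byUID, p.UID)
	delete(pm.byFullName, fullName(p))
}

func main() {
	pm := newBasicPodManager()
	pm.SetPod(&pod{UID: "u1", Namespace: "default", Name: "nginx"})
	p, ok := pm.GetPodByUID("u1")
	fmt.Println(ok, p.Name)
}
```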
kubelet also builds a manager for the container runtime. klet.containerRuntime is a kubecontainer.Runtime interface, which is implemented by kubeGenericRuntimeManager:
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
	kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
	klet.livenessManager,
	klet.readinessManager,
	klet.startupManager,
	seccompProfileRoot,
	machineInfo,
	klet.podWorkers,
	kubeDeps.OSInterface,
	klet,
	httpClient,
	imageBackOff,
	kubeCfg.SerializeImagePulls,
	float32(kubeCfg.RegistryPullQPS),
	int(kubeCfg.RegistryBurst),
	imageCredentialProviderConfigFile,
	imageCredentialProviderBinDir,
	kubeCfg.CPUCFSQuota,
	kubeCfg.CPUCFSQuotaPeriod,
	kubeDeps.RemoteRuntimeService,
	kubeDeps.RemoteImageService,
	kubeDeps.ContainerManager.InternalContainerLifecycle(),
	kubeDeps.dockerLegacyService,
	klet.containerLogManager,
	klet.runtimeClassManager,
	seccompDefault,
	kubeCfg.MemorySwap.SwapBehavior,
	kubeDeps.ContainerManager.GetNodeAllocatableAbsolute,
	*kubeCfg.MemoryThrottlingFactor,
)
if err != nil {
	return nil, err
}
klet.containerRuntime = runtime
Returning to the RunKubelet function: after initialization completes, it executes the following:
// process pods and exit.
if runOnce {
	if _, err := k.RunOnce(podCfg.Updates()); err != nil {
		return fmt.Errorf("runonce failed: %w", err)
	}
	klog.InfoS("Started kubelet as runonce")
} else {
	startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
	klog.InfoS("Started kubelet")
}
Leaving aside the runOnce case, the normal path calls the startKubelet method, whose contents are:
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
	// start the kubelet
	go k.Run(podCfg.Updates())

	// start the kubelet server
	if enableServer {
		go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth)
	}
	if kubeCfg.ReadOnlyPort > 0 {
		go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
	}
	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
		go k.ListenAndServePodResources()
	}
}
k.Run here is a method of the Bootstrap interface; since k is in essence a Kubelet structure, what actually runs is Kubelet's Run method. The Kubelet structure lives in pkg/kubelet/kubelet.go; the structure itself is too long to show here, so only its Run method (around line 1449 in that file, depending on the version) is reproduced:
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	if kl.kubeClient == nil {
		klog.InfoS("No API server defined - no node status update will be sent")
	}
	// Start the cloud provider sync manager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}
	if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		klog.ErrorS(err, "Failed to initialize internal modules")
		os.Exit(1)
	}
	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		go kl.fastStatusUpdateOnce()
		// start syncing lease
		go kl.nodeLeaseController.Run(wait.NeverStop)
	}
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
	// Set up iptables util rules
	if kl.makeIPTablesUtilChains {
		kl.initNetworkUtil()
	}
	// Start component sync loops.
	kl.statusManager.Start()
	// Start syncing RuntimeClasses if enabled.
	if kl.runtimeClassManager != nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}
	// Start the pod lifecycle event generator.
	kl.pleg.Start()
	kl.syncLoop(updates, kl)
}
Run starts a number of important components, such as volumeManager, the node status sync loops, and PLEG via pleg.Start(), and finally enters the syncLoop method. The core loop of syncLoop is:
for {
	if err := kl.runtimeState.runtimeErrors(); err != nil {
		klog.ErrorS(err, "Skipping pod synchronization")
		// exponential backoff
		time.Sleep(duration)
		duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
		continue
	}
	// reset backoff if we have a success
	duration = base
	kl.syncLoopMonitor.Store(kl.clock.Now())
	if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
		break
	}
	kl.syncLoopMonitor.Store(kl.clock.Now())
}
Stepping into syncLoopIteration:
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
			return false
		}
		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", format.Pods(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		// ...
syncLoopIteration continuously reads from the various channels and dispatches each event to the corresponding handler.
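This select-driven dispatch pattern can be boiled down to a few dozen lines. Everything below (the op type, podUpdate, the handler callback) is a hypothetical miniature of kubetypes.PodUpdate and SyncHandler; it shows only the mechanics: read one event from whichever channel is ready, route it by operation, and signal loop exit when the config channel closes.

```go
package main

import "fmt"

type op int

const (
	opAdd op = iota
	opUpdate
	opRemove
)

// podUpdate is a tiny stand-in for kubetypes.PodUpdate.
type podUpdate struct {
	Op   op
	Pods []string
}

// syncLoopIteration sketches the dispatch pattern: take one event from
// whichever channel is ready and route it to the matching handler;
// return false when the config channel is closed, ending the loop.
func syncLoopIteration(configCh <-chan podUpdate, syncCh <-chan struct{}, handle func(kind string, pods []string)) bool {
	select {
	case u, open := <-configCh:
		if !open {
			return false
		}
		switch u.Op {
		case opAdd:
			handle("ADD", u.Pods)
		case opUpdate:
			handle("UPDATE", u.Pods)
		case opRemove:
			handle("REMOVE", u.Pods)
		}
	case <-syncCh:
		handle("SYNC", nil)
	}
	return true
}

func main() {
	configCh := make(chan podUpdate, 2)
	configCh <- podUpdate{Op: opAdd, Pods: []string{"nginx"}}
	close(configCh)
	syncCh := make(chan struct{})
	for syncLoopIteration(configCh, syncCh, func(kind string, pods []string) {
		fmt.Println(kind, pods)
	}) {
	}
}
```

The real function selects over more channels (sync ticker, housekeeping ticker, PLEG events, prober results), but every arm follows this same read-and-dispatch shape.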
Finally, the functions covered in this article can be organized into one call chain: main → NewKubeletCommand → Run → run → RunKubelet → createAndInitKubelet / startKubelet → Kubelet.Run → syncLoop → syncLoopIteration.