9 min read

Kubernetes 源码分析(一) 之kubelet 流程

Kubernetes 组件分析之 kubelet

1.什么是 kubelet?

kubelet 是负责所有运行在工作节点上内容的组件,相当于每个节点上的节点代理,每个节点都会启动 kubelet 进程用来处理 Master 节点下发到本节点的任务。kubelet 按照 PodSpec 描述来管理 Pod 和其中的容器(PodSpec 是用来描述一个 pod 的 YAML 或者 JSON 对象)。

2.kubelet 在kubernetes的位置

3.kubelet包含的主要组件

配置同步

特性门控(FeatureGate)

容器管理(Container Manager)

PLEG

CAdvisor&oomWatcher

GCManager

CPUManager

ProberManager

EvictionManager

StatusManager

VolumeManager

PluginManager

ContainerManager

CRI

kubelet 主要负责通过 gRPC 与容器运行时(如 runC)的服务端通信,并通过调用 CRI 接口来使用容器运行时提供的相应服务,从而实现对容器运行时的管理。

4.源码分析

kubelet 的执行入口在 kubernetes/cmd/kubelet/kubelet.go,具体如下

func main() {
	rand.Seed(time.Now().UnixNano())

	command := app.NewKubeletCommand()
	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}

其中通过NewKubeletCommand使用默认参数创建了一个*cobra.Command对象。cobra是一个 Golang 的包,提供了相应的接口用来处理命令行的各种参数输入。NewKubeletCommand函数中除了各种对输入参数的字符检查和配置处理外,主要生成了两个重要的配置结构 KubeletServer 和 kubeletDeps,如下所示:

// construct a KubeletServer from kubeletFlags and kubeletConfig
kubeletServer := &options.KubeletServer{
    KubeletFlags:         *kubeletFlags,
    KubeletConfiguration: *kubeletConfig,
}

// use kubeletServer to construct the default KubeletDeps
kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
if err != nil {
    klog.ErrorS(err, "Failed to construct kubelet dependencies")
    os.Exit(1)
}

其中KubeletServer主要包含 KubeletFlags 和 KubeletConfiguration。KubeletFlags 中存储的配置信息主要是node所在生命周期中基本不会改变并且不会被其他node共享的信息,例如 hostname 信息等,通常情况下不需要对其添加 flags 或者配置域。KubeletConfiguration则主要包含一些其他节点共享的信息,比如ClusterDomain等。
另外一个重要的配置结构为kubeletDeps, 其中官方对Deps的一个注释解释如下:

Dependencies is a bin for things we might consider "injected dependencies" -- objects constructed at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping these objects while we figure out a more comprehensive dependency injection story for the Kubelet.

kubeletDeps包含认证接口、KubeClient、PodConfig、KubeletConfigController、RemoteRuntimeService、RemoteImageService、ContainerManager 等各种 kubelet 需要协同的依赖组件。
从命令行或者配置文件中初始化配置后开始执行对应的Run方法如下:

if err := Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate); err != nil {
				klog.ErrorS(err, "Failed to run kubelet")
				os.Exit(1)
			}

继续跟进去

func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
	logOption := &logs.Options{Config: s.Logging}
	logOption.Apply()
	// To help debugging, immediately log version
	klog.InfoS("Kubelet version", "kubeletVersion", version.Get())
	if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {
		return fmt.Errorf("failed OS init: %w", err)
	}
	if err := run(ctx, s, kubeDeps, featureGate); err != nil {
		return fmt.Errorf("failed to run Kubelet: %w", err)
	}
	return nil
}
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
// Set global feature gates based on the value on the initial KubeletServer
err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
if err != nil {
    return err
}
// validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
if err := options.ValidateKubeletServer(s); err != nil {
    return err
}

这里的run函数包含了各种服务的创建内容,这里挑选一些重要的位置讲解
首先run函数中根据上下文创建了一个授权认证的服务

if kubeDeps.Auth == nil {
    auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
    if err != nil {
        return err
    }
    kubeDeps.Auth = auth
    runAuthenticatorCAReload(ctx.Done())
}

一个节点监听服务

if s.HealthzPort > 0 {
    mux := http.NewServeMux()
    healthz.InstallHandler(mux)
    go wait.Until(func() {
        err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
        if err != nil {
            klog.ErrorS(err, "Failed to start healthz server")
        }
    }, 5*time.Second, wait.NeverStop)
}

一个可选的配置同步服务

kubeDeps.KubeletConfigController.StartSync

一个后台通知服务

go daemon.SdNotify(false, "READY=1")

对容器运行时服务做一些基础的初始化,这种初始化会根据是否是docker类型来判断是否需要启动Dockershim。对于RemoteRuntime类型启动RemoteRuntimeService和RemoteImageService。这里的xxxService就是通过grpc建立remoteRuntime连接,然后依据CRI协议接口调用容器运行时的服务。预处理容器运行时服务成功后执行RunKubelet方法

err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
    kubeDeps, &s.ContainerRuntimeOptions,
    s.ContainerRuntime,
    s.RuntimeCgroups,
    s.RemoteRuntimeEndpoint,
    s.RemoteImageEndpoint,
    s.NonMasqueradeCIDR)
if err != nil {
    return err
}

if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
    return err
}

这里对RunKubelet方法进行分析,节选后展示如下:

k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
		kubeDeps,
		&kubeServer.ContainerRuntimeOptions,
		kubeServer.ContainerRuntime,
		hostname,
		hostnameOverridden,
		nodeName,
		nodeIPs,
		kubeServer.ProviderID,
		kubeServer.CloudProvider,
		kubeServer.CertDirectory,
		kubeServer.RootDirectory,
		kubeServer.ImageCredentialProviderConfigFile,
		kubeServer.ImageCredentialProviderBinDir,
		kubeServer.RegisterNode,
		kubeServer.RegisterWithTaints,
		kubeServer.AllowedUnsafeSysctls,
		kubeServer.ExperimentalMounterPath,
		kubeServer.KernelMemcgNotification,
		kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
		kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
		kubeServer.MinimumGCAge,
		kubeServer.MaxPerPodContainerCount,
		kubeServer.MaxContainerCount,
		kubeServer.MasterServiceNamespace,
		kubeServer.RegisterSchedulable,
		kubeServer.KeepTerminatedPodVolumes,
		kubeServer.NodeLabels,
		kubeServer.SeccompProfileRoot,
		kubeServer.NodeStatusMaxImages,
		kubeServer.KubeletFlags.SeccompDefault || kubeServer.KubeletConfiguration.SeccompDefault,
	)
// process pods and exit.
	if runOnce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %w", err)
		}
		klog.InfoS("Started kubelet as runonce")
	} else {
		startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
		klog.InfoS("Started kubelet")
	}

上面节选的代码内容通过createAndInitKubelet创建了一个满足kubelet.Bootstrap接口的实例 k。最后通过startKubelet正式运行kubelet服务。createAndInitKubelet中继续跟进可以发现kubelet.Bootstrap接口的实现其实就是 kubelet.NewMainKubelet 创建的一个Kubelet结构,在 NewMainKubelet 中可以看到Kubelet的整个初始化流程。Kubelet提供的内容如下:

klet := &Kubelet{
		hostname:                                hostname,
		hostnameOverridden:                      hostnameOverridden,
		nodeName:                                nodeName,
		kubeClient:                              kubeDeps.KubeClient,
		heartbeatClient:                         kubeDeps.HeartbeatClient,
		onRepeatedHeartbeatFailure:              kubeDeps.OnHeartbeatFailure,
		rootDirectory:                           rootDirectory,
		resyncInterval:                          kubeCfg.SyncFrequency.Duration,
		sourcesReady:                            config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
		registerNode:                            registerNode,
		registerWithTaints:                      registerWithTaints,
		registerSchedulable:                     registerSchedulable,
		dnsConfigurer:                           dns.NewConfigurer(kubeDeps.Recorder, nodeRef, nodeIPs, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig),
		serviceLister:                           serviceLister,
		serviceHasSynced:                        serviceHasSynced,
		nodeLister:                              nodeLister,
		nodeHasSynced:                           nodeHasSynced,
		masterServiceNamespace:                  masterServiceNamespace,
		streamingConnectionIdleTimeout:          kubeCfg.StreamingConnectionIdleTimeout.Duration,
		recorder:                                kubeDeps.Recorder,
		cadvisor:                                kubeDeps.CAdvisorInterface,
		cloud:                                   kubeDeps.Cloud,
		externalCloudProvider:                   cloudprovider.IsExternal(cloudProvider),
		providerID:                              providerID,
		nodeRef:                                 nodeRef,
		nodeLabels:                              nodeLabels,
		nodeStatusUpdateFrequency:               kubeCfg.NodeStatusUpdateFrequency.Duration,
		nodeStatusReportFrequency:               kubeCfg.NodeStatusReportFrequency.Duration,
		os:                                      kubeDeps.OSInterface,
		oomWatcher:                              oomWatcher,
		cgroupsPerQOS:                           kubeCfg.CgroupsPerQOS,
		cgroupRoot:                              kubeCfg.CgroupRoot,
		mounter:                                 kubeDeps.Mounter,
		hostutil:                                kubeDeps.HostUtil,
		subpather:                               kubeDeps.Subpather,
		maxPods:                                 int(kubeCfg.MaxPods),
		podsPerCore:                             int(kubeCfg.PodsPerCore),
		syncLoopMonitor:                         atomic.Value{},
		daemonEndpoints:                         daemonEndpoints,
		containerManager:                        kubeDeps.ContainerManager,
		containerRuntimeName:                    containerRuntime,
		nodeIPs:                                 nodeIPs,
		nodeIPValidator:                         validateNodeIP,
		clock:                                   clock.RealClock{},
		enableControllerAttachDetach:            kubeCfg.EnableControllerAttachDetach,
		makeIPTablesUtilChains:                  kubeCfg.MakeIPTablesUtilChains,
		iptablesMasqueradeBit:                   int(kubeCfg.IPTablesMasqueradeBit),
		iptablesDropBit:                         int(kubeCfg.IPTablesDropBit),
		experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
		keepTerminatedPodVolumes:                keepTerminatedPodVolumes,
		nodeStatusMaxImages:                     nodeStatusMaxImages,
		lastContainerStartedTime:                newTimeCache(),
	}

一些configMap和secret的处理

        klet.secretManager = secretManager
	klet.configMapManager = configMapManager

Kubelet的Pod管理器

klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient, secretManager, configMapManager)

kubelet也会生成一个针对容器运行时的管理器。这里 klet.containerRuntime 是一个kubecontainer.Runtime接口,这个接口被kubeGenericRuntimeManager实现。

runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
    kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
    klet.livenessManager,
    klet.readinessManager,
    klet.startupManager,
    seccompProfileRoot,
    machineInfo,
    klet.podWorkers,
    kubeDeps.OSInterface,
    klet,
    httpClient,
    imageBackOff,
    kubeCfg.SerializeImagePulls,
    float32(kubeCfg.RegistryPullQPS),
    int(kubeCfg.RegistryBurst),
    imageCredentialProviderConfigFile,
    imageCredentialProviderBinDir,
    kubeCfg.CPUCFSQuota,
    kubeCfg.CPUCFSQuotaPeriod,
    kubeDeps.RemoteRuntimeService,
    kubeDeps.RemoteImageService,
    kubeDeps.ContainerManager.InternalContainerLifecycle(),
    kubeDeps.dockerLegacyService,
    klet.containerLogManager,
    klet.runtimeClassManager,
    seccompDefault,
    kubeCfg.MemorySwap.SwapBehavior,
    kubeDeps.ContainerManager.GetNodeAllocatableAbsolute,
    *kubeCfg.MemoryThrottlingFactor,
)
if err != nil {
    return nil, err
}
klet.containerRuntime = runtime

这里重新将目标专注于 RunKubelet 函数,在初始化完成后会执行下面的内容:

// process pods and exit.
if runOnce {
    if _, err := k.RunOnce(podCfg.Updates()); err != nil {
        return fmt.Errorf("runonce failed: %w", err)
    }
    klog.InfoS("Started kubelet as runonce")
} else {
    startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
    klog.InfoS("Started kubelet")
}

这里我们不考虑runOnce的情况,正常会执行 startKubelet 方法, startKubelet方法内容如下

func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
	// start the kubelet
	go k.Run(podCfg.Updates())

	// start the kubelet server
	if enableServer {
		go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth)
	}
	if kubeCfg.ReadOnlyPort > 0 {
		go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
	}
	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
		go k.ListenAndServePodResources()
	}
}

这里的k.Run是一个Bootstrap接口中的执行函数,因为k本质上是一个Kubelet结构,所以这里执行的是Kubelet结构的Run方法。我们找到Kubelet结构的Run方法,Kubelet结构的文件位置在kubernetes/pkg/kubelet/kubelet.go,这里由于该结构过长不做展示,仅展示其对应的Run方法(位于kubelet.go文件的1449行附近),内容如下:

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	if kl.kubeClient == nil {
		klog.InfoS("No API server defined - no node status update will be sent")
	}

	// Start the cloud provider sync manager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}

	if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		klog.ErrorS(err, "Failed to initialize internal modules")
		os.Exit(1)
	}

	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		go kl.fastStatusUpdateOnce()

		// start syncing lease
		go kl.nodeLeaseController.Run(wait.NeverStop)
	}
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	// Set up iptables util rules
	if kl.makeIPTablesUtilChains {
		kl.initNetworkUtil()
	}

	// Start component sync loops.
	kl.statusManager.Start()

	// Start syncing RuntimeClasses if enabled.
	if kl.runtimeClassManager != nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}

	// Start the pod lifecycle event generator.
	kl.pleg.Start()
	kl.syncLoop(updates, kl)
}

如上所示,这里可以看到一些重要组件的启动入口如volumeManager、pleg.Start()等,执行完成后最后启动syncLoop方法进行循环,syncLoop方法如下:

for {
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.ErrorS(err, "Skipping pod synchronization")
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base

		kl.syncLoopMonitor.Store(kl.clock.Now())
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}

继续跟进syncLoopIteration如下:

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
			return false
		}

		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", format.Pods(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", format.Pods(u.Pods))
			handler.HandlePodReconcile

这里的 syncLoopIteration 不断从各种管道中读取对应的内容,并按照对应的Handler对该事件进行处理。

最后,作者对本文选用的函数名进行一个结构梳理如下:

Reference

[1] https://zhuanlan.zhihu.com/p/111241825
[2] 《Kubernetes in Action》
[3] https://kubernetes.io/zh/docs/reference/command-line-tools-reference/kubelet/
[4] https://zhuanlan.zhihu.com/p/338462784
[5] cyberark.com/resources/threat-research-blog/using-kubelet-client-to-attack-the-kubernetes-cluster
[6] https://www.alibabacloud.com/blog/from-confused-to-proficient-always-ready-for-kubernetes-cluster-nodes_595654