
Kubernetes Component Analysis: DeviceManager

The Device Plugin mechanism, introduced in Kubernetes 1.8, supports integrating arbitrary custom devices into Kubernetes through an extension point. Inside the kubelet, the Device Manager component is responsible for interacting with custom Device Plugins and managing the device lifecycle.

1. Device Plugin

Device Plugin Deployment

A device vendor implements its own device plugin, which is typically deployed as a DaemonSet. The Device Plugin then runs as a pod on every node in the cluster; when it detects that the current node contains the devices it manages, the plugin's gRPC server communicates with the kubelet, watches for pods whose specs request the device resource, and injects the corresponding devices during container creation.

Device Plugin Interface

While implementing a device plugin, the vendor must communicate with the kubelet and register its service with the gRPC server inside the kubelet. The registration interface is specified in the kubernetes community repository; see the official interface definition document for details.

A Device Plugin must implement the following two RPCs:

service DevicePlugin {
	// ListAndWatch streams the current device list ([]Device) and re-sends
	// the full list whenever device state changes
	rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}
	// Allocate is called during container creation so the plugin can prepare
	// the requested devices and return container runtime configuration
	rpc Allocate(AllocateRequest) returns (AllocateResponse) {}
}

ListAndWatch monitors device state, including device insertion and removal; it returns the set of currently available devices to the kubelet and streams an updated list whenever that set changes.
Allocate is called during container creation so the device plugin can prepare the requested resources in advance; for a GPU this typically means allocating the corresponding memory and zeroing it.
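
To make the two RPCs concrete, here is a minimal sketch in Go against the v1beta1 API (k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1); the device IDs and /dev paths are made-up placeholders, and a production plugin would re-send the device list on every state change:

package deviceplugin

import (
	"context"

	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

type stubPlugin struct{}

// ListAndWatch advertises the device set once and keeps the stream open;
// a real plugin re-sends the full list whenever device health changes.
func (p *stubPlugin) ListAndWatch(_ *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
	devs := []*pluginapi.Device{
		{ID: "dev-0", Health: pluginapi.Healthy}, // hypothetical device IDs
		{ID: "dev-1", Health: pluginapi.Healthy},
	}
	if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: devs}); err != nil {
		return err
	}
	<-s.Context().Done() // block until the kubelet closes the stream
	return nil
}

// Allocate maps each requested device ID to a host device node that the
// kubelet will inject into the container.
func (p *stubPlugin) Allocate(_ context.Context, req *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
	resp := &pluginapi.AllocateResponse{}
	for _, creq := range req.ContainerRequests {
		cresp := &pluginapi.ContainerAllocateResponse{}
		for _, id := range creq.DevicesIDs {
			cresp.Devices = append(cresp.Devices, &pluginapi.DeviceSpec{
				HostPath:      "/dev/" + id, // hypothetical device path
				ContainerPath: "/dev/" + id,
				Permissions:   "rw",
			})
		}
		resp.ContainerResponses = append(resp.ContainerResponses, cresp)
	}
	return resp, nil
}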

The Basic Device Plugin Workflow

The basic Device Plugin workflow consists of three parts: registration, device monitoring, and device resource allocation:

  1. The Device Plugin first sends a registration request to the kubelet over gRPC
  2. The kubelet returns a registration response
  3. The device plugin then establishes its gRPC service with the kubelet

To make this gRPC communication possible, the device plugin must expose a Unix socket under /var/lib/kubelet/device-plugins/. A minimal registration sketch follows.
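
Here is a rough sketch of the registration step in Go, again assuming the v1beta1 API; registerWithKubelet, mydevice.sock, and example.com/mydevice are illustrative names, not part of the kubelet API:

package deviceplugin

import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

// registerWithKubelet dials the kubelet's registration socket
// (/var/lib/kubelet/device-plugins/kubelet.sock) and announces this
// plugin's own socket and the extended resource name it serves.
func registerWithKubelet() error {
	conn, err := grpc.Dial("unix://"+pluginapi.KubeletSocket,
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return err
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	client := pluginapi.NewRegistrationClient(conn)
	_, err = client.Register(ctx, &pluginapi.RegisterRequest{
		Version:      pluginapi.Version,      // "v1beta1"
		Endpoint:     "mydevice.sock",        // relative to the device-plugins directory
		ResourceName: "example.com/mydevice", // what pods request in their spec
	})
	return err
}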

Device Plugin Health Checks

The Device Plugin and the kubelet maintain a bidirectional health check: when either side fails, the other has a corresponding error-handling path. This check is implemented on top of ListAndWatch, with each side inferring the other's state from the results of their gRPC communication. The cases are:

  1. When the kubelet detects that a Device Plugin has failed, it does not restart the pods that requested device resources.
  2. When the kubelet detects that a device managed by a Device Plugin has been removed, it immediately informs the api-server so that the node's advertised amount of available resources stays correct.
  3. When the kubelet's call to Allocate fails while requesting the resources a device needs, the corresponding pod is placed in a failed state.
  4. When the kubelet fails or is restarting, the Device Plugin should monitor the kubelet's state, re-establish the connection, and re-register (see the sketch after this list).
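
For case 4, one common pattern (used by several real-world plugins, though the API does not mandate it) is to watch the kubelet socket with fsnotify and re-register once it is recreated; this sketch reuses the hypothetical registerWithKubelet helper from above:

package deviceplugin

import (
	"log"

	"github.com/fsnotify/fsnotify"
	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

// watchKubeletRestart blocks until kubelet.sock is recreated, which
// signals that the kubelet restarted and this plugin must re-register.
func watchKubeletRestart() error {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return err
	}
	defer watcher.Close()

	// watch the directory holding kubelet.sock
	if err := watcher.Add(pluginapi.DevicePluginPath); err != nil {
		return err
	}
	for {
		select {
		case event := <-watcher.Events:
			// kubelet.sock reappearing means the kubelet came back up
			if event.Name == pluginapi.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
				log.Println("kubelet restarted, re-registering")
				return registerWithKubelet()
			}
		case err := <-watcher.Errors:
			return err
		}
	}
}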

2. Device Manager

Source Entry Point of the Device Manager

The Device Manager is created when the kubelet builds its container manager via NewContainerManager in kubernetes/pkg/kubelet/cm/container_manager_linux.go. Device management as a whole is therefore owned by the Container Manager, which wraps the key Device Manager interfaces as follows:

func (cm *containerManagerImpl) Start(node *v1.Node,
	activePods ActivePodsFunc,
	sourcesReady config.SourcesReady,
	podStatusProvider status.PodStatusProvider,
	runtimeService internalapi.RuntimeService) error {
	//... (parts omitted)
	// starting the ContainerManager also starts the device manager
	if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
		return err
	}
	//...
}
// key Device Manager interfaces are re-wrapped at the ContainerManager level
func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler {
	return cm.deviceManager.GetWatcherHandler()
}

func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
	return cm.deviceManager.UpdatePluginResources(node, attrs)
}

// TODO: move the GetResources logic into PodContainerManager later
func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
	opts := &kubecontainer.RunContainerOptions{}
	// Allocate should already be called during predicateAdmitHandler.Admit(),
	// just try to fetch device runtime information from cached state here
	devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
	...
	opts.Devices = append(opts.Devices, devOpts.Devices...)
	opts.Mounts = append(opts.Mounts, devOpts.Mounts...)
	opts.Envs = append(opts.Envs, devOpts.Envs...)
	...
}

func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
		return cm.topologyManager
	}
	// TODO: we need to think about a better way to do this. This will work for
	// now so long as we have only the cpuManager and deviceManager relying on
	// allocations here. However, going forward it is not generalized enough to
	// work as we add more and more hint providers that the TopologyManager
// needs to call Allocate() on (that may not be directly instantiated
	// inside this component).
	return &resourceAllocator{cm.cpuManager, cm.memoryManager, cm.deviceManager}
}

Depending on whether the device plugin feature is enabled, the corresponding Device Manager is created:

if devicePluginEnabled {
	// real implementation backed by the device plugin gRPC machinery
	cm.deviceManager, err = devicemanager.NewManagerImpl(machineInfo.Topology, cm.topologyManager)
	cm.topologyManager.AddHintProvider(cm.deviceManager)
} else {
	// no-op stub when the feature is disabled
	cm.deviceManager, err = devicemanager.NewManagerStub()
}
if err != nil {
	return nil, err
}

The core Device Manager source lives in kubernetes/pkg/kubelet/cm/devicemanager/manager.go. The exported NewManagerImpl delegates to newManagerImpl, whose key parts are:

func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
	klog.V(2).InfoS("Creating Device Plugin manager", "path", socketPath)

	if socketPath == "" || !filepath.IsAbs(socketPath) {
		return nil, fmt.Errorf(errBadSocket+" %s", socketPath)
	}

	var numaNodes []int
	for _, node := range topology {
		numaNodes = append(numaNodes, node.Id)
	}

	dir, file := filepath.Split(socketPath)
	manager := &ManagerImpl{
		endpoints: make(map[string]endpointInfo),

		socketname:            file,
		socketdir:             dir,
		// all devices known to the manager, keyed by resource name and then device ID
		allDevices:            NewResourceDeviceInstances(),
		// per-resource sets of healthy and unhealthy device IDs
		healthyDevices:        make(map[string]sets.String),
		unhealthyDevices:      make(map[string]sets.String),
		allocatedDevices:      make(map[string]sets.String),
		// contains pod to allocated device mapping.
		podDevices:            newPodDevices(),
		numaNodes:             numaNodes,
		topologyAffinityStore: topologyAffinityStore,
		devicesToReuse:        make(PodReusableDevices),
	}
	// the manager registers a callback that is invoked on every device update from a plugin
	manager.callback = manager.genericDeviceUpdateCallback

	// The following structures are populated with real implementations in manager.Start()
	// Before that, initialize them to perform no-op operations.
	manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
	manager.sourcesReady = &sourcesReadyStub{}
	// create the checkpoint manager in the plugin socket directory
	checkpointManager, err := checkpointmanager.NewCheckpointManager(dir)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
	}
	manager.checkpointManager = checkpointManager

	return manager, nil
}

Expanding allDevices, we can see that it is a two-level nested map:
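
Its shape corresponds to the following type definitions, paraphrased from the devicemanager package (outer key: resource name; inner key: device ID):

// ResourceDeviceInstances maps an extended resource name
// (e.g. "nvidia.com/gpu") to the devices advertised for it.
type ResourceDeviceInstances map[string]DeviceInstances

// DeviceInstances maps a device ID to its pluginapi.Device description.
type DeviceInstances map[string]pluginapi.Device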

The health-check callback below shows how the healthyDevices and unhealthyDevices lists are updated:

// genericDeviceUpdateCallback rebuilds the per-resource device sets from the
// full device list streamed by a plugin, then persists a checkpoint.
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
	m.mutex.Lock()
	m.healthyDevices[resourceName] = sets.NewString()
	m.unhealthyDevices[resourceName] = sets.NewString()
	m.allDevices[resourceName] = make(map[string]pluginapi.Device)
	for _, dev := range devices {
		m.allDevices[resourceName][dev.ID] = dev
		if dev.Health == pluginapi.Healthy {
			m.healthyDevices[resourceName].Insert(dev.ID)
		} else {
			m.unhealthyDevices[resourceName].Insert(dev.ID)
		}
	}
	m.mutex.Unlock()
	if err := m.writeCheckpoint(); err != nil {
		klog.ErrorS(err, "Writing checkpoint encountered")
	}
}
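
For context, this callback is driven by the kubelet-side endpoint that wraps each plugin connection. The following is a simplified sketch modeled on pkg/kubelet/cm/devicemanager/endpoint.go; the struct here is abbreviated, not the exact upstream definition:

import (
	"context"

	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

// endpoint is a trimmed-down stand-in for the real endpointImpl.
type endpoint struct {
	client       pluginapi.DevicePluginClient
	resourceName string
	cb           func(resourceName string, devices []pluginapi.Device)
}

// run opens the plugin's ListAndWatch stream and forwards every update to
// the manager's callback (genericDeviceUpdateCallback above). When the
// stream breaks, the loop exits and the plugin's devices are eventually
// treated as unhealthy.
func (e *endpoint) run() {
	stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
	if err != nil {
		return
	}
	for {
		resp, err := stream.Recv()
		if err != nil {
			return // plugin failure path described in the health-check section
		}
		devs := make([]pluginapi.Device, 0, len(resp.Devices))
		for _, d := range resp.Devices {
			devs = append(devs, *d)
		}
		e.cb(e.resourceName, devs)
	}
}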

References

[1] https://github.com/kubernetes/community/blob/master/contributors/design-proposals/resource-management/device-plugin.md
[2] https://github.com/kubernetes/kubernetes/pull/42116
[3] https://developer.aliyun.com/article/503747