Kubernetes Component Analysis: DeviceManager
The Device Plugin mechanism introduced in Kubernetes 1.8 makes it possible to integrate all kinds of custom devices into Kubernetes through an extension point. Inside the kubelet, the Device Manager component is responsible for interacting with the custom Device Plugins and for managing the device lifecycle.
1. Device Plugin
Device Plugin deployment
Device vendors can implement custom device plugins, which are typically deployed as a DaemonSet. The Device Plugin then runs as a pod on every node in the cluster; when it detects that the current node has the devices it manages, the plugin's gRPC server communicates with the kubelet, watches for containers that request the corresponding device resource in their spec, and injects the matching devices during container creation.
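A minimal sketch of such a DaemonSet, written with the client-go API types, might look like the following; the plugin name, image, and namespace are placeholders, and the essential detail is the hostPath mount of the kubelet's device-plugin directory, which the plugin needs in order to create its socket there.
package fooplugin

import (
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// fooPluginDaemonSet sketches a DaemonSet that runs a hypothetical device
// plugin on every node and mounts the kubelet device-plugin directory.
func fooPluginDaemonSet() *appsv1.DaemonSet {
    labels := map[string]string{"app": "foo-device-plugin"}
    return &appsv1.DaemonSet{
        ObjectMeta: metav1.ObjectMeta{Name: "foo-device-plugin", Namespace: "kube-system"},
        Spec: appsv1.DaemonSetSpec{
            Selector: &metav1.LabelSelector{MatchLabels: labels},
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{Labels: labels},
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{{
                        Name:  "foo-device-plugin",
                        Image: "example.com/foo-device-plugin:latest", // placeholder image
                        VolumeMounts: []corev1.VolumeMount{{
                            Name:      "device-plugin",
                            MountPath: "/var/lib/kubelet/device-plugins",
                        }},
                    }},
                    Volumes: []corev1.Volume{{
                        Name: "device-plugin",
                        VolumeSource: corev1.VolumeSource{
                            HostPath: &corev1.HostPathVolumeSource{
                                Path: "/var/lib/kubelet/device-plugins",
                            },
                        },
                    }},
                },
            },
        },
    }
}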
Device Plugin interface
When implementing a device plugin, the device vendor needs to communicate with the kubelet and register its service with the gRPC server inside the kubelet. The registration interface is specified in the kubernetes community repository; see the official interface definition document [1] for details.
The Device Plugin must implement the following two interfaces:
service DevicePlugin {
    // returns a stream of []Device
    rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}
    rpc Allocate(AllocateRequest) returns (AllocateResponse) {}
}
ListAndWatch is responsible for monitoring device state, including device insertion and removal; it returns the current set of existing devices to the kubelet as a stream.
Allocate is called during container creation. It gives the device plugin a chance to prepare device-specific resources in advance, for example allocating the corresponding GPU memory and clearing it.
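As a rough illustration, a plugin-side implementation of these two RPCs could look like the sketch below, assuming the v1beta1 device plugin API in k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1; the fooPlugin type, its device list, and the /dev/foo device nodes are hypothetical.
package fooplugin

import (
    "context"

    pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

type fooPlugin struct {
    devices []*pluginapi.Device // e.g. discovered from the host at startup
}

// ListAndWatch streams the full device set once, then would send it again
// whenever the set or the health of a device changes (omitted here).
func (p *fooPlugin) ListAndWatch(_ *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
    if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices}); err != nil {
        return err
    }
    select {} // block forever; a real plugin watches for device changes here
}

// Allocate is called during container creation; it returns the device nodes,
// mounts, and env vars the runtime must inject for the requested device IDs.
func (p *fooPlugin) Allocate(ctx context.Context, req *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
    resp := &pluginapi.AllocateResponse{}
    for _, cr := range req.ContainerRequests {
        cresp := &pluginapi.ContainerAllocateResponse{}
        for _, id := range cr.DevicesIDs {
            cresp.Devices = append(cresp.Devices, &pluginapi.DeviceSpec{
                HostPath:      "/dev/foo" + id, // hypothetical device node
                ContainerPath: "/dev/foo" + id,
                Permissions:   "rw",
            })
        }
        resp.ContainerResponses = append(resp.ContainerResponses, cresp)
    }
    return resp, nil
}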
The basic Device Plugin workflow
The Device Plugin workflow consists of three parts: registration, device monitoring, and device resource allocation. The registration flow is:
- First, the Device Plugin sends a registration request to the kubelet over gRPC
- The kubelet returns a registration response
- The device plugin then establishes its own gRPC connection with the kubelet
To make this gRPC communication possible, the device plugin must create a socket under /var/lib/kubelet/device-plugins/ that the kubelet can connect to; a minimal registration sketch is shown below.
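A minimal registration sketch, again assuming the v1beta1 API; the resource name example.com/foo and the socket name foo.sock are placeholders.
package fooplugin

import (
    "context"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

// registerWithKubelet dials the kubelet's registration socket and announces
// the plugin's own socket name and the resource name it serves.
func registerWithKubelet(ctx context.Context) error {
    conn, err := grpc.DialContext(ctx, "unix://"+pluginapi.KubeletSocket,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithBlock())
    if err != nil {
        return err
    }
    defer conn.Close()

    client := pluginapi.NewRegistrationClient(conn)
    _, err = client.Register(ctx, &pluginapi.RegisterRequest{
        Version:      pluginapi.Version,
        Endpoint:     "foo.sock",        // relative to /var/lib/kubelet/device-plugins/
        ResourceName: "example.com/foo", // vendor-domain/resource
    })
    return err
}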
Device Plugin health checking
The Device Plugin and the kubelet maintain a bidirectional health check: when either the Device Plugin or the kubelet fails, the other side has a corresponding error-handling path. This bidirectional check is built on ListAndWatch, i.e. each side judges the other's state from the results of their gRPC communication. The following cases are distinguished:
- When the kubelet detects that a Device Plugin has failed, it does not restart the pods that have already been allocated devices from that plugin.
- When the kubelet detects that a device managed by a Device Plugin has been removed, it immediately reports this to the api-server so that the node's advertised amount of available device resources stays correct.
- If the kubelet's Allocate call for the resources a device needs fails, the corresponding pod should end up in a failed state.
- When the kubelet fails or is restarting, the Device Plugin should keep monitoring the kubelet's state, re-establish the connection, and re-register (a plugin-side sketch of this recovery follows the list).
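The last point is commonly implemented by watching the kubelet socket and re-registering when it is recreated. A hedged sketch, assuming the fsnotify library and the registerWithKubelet helper from the registration sketch above:
package fooplugin

import (
    "context"
    "log"
    "path/filepath"

    "github.com/fsnotify/fsnotify"
    pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

// watchKubeletRestart watches the directory containing kubelet.sock and
// re-registers the plugin whenever the socket is recreated by a restarted kubelet.
func watchKubeletRestart(ctx context.Context) error {
    w, err := fsnotify.NewWatcher()
    if err != nil {
        return err
    }
    defer w.Close()
    if err := w.Add(filepath.Dir(pluginapi.KubeletSocket)); err != nil {
        return err
    }
    for {
        select {
        case ev := <-w.Events:
            // A newly created kubelet.sock means the kubelet restarted: register again.
            if ev.Name == pluginapi.KubeletSocket && ev.Op&fsnotify.Create != 0 {
                if err := registerWithKubelet(ctx); err != nil {
                    log.Printf("re-registration failed: %v", err)
                }
            }
        case err := <-w.Errors:
            log.Printf("watch error: %v", err)
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}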
2. Device Manager
Device Manager source entry point
The Device Manager is created in kubernetes/pkg/kubelet/cm/container_manager_linux.go when the kubelet builds its container manager via NewContainerManager. From this it is clear that device management as a whole is owned by the Container Manager, which also wraps the key Device Manager interfaces as shown below:
func (cm *containerManagerImpl) Start(node *v1.Node,
    activePods ActivePodsFunc,
    sourcesReady config.SourcesReady,
    podStatusProvider status.PodStatusProvider,
    runtimeService internalapi.RuntimeService) error {
    // ... omitted ...
    // The device manager is started as part of starting the ContainerManager
    if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
        return err
    }
    // ...
}
// The key Device Manager interfaces are wrapped again at the ContainerManager level
func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler {
    return cm.deviceManager.GetWatcherHandler()
}

func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
    return cm.deviceManager.UpdatePluginResources(node, attrs)
}
// TODO: move the GetResources logic to PodContainerManager
func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
    opts := &kubecontainer.RunContainerOptions{}
    // Allocate should already be called during predicateAdmitHandler.Admit(),
    // just try to fetch device runtime information from cached state here
    devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
    ...
    opts.Devices = append(opts.Devices, devOpts.Devices...)
    opts.Mounts = append(opts.Mounts, devOpts.Mounts...)
    opts.Envs = append(opts.Envs, devOpts.Envs...)
    ...
}
func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
    if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
        return cm.topologyManager
    }
    // TODO: we need to think about a better way to do this. This will work for
    // now so long as we have only the cpuManager and deviceManager relying on
    // allocations here. However, going forward it is not generalized enough to
    // work as we add more and more hint providers that the TopologyManager
    // needs to call Allocate() on (that may not be directly instantiated
    // inside this component).
    return &resourceAllocator{cm.cpuManager, cm.memoryManager, cm.deviceManager}
}
Depending on whether the device plugin feature is enabled, the corresponding Device Manager is created as shown below:
if devicePluginEnabled {
    cm.deviceManager, err = devicemanager.NewManagerImpl(machineInfo.Topology, cm.topologyManager)
    cm.topologyManager.AddHintProvider(cm.deviceManager)
} else {
    cm.deviceManager, err = devicemanager.NewManagerStub()
}
if err != nil {
    return nil, err
}
The core Device Manager source lives in kubernetes/pkg/kubelet/cm/devicemanager/manager.go. The key part of newManagerImpl (called from NewManagerImpl) is as follows:
func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
    klog.V(2).InfoS("Creating Device Plugin manager", "path", socketPath)

    if socketPath == "" || !filepath.IsAbs(socketPath) {
        return nil, fmt.Errorf(errBadSocket+" %s", socketPath)
    }

    var numaNodes []int
    for _, node := range topology {
        numaNodes = append(numaNodes, node.Id)
    }

    dir, file := filepath.Split(socketPath)
    manager := &ManagerImpl{
        endpoints: make(map[string]endpointInfo),

        socketname: file,
        socketdir:  dir,
        // all devices known for each resource name
        allDevices: NewResourceDeviceInstances(),
        // healthy and unhealthy device sets per resource name
        healthyDevices:   make(map[string]sets.String),
        unhealthyDevices: make(map[string]sets.String),
        allocatedDevices: make(map[string]sets.String),
        // contains pod to allocated device mapping.
        podDevices:            newPodDevices(),
        numaNodes:             numaNodes,
        topologyAffinityStore: topologyAffinityStore,
        devicesToReuse:        make(PodReusableDevices),
    }
    // the manager registers a callback here, invoked whenever a plugin reports device updates
    manager.callback = manager.genericDeviceUpdateCallback

    // The following structures are populated with real implementations in manager.Start()
    // Before that, initializes them to perform no-op operations.
    manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
    manager.sourcesReady = &sourcesReadyStub{}

    // create the checkpoint manager for the socketPath directory
    checkpointManager, err := checkpointmanager.NewCheckpointManager(dir)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
    }
    manager.checkpointManager = checkpointManager

    return manager, nil
}
Expanding allDevices shows that it is a two-level nested map: resource name → device ID → device, roughly as sketched below.
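The corresponding type definitions, paraphrased from pkg/kubelet/cm/devicemanager (the exact layout may differ between Kubernetes versions):
package devicemanager

import (
    pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

// DeviceInstances maps a device ID to the device advertised by the plugin.
type DeviceInstances map[string]pluginapi.Device

// ResourceDeviceInstances maps a resource name (e.g. "nvidia.com/gpu")
// to all device instances of that resource.
type ResourceDeviceInstances map[string]DeviceInstances

func NewResourceDeviceInstances() ResourceDeviceInstances {
    return make(ResourceDeviceInstances)
}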
The health-check callback below updates the healthyDevices and unhealthyDevices sets:
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
    m.mutex.Lock()
    m.healthyDevices[resourceName] = sets.NewString()
    m.unhealthyDevices[resourceName] = sets.NewString()
    m.allDevices[resourceName] = make(map[string]pluginapi.Device)
    for _, dev := range devices {
        m.allDevices[resourceName][dev.ID] = dev
        if dev.Health == pluginapi.Healthy {
            m.healthyDevices[resourceName].Insert(dev.ID)
        } else {
            m.unhealthyDevices[resourceName].Insert(dev.ID)
        }
    }
    m.mutex.Unlock()
    if err := m.writeCheckpoint(); err != nil {
        klog.ErrorS(err, "Writing checkpoint encountered")
    }
}
Reference
[1] https://github.com/kubernetes/community/blob/master/contributors/design-proposals/resource-management/device-plugin.md
[2] https://github.com/kubernetes/kubernetes/pull/42116
[3] https://developer.aliyun.com/article/503747