9 min read

kubernetes 网络组件分析之kube-proxy

kubernetes 网络组件分析之kube-proxy

1 容器网络基础CNI

1.1 CNI及CNI插件的基本概念:

CNI 是一个容器网络接口(另外一个叫做CNM),依据网络命名空间实现网络协议栈的虚拟化。
CNI插件 是k8s提供的,遵守容器网络接口CNI规范的网络插件,实现对一个集群的通用默认网络。它的主要目的是实现pod节点之间的内部通信,提供对kube-proxy的支持。并提供诸如 hostPort 、流量整形 在使用插件时,需要记住两个 kubelet 命令行参数:

  1. --cni-bin-dir: kubelet 在启动时探测这个目录中的插件,默认是/opt/cni/bin
  2. --network-plugin: 要使用的网络插件来自 cni-bin-dir。 它必须与从插件目录探测到的插件报告的名称匹配。
    两个命令行参数依赖基于docker的容器运行时

1.2 CNI插件的加载流程

基本流程如下:

  1. 首先在每个结点上配置 CNI 配置文件(/etc/cni/net.d/xxnet.conf),其中 xxnet.conf 是某一个网络配置文件的名称,这里的配置文件依赖基于docker的容器运行时。
  2. 或许容器运行时的相关信息,调用 CNI 配置文件中所对应的二进制插件。
  3. 在这个节点上创建 Pod 之后,Kubelet 就会根据 CNI 配置文件执行 CNI 插件配置网络。
  4. 上步执行完之后,Pod 的网络就配置完成了。

1.3 CNI 插件可以分为三种:Overlay、路由及 Underlay


这里对Overlay网络模型介绍(https://draveness.me/whys-the-design-overlay-network/)

1.4 CNI支持的插件对比

Flannel
Flannel在每台主机上运行一个名为 flanneld 的小型二进制代理,并负责从较大的预配置地址空间中分配子网租约给每台主机。Flannel 直接使用 Kubernetes API 或 etcd 存储网络配置、分配的子网和任何辅助数据(如主机的公共IP)。数据包使用多种后端机制之一转发,包括VXLAN和各种云集成SDK等。

Calico:
是一个用于容器、虚拟机和裸机工作负载的开源网络和网络安全解决方案。卡利科使用标准Linux网络工具为云原生应用程序提供两项主要服务:

  • 工作负载之间的网络连接
  • 工作负载之间的网络连接

还有一些其他的CNI插件Canal和Weave等,一些关于CNI插件对比的参考文章如下:

  1. https://www.kubernetes.org.cn/6908.html
  2. https://www.huaweicloud.com/articles/9f8f079a498f7ff238774c9cab5836bc.html
  3. https://segmentfault.com/a/1190000017182169
  4. https://www.huaweicloud.com/articles/9f8f079a498f7ff238774c9cab5836bc.html

1.5 multus-cni 开启多网络平面

Multus CNI是Kubernetes的容器网络接口(CNI)插件,它允许将多个网络接口附加到Pod。通常在Kubernetes中,每个Pod只有一个网络接口。使用Multus-cni,您可以创建一个具有多个接口的Pod。这是由Multus充当“基础插件”来实现的,通过Multus可以调用多个其他CNI插件。Multus CNI通过对kubernets中创建的pod配置多个CNI对应的容器卡接口,使pod拥有不同的网段ip,并且各个CNI生成的内部网络在通常情况下处于相互隔离的状态,因此Multus CNI将pod置于隔离的不同网络平面下。

2 kube-proxy:网络代理与负载均衡

2.1 kube-proxy的基本概念:

Kubernetes 网络代理在每个节点上运行。网络代理反映了每个节点上 Kubernetes API 中定义的服务,并且可以执行简单的 TCP、UDP 和 SCTP 流转发,或者在一组后端进行 循环 TCP、UDP 和 SCTP 转发。 当前可通过 Docker-links-compatible 环境变量找到服务集群 IP 和端口, 这些环境变量指定了服务代理打开的端口。 有一个可选的插件,可以为这些集群 IP 提供集群 DNS。 用户必须使用 apiserver API 创建服务才能配置代理。另外,kube-proxy 是kubernets中提供的一个应用服务。作为服务,它需要依赖与底层基础插件CNI提供的特性从而实现不同pod访问的网络联通。

2.2 kube-proxy 的转发模式:

kube-proxy 的转发模式可以通过启动参数–proxy-mode进行设置,有userspace、iptables、IPVS等可选项,默认采用iptables。

2.3 kube-proxy 源码分析

// cmd/kube-proxy/proxy.go
// command := app.NewProxyCommand()
// Run runs the specified ProxyServer.
func (o *Options) Run() error {
    defer close(o.errCh)
    if len(o.WriteConfigTo) > 0 {
        return o.writeConfigFile()
    }
    //ProxyServer 创建一个Proxy的结构体
    proxyServer, err := NewProxyServer(o)
    if err != nil {
        return err
    }

    if o.CleanupAndExit {
        return proxyServer.CleanupAndExit()
    }

    o.proxyServer = proxyServer
    //开启一个监听器监听event
    return o.runLoop()
}

在NewProxyServer中,这里只显示关键执行代码如下

func newProxyServer(){
    // 根据模式生成四种Proxy代理的接口
    var iptInterface utiliptables.Interface
	var ipvsInterface utilipvs.Interface
	var kernelHandler ipvs.KernelHandler
	var ipsetInterface utilipset.Interface
    // 包含代理服务器的具体操作,监听Endpoint和Service的变化等
    proxier Proxier
    // 生成事件监控器
    Broadcaster
    recorder
    //生成两个通信通道,通过Restful和API Server进行通信
    client 
    eventclient

    return &ProxyServer{
		Client:                 client,
		EventClient:            eventClient,
		IptInterface:           iptInterface,
		IpvsInterface:          ipvsInterface,
		IpsetInterface:         ipsetInterface,
		execer:                 execer,
		Proxier:                proxier,
		Broadcaster:            eventBroadcaster,
		Recorder:               recorder,
		ConntrackConfiguration: config.Conntrack,
		Conntracker:            &realConntracker{},
		ProxyMode:              proxyMode,
		NodeRef:                nodeRef,
		MetricsBindAddress:     config.MetricsBindAddress,
		BindAddressHardFail:    config.BindAddressHardFail,
		EnableProfiling:        config.EnableProfiling,
		OOMScoreAdj:            config.OOMScoreAdj,
		ConfigSyncPeriod:       config.ConfigSyncPeriod.Duration,
		HealthzServer:          healthzServer,
		UseEndpointSlices:      useEndpointSlices,
	}, nil
}

为了生成重要的Proxier结构通过NewProxier函数生成,这里只显示关键执行代码

func NewProxier(...)(*Proxier, error){
    // Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers
    // are connected to a Linux bridge (but not SDN bridges).  Until most
    // plugins handle this, log when config is missing
    // 这里以iptables中生成proxier操作为例,需要提供必要的Linux网桥支持
    if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
        klog.InfoS("Missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
    }

    proxier := &Proxier{
        portsMap:                 make(map[utilnet.LocalPort]utilnet.Closeable),
        serviceMap:               make(proxy.ServiceMap),
        //这里生成了一个 ServiceChangeTracker
        /*
        ServiceChangeTracker{
        包含了一个对service改变的map
        items:                   make(map[types.NamespacedName]*serviceChange),
        makeServiceInfo:         makeServiceInfo, 这里是一个生成ServiceInfo的生成函数,
        ServiceInfo中主要包括端口链、防火墙链和LoardBalance链
        recorder:                recorder,
        ipFamily:                ipFamily,
        processServiceMapChange: processServiceMapChange,
        } 
        */
        serviceChanges:           proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
        endpointsMap:             make(proxy.EndpointsMap),
        endpointsChanges:         proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
        syncPeriod:               syncPeriod,
        iptables:                 ipt,
        masqueradeAll:            masqueradeAll,
        masqueradeMark:           masqueradeMark,
        exec:                     exec,
        localDetector:            localDetector,
        hostname:                 hostname,
        nodeIP:                   nodeIP,
        portMapper:               &utilnet.ListenPortOpener,
        recorder:                 recorder,
        serviceHealthServer:      serviceHealthServer,
        healthzServer:            healthzServer,
        precomputedProbabilities: make([]string, 0, 1001),
        iptablesData:             bytes.NewBuffer(nil),
        existingFilterChainsData: bytes.NewBuffer(nil),
        filterChains:             bytes.NewBuffer(nil),
        filterRules:              bytes.NewBuffer(nil),
        natChains:                bytes.NewBuffer(nil),
        natRules:                 bytes.NewBuffer(nil),
        nodePortAddresses:        nodePortAddresses,
        networkInterfacer:        utilproxy.RealNetwork{},
    }
    proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)
}

执行Option的runLoop方法:

// runLoop will watch on the update change of the proxy server's configuration file.
// Return an error when updated
func (o *Options) runLoop() error {
    //第一步 启动Option的watcher,这里的Watcher是一个filesystem.FSWatcher用于监控ConfigFile的更新
    if o.watcher != nil {
        o.watcher.Run()
    }

    // run the proxy in goroutine
    go func() {
        //通过接口执行的ProxyServer的Run方法
        err := o.proxyServer.Run()
        o.errCh <- err
    }()

    for {
        err := <-o.errCh
        if err != nil {
            return err
        }
    }
}

ProxyServer的Run方法如下:

func (s *ProxyServer) Run() error {
    //第一步执行一个OOM调用(OOM是一个保护机制,用于给进程的内存调用打分,用于避免在内存不足的时候不至于出现严重问题,把一些无关的进程优先杀掉)
    if s.OOMScoreAdj != nil {
		oomAdjuster = oom.NewOOMAdjuster()
		if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.OOMScoreAdj)); err != nil {
			klog.V(2).Info(err)
		}
	}
    //第二步开启一个Broadcaster对Event进行存储
    if s.Broadcaster != nil && s.EventClient != nil {
		stopCh := make(chan struct{})
		s.Broadcaster.StartRecordingToSink(stopCh)
	}
    ...
}

这里为了更好的对Broadcaster的机制做进一步分析,跳入Broadcaster.StartRecordingToSink函数中观察处理机制,方法关键内容如下:

func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
	go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
	go wait.Until(e.finishSeries, finishTime, stopCh)
	e.startRecordingEvents(stopCh)
}

这里继续分析ProxyServer的Run方法,这里仅列出关键部分:

    //标签选择器
    labelSelector := labels.NewSelector()
    labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)

    // Make informers that filter out objects that want a non-default service proxy.
    //使用通用的工厂方法
    informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
        informers.WithTweakListOptions(func(options *metav1.ListOptions) {
            options.LabelSelector = labelSelector.String()
        }))

    // Create configs (i.e. Watches for Services and Endpoints or EndpointSlices)
    // Note: RegisterHandler() calls need to happen before creation of Sources because sources
    // only notify on changes, and the initial update (on process start) may be lost if no handlers
    // are registered yet.
    //创建一个ServiceConfig
    serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
    //将收到的Config变化使用注册的Handler进行处理
    serviceConfig.RegisterEventHandler(s.Proxier)
    go serviceConfig.Run(wait.NeverStop)

    if s.UseEndpointSlices {
        endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.ConfigSyncPeriod)
        endpointSliceConfig.RegisterEventHandler(s.Proxier)
        go endpointSliceConfig.Run(wait.NeverStop)
    } else {
        endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
        endpointsConfig.RegisterEventHandler(s.Proxier)
        go endpointsConfig.Run(wait.NeverStop)
    }

    // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
    // functions must configure their shared informer event handlers first.
    informerFactory.Start(wait.NeverStop)

    if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
        // Make an informer that selects for our nodename.
        currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
            informers.WithTweakListOptions(func(options *metav1.ListOptions) {
                options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String()
            }))
        nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.ConfigSyncPeriod)
        nodeConfig.RegisterEventHandler(s.Proxier)
        go nodeConfig.Run(wait.NeverStop)

        // This has to start after the calls to NewNodeConfig because that must
        // configure the shared informer event handler first.
        currentNodeInformerFactory.Start(wait.NeverStop)
    }

    // Birth Cry after the birth is successful
    s.birthCry()

    go s.Proxier.SyncLoop()

    return <-errCh

Reference

[1] https://kubernetes.io/zh/docs/concepts/extend-kubernetes/compute-storage-net/network-plugins/
[2] https://github.com/kubernetes/kubernetes
[3] https://cizixs.com/2017/04/07/kube-proxy-source-code-analysis/
[4] https://rootdeep.github.io/posts/kube-proxy-code-analysis/