kubernetes 网络组件分析之kube-proxy
kubernetes 网络组件分析之kube-proxy
1 容器网络基础CNI
1.1 CNI及CNI插件的基本概念:
CNI 是一个容器网络接口(另外一个叫做CNM),依据网络命名空间实现网络协议栈的虚拟化。
CNI插件 是k8s提供的,遵守容器网络接口CNI规范的网络插件,实现对一个集群的通用默认网络。它的主要目的是实现pod节点之间的内部通信,提供对kube-proxy的支持。并提供诸如 hostPort 、流量整形 在使用插件时,需要记住两个 kubelet 命令行参数:
--cni-bin-dir: kubelet 在启动时探测这个目录中的插件,默认是/opt/cni/bin
--network-plugin: 要使用的网络插件来自 cni-bin-dir。 它必须与从插件目录探测到的插件报告的名称匹配。
两个命令行参数依赖基于docker的容器运行时
1.2 CNI插件的加载流程
基本流程如下:
- 首先在每个结点上配置 CNI 配置文件(/etc/cni/net.d/xxnet.conf),其中 xxnet.conf 是某一个网络配置文件的名称,这里的配置文件依赖基于docker的容器运行时。
- 或许容器运行时的相关信息,调用 CNI 配置文件中所对应的二进制插件。
- 在这个节点上创建 Pod 之后,Kubelet 就会根据 CNI 配置文件执行 CNI 插件配置网络。
- 上步执行完之后,Pod 的网络就配置完成了。
1.3 CNI 插件可以分为三种:Overlay、路由及 Underlay
这里对Overlay网络模型介绍(https://draveness.me/whys-the-design-overlay-network/)
1.4 CNI支持的插件对比
Flannel:
Flannel在每台主机上运行一个名为 flanneld 的小型二进制代理,并负责从较大的预配置地址空间中分配子网租约给每台主机。Flannel 直接使用 Kubernetes API 或 etcd 存储网络配置、分配的子网和任何辅助数据(如主机的公共IP)。数据包使用多种后端机制之一转发,包括VXLAN和各种云集成SDK等。
Calico:
是一个用于容器、虚拟机和裸机工作负载的开源网络和网络安全解决方案。卡利科使用标准Linux网络工具为云原生应用程序提供两项主要服务:
- 工作负载之间的网络连接
- 工作负载之间的网络连接
还有一些其他的CNI插件Canal和Weave等,一些关于CNI插件对比的参考文章如下:
- https://www.kubernetes.org.cn/6908.html
- https://www.huaweicloud.com/articles/9f8f079a498f7ff238774c9cab5836bc.html
- https://segmentfault.com/a/1190000017182169
- https://www.huaweicloud.com/articles/9f8f079a498f7ff238774c9cab5836bc.html
1.5 multus-cni 开启多网络平面
Multus CNI是Kubernetes的容器网络接口(CNI)插件,它允许将多个网络接口附加到Pod。通常在Kubernetes中,每个Pod只有一个网络接口。使用Multus-cni,您可以创建一个具有多个接口的Pod。这是由Multus充当“基础插件”来实现的,通过Multus可以调用多个其他CNI插件。Multus CNI通过对kubernets中创建的pod配置多个CNI对应的容器卡接口,使pod拥有不同的网段ip,并且各个CNI生成的内部网络在通常情况下处于相互隔离的状态,因此Multus CNI将pod置于隔离的不同网络平面下。
2 kube-proxy:网络代理与负载均衡
2.1 kube-proxy的基本概念:
Kubernetes 网络代理在每个节点上运行。网络代理反映了每个节点上 Kubernetes API 中定义的服务,并且可以执行简单的 TCP、UDP 和 SCTP 流转发,或者在一组后端进行 循环 TCP、UDP 和 SCTP 转发。 当前可通过 Docker-links-compatible 环境变量找到服务集群 IP 和端口, 这些环境变量指定了服务代理打开的端口。 有一个可选的插件,可以为这些集群 IP 提供集群 DNS。 用户必须使用 apiserver API 创建服务才能配置代理。另外,kube-proxy 是kubernets中提供的一个应用服务。作为服务,它需要依赖与底层基础插件CNI提供的特性从而实现不同pod访问的网络联通。
2.2 kube-proxy 的转发模式:
kube-proxy 的转发模式可以通过启动参数–proxy-mode进行设置,有userspace、iptables、IPVS等可选项,默认采用iptables。
2.3 kube-proxy 源码分析
// cmd/kube-proxy/proxy.go
// command := app.NewProxyCommand()
// Run runs the specified ProxyServer.
func (o *Options) Run() error {
defer close(o.errCh)
if len(o.WriteConfigTo) > 0 {
return o.writeConfigFile()
}
//ProxyServer 创建一个Proxy的结构体
proxyServer, err := NewProxyServer(o)
if err != nil {
return err
}
if o.CleanupAndExit {
return proxyServer.CleanupAndExit()
}
o.proxyServer = proxyServer
//开启一个监听器监听event
return o.runLoop()
}
在NewProxyServer中,这里只显示关键执行代码如下
func newProxyServer(){
// 根据模式生成四种Proxy代理的接口
var iptInterface utiliptables.Interface
var ipvsInterface utilipvs.Interface
var kernelHandler ipvs.KernelHandler
var ipsetInterface utilipset.Interface
// 包含代理服务器的具体操作,监听Endpoint和Service的变化等
proxier Proxier
// 生成事件监控器
Broadcaster
recorder
//生成两个通信通道,通过Restful和API Server进行通信
client
eventclient
return &ProxyServer{
Client: client,
EventClient: eventClient,
IptInterface: iptInterface,
IpvsInterface: ipvsInterface,
IpsetInterface: ipsetInterface,
execer: execer,
Proxier: proxier,
Broadcaster: eventBroadcaster,
Recorder: recorder,
ConntrackConfiguration: config.Conntrack,
Conntracker: &realConntracker{},
ProxyMode: proxyMode,
NodeRef: nodeRef,
MetricsBindAddress: config.MetricsBindAddress,
BindAddressHardFail: config.BindAddressHardFail,
EnableProfiling: config.EnableProfiling,
OOMScoreAdj: config.OOMScoreAdj,
ConfigSyncPeriod: config.ConfigSyncPeriod.Duration,
HealthzServer: healthzServer,
UseEndpointSlices: useEndpointSlices,
}, nil
}
为了生成重要的Proxier结构通过NewProxier函数生成,这里只显示关键执行代码
func NewProxier(...)(*Proxier, error){
// Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers
// are connected to a Linux bridge (but not SDN bridges). Until most
// plugins handle this, log when config is missing
// 这里以iptables中生成proxier操作为例,需要提供必要的Linux网桥支持
if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
klog.InfoS("Missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
}
proxier := &Proxier{
portsMap: make(map[utilnet.LocalPort]utilnet.Closeable),
serviceMap: make(proxy.ServiceMap),
//这里生成了一个 ServiceChangeTracker
/*
ServiceChangeTracker{
包含了一个对service改变的map
items: make(map[types.NamespacedName]*serviceChange),
makeServiceInfo: makeServiceInfo, 这里是一个生成ServiceInfo的生成函数,
ServiceInfo中主要包括端口链、防火墙链和LoardBalance链
recorder: recorder,
ipFamily: ipFamily,
processServiceMapChange: processServiceMapChange,
}
*/
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
syncPeriod: syncPeriod,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
localDetector: localDetector,
hostname: hostname,
nodeIP: nodeIP,
portMapper: &utilnet.ListenPortOpener,
recorder: recorder,
serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer,
precomputedProbabilities: make([]string, 0, 1001),
iptablesData: bytes.NewBuffer(nil),
existingFilterChainsData: bytes.NewBuffer(nil),
filterChains: bytes.NewBuffer(nil),
filterRules: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
nodePortAddresses: nodePortAddresses,
networkInterfacer: utilproxy.RealNetwork{},
}
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)
}
执行Option的runLoop方法:
// runLoop will watch on the update change of the proxy server's configuration file.
// Return an error when updated
func (o *Options) runLoop() error {
//第一步 启动Option的watcher,这里的Watcher是一个filesystem.FSWatcher用于监控ConfigFile的更新
if o.watcher != nil {
o.watcher.Run()
}
// run the proxy in goroutine
go func() {
//通过接口执行的ProxyServer的Run方法
err := o.proxyServer.Run()
o.errCh <- err
}()
for {
err := <-o.errCh
if err != nil {
return err
}
}
}
ProxyServer的Run方法如下:
func (s *ProxyServer) Run() error {
//第一步执行一个OOM调用(OOM是一个保护机制,用于给进程的内存调用打分,用于避免在内存不足的时候不至于出现严重问题,把一些无关的进程优先杀掉)
if s.OOMScoreAdj != nil {
oomAdjuster = oom.NewOOMAdjuster()
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.OOMScoreAdj)); err != nil {
klog.V(2).Info(err)
}
}
//第二步开启一个Broadcaster对Event进行存储
if s.Broadcaster != nil && s.EventClient != nil {
stopCh := make(chan struct{})
s.Broadcaster.StartRecordingToSink(stopCh)
}
...
}
这里为了更好的对Broadcaster的机制做进一步分析,跳入Broadcaster.StartRecordingToSink函数中观察处理机制,方法关键内容如下:
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
go wait.Until(e.finishSeries, finishTime, stopCh)
e.startRecordingEvents(stopCh)
}
这里继续分析ProxyServer的Run方法,这里仅列出关键部分:
//标签选择器
labelSelector := labels.NewSelector()
labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)
// Make informers that filter out objects that want a non-default service proxy.
//使用通用的工厂方法
informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labelSelector.String()
}))
// Create configs (i.e. Watches for Services and Endpoints or EndpointSlices)
// Note: RegisterHandler() calls need to happen before creation of Sources because sources
// only notify on changes, and the initial update (on process start) may be lost if no handlers
// are registered yet.
//创建一个ServiceConfig
serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
//将收到的Config变化使用注册的Handler进行处理
serviceConfig.RegisterEventHandler(s.Proxier)
go serviceConfig.Run(wait.NeverStop)
if s.UseEndpointSlices {
endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.ConfigSyncPeriod)
endpointSliceConfig.RegisterEventHandler(s.Proxier)
go endpointSliceConfig.Run(wait.NeverStop)
} else {
endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
endpointsConfig.RegisterEventHandler(s.Proxier)
go endpointsConfig.Run(wait.NeverStop)
}
// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
// functions must configure their shared informer event handlers first.
informerFactory.Start(wait.NeverStop)
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
// Make an informer that selects for our nodename.
currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String()
}))
nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.ConfigSyncPeriod)
nodeConfig.RegisterEventHandler(s.Proxier)
go nodeConfig.Run(wait.NeverStop)
// This has to start after the calls to NewNodeConfig because that must
// configure the shared informer event handler first.
currentNodeInformerFactory.Start(wait.NeverStop)
}
// Birth Cry after the birth is successful
s.birthCry()
go s.Proxier.SyncLoop()
return <-errCh
Reference
[1] https://kubernetes.io/zh/docs/concepts/extend-kubernetes/compute-storage-net/network-plugins/
[2] https://github.com/kubernetes/kubernetes
[3] https://cizixs.com/2017/04/07/kube-proxy-source-code-analysis/
[4] https://rootdeep.github.io/posts/kube-proxy-code-analysis/
Member discussion