
Source Code Analysis of the kubelet Volume Manager


Based on tag v1.17.4

https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4

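Overview

The volume manager lives inside the kubelet. It manages attach/detach of storage volumes (the same job the AD controller does; a kubelet startup flag decides which component performs it, as described in detail later) as well as mount/umount.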

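Introduction

Mounting storage for a container takes two steps:

(1) attach;

(2) mount.

Removing the storage from a container takes the reverse two steps:

(1) umount;

(2) detach.

Attach/detach can be performed either by kube-controller-manager or by the volume manager in the kubelet, depending on startup flags; mount/umount is performed only by the volume manager in the kubelet.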

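The VolumeManager interface

(1) It is the component running inside the kubelet that makes storage ready, mainly through mount/unmount (attach/detach is optional);

(2) Volume operations happen only after a pod has been scheduled to this node, so the trigger is the kubelet (strictly speaking, the pod manager inside the kubelet): the volumes declared in the pod specs held by the pod manager trigger the mount operations;

(3) The kubelet watches for pods scheduled to this node and caches them in the pod manager. The VolumeManager reads the pods from the pod manager, resolves the state of their PVs/PVCs, works out the concrete attach/detach and mount/umount operations, and then calls the volume plugin to carry them out.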

// pkg/kubelet/volumemanager/volume_manager.go

// VolumeManager runs a set of asynchronous loops that figure out which volumes
// need to be attached/mounted/unmounted/detached based on the pods scheduled on
// this node and makes it so.
type VolumeManager interface {
  // Starts the volume manager and all the asynchronous loops that it controls
  Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})

  // WaitForAttachAndMount processes the volumes referenced in the specified
  // pod and blocks until they are all attached and mounted (reflected in
  // actual state of the world).
  // An error is returned if all volumes are not attached and mounted within
  // the duration defined in podAttachAndMountTimeout.
  WaitForAttachAndMount(pod *v1.Pod) error

  // GetMountedVolumesForPod returns a VolumeMap containing the volumes
  // referenced by the specified pod that are successfully attached and
  // mounted. The key in the map is the OuterVolumeSpecName (i.e.
  // pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no
  // volumes.
  GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap

  // GetExtraSupplementalGroupsForPod returns a list of the extra
  // supplemental groups for the Pod. These extra supplemental groups come
  // from annotations on persistent volumes that the pod depends on.
  GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64

  // GetVolumesInUse returns a list of all volumes that implement the volume.Attacher
  // interface and are currently in use according to the actual and desired
  // state of the world caches. A volume is considered "in use" as soon as it
  // is added to the desired state of world, indicating it *should* be
  // attached to this node and remains "in use" until it is removed from both
  // the desired state of the world and the actual state of the world, or it
  // has been unmounted (as indicated in actual state of world).
  GetVolumesInUse() []v1.UniqueVolumeName

  // ReconcilerStatesHasBeenSynced returns true only after the actual states in reconciler
  // has been synced at least once after kubelet starts so that it is safe to update mounted
  // volume list retrieved from actual state.
  ReconcilerStatesHasBeenSynced() bool

  // VolumeIsAttached returns true if the given volume is attached to this
  // node.
  VolumeIsAttached(volumeName v1.UniqueVolumeName) bool

  // Marks the specified volume as having successfully been reported as "in
  // use" in the nodes's volume status.
  MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}

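Two key structs

(1) desiredStateOfWorld: the volume mount state the cluster is expected to reach, DSW for short. Suppose a new pod that uses a volume is scheduled onto node NodeA. From the AD controller's point of view, the DSW should then record that NodeA has the volume assigned to it, ready to be mounted by this pod.

(2) actualStateOfWorld: the volume mount state that actually exists, ASW for short. The actual state does not necessarily match the desired state: for example, the node may already have a freshly scheduled pod while the corresponding volume is not yet attached.

actualStateOfWorld-related structs

actualStateOfWorld

The struct that holds the actual storage mount state.

actualStateOfWorld: the actual storage mount state, ASW for short. It contains the volumes that have been successfully attached to the node and, for each of them, the pods that have successfully mounted it.

Its main field is attachedVolumes, a map whose key is the name of a volume successfully attached to the node and whose value holds information about that attached volume.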

// pkg/kubelet/volumemanager/cache/actual_state_of_world.go
type actualStateOfWorld struct {
  // nodeName is the name of this node. This value is passed to Attach/Detach
  nodeName types.NodeName

  // attachedVolumes is a map containing the set of volumes the kubelet volume
  // manager believes to be successfully attached to this node. Volume types
  // that do not implement an attacher interface are assumed to be in this
  // state by default.
  // The key in this map is the name of the volume and the value is an object
  // containing more information about the attached volume.
  attachedVolumes map[v1.UniqueVolumeName]attachedVolume

  // volumePluginMgr is the volume plugin manager used to create volume
  // plugin objects.
  volumePluginMgr *volume.VolumePluginMgr
  sync.RWMutex
}

attachedVolume

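Its main field is mountedPods, a map whose key is the pod name and whose value is a mountedPod object describing a pod that has successfully mounted this volume.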

// pkg/kubelet/volumemanager/cache/actual_state_of_world.go
// attachedVolume represents a volume the kubelet volume manager believes to be
// successfully attached to a node it is managing. Volume types that do not
// implement an attacher are assumed to be in this state.
type attachedVolume struct {
  // volumeName contains the unique identifier for this volume.
  volumeName v1.UniqueVolumeName

  // mountedPods is a map containing the set of pods that this volume has been
  // successfully mounted to. The key in this map is the name of the pod and
  // the value is a mountedPod object containing more information about the
  // pod.
  mountedPods map[volumetypes.UniquePodName]mountedPod

  // spec is the volume spec containing the specification for this volume.
  // Used to generate the volume plugin object, and passed to plugin methods.
  // In particular, the Unmount method uses spec.Name() as the volumeSpecName
  // in the mount path:
  // /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{volumeSpecName}/
  spec *volume.Spec

  // pluginName is the Unescaped Qualified name of the volume plugin used to
  // attach and mount this volume. It is stored separately in case the full
  // volume spec (everything except the name) can not be reconstructed for a
  // volume that should be unmounted (which would be the case for a mount path
  // read from disk without a full volume spec).
  pluginName string

  // pluginIsAttachable indicates the volume plugin used to attach and mount
  // this volume implements the volume.Attacher interface
  pluginIsAttachable bool

  // globallyMounted indicates that the volume is mounted to the underlying
  // device at a global mount point. This global mount point must be unmounted
  // prior to detach.
  globallyMounted bool

  // devicePath contains the path on the node where the volume is attached for
  // attachable volumes
  devicePath string

  // deviceMountPath contains the path on the node where the device should
  // be mounted after it is attached.
  deviceMountPath string
}

mountedPod

Information about a pod that has mounted the volume.

// pkg/kubelet/volumemanager/cache/actual_state_of_world.go
// The mountedPod object represents a pod for which the kubelet volume manager
// believes the underlying volume has been successfully been mounted.
type mountedPod struct {
  // the name of the pod
  podName volumetypes.UniquePodName

  // the UID of the pod
  podUID types.UID

  // mounter used to mount
  mounter volume.Mounter

  // mapper used to block volumes support
  blockVolumeMapper volume.BlockVolumeMapper

  // spec is the volume spec containing the specification for this volume.
  // Used to generate the volume plugin object, and passed to plugin methods.
  // In particular, the Unmount method uses spec.Name() as the volumeSpecName
  // in the mount path:
  // /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{volumeSpecName}/
  volumeSpec *volume.Spec

  // outerVolumeSpecName is the volume.Spec.Name() of the volume as referenced
  // directly in the pod. If the volume was referenced through a persistent
  // volume claim, this contains the volume.Spec.Name() of the persistent
  // volume claim
  outerVolumeSpecName string

  // remountRequired indicates the underlying volume has been successfully
  // mounted to this pod but it should be remounted to reflect changes in the
  // referencing pod.
  // Atomically updating volumes depend on this to update the contents of the
  // volume. All volume mounting calls should be idempotent so a second mount
  // call for volumes that do not need to update contents should not fail.
  remountRequired bool

  // volumeGidValue contains the value of the GID annotation, if present.
  volumeGidValue string

  // fsResizeRequired indicates the underlying volume has been successfully
  // mounted to this pod but its size has been expanded after that.
  fsResizeRequired bool
}
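The nesting of these three structs (volume, then the pods that mounted it) can be sketched as follows. This is a minimal model under stated assumptions, not the kubelet code: the types, the method names and the errNotAttached value are hypothetical stand-ins, keys are plain strings, and locking is omitted. It shows why a lookup can distinguish "volume not attached" from "attached but not mounted by this pod".

```go
package main

import (
	"errors"
	"fmt"
)

// errNotAttached is a hypothetical stand-in for the "volume not attached"
// error that the real cache reports.
var errNotAttached = errors.New("volume not attached")

// attachedVolume is simplified: it only tracks the device path and the set of
// pods that mounted the volume.
type attachedVolume struct {
	devicePath  string
	mountedPods map[string]bool // key: pod name
}

// actualState is a simplified model of actualStateOfWorld.
type actualState struct {
	attachedVolumes map[string]*attachedVolume // key: volume name
}

func newActualState() *actualState {
	return &actualState{attachedVolumes: map[string]*attachedVolume{}}
}

// markAttached records that a volume is attached to the node.
func (a *actualState) markAttached(volume, devicePath string) {
	if _, ok := a.attachedVolumes[volume]; !ok {
		a.attachedVolumes[volume] = &attachedVolume{
			devicePath:  devicePath,
			mountedPods: map[string]bool{},
		}
	}
}

// markMounted records that a pod mounted a volume; the volume must already be attached.
func (a *actualState) markMounted(pod, volume string) error {
	v, ok := a.attachedVolumes[volume]
	if !ok {
		return errNotAttached
	}
	v.mountedPods[pod] = true
	return nil
}

// podExistsInVolume reports whether the pod mounted the volume. It returns
// errNotAttached when the volume itself is unknown to the actual state.
func (a *actualState) podExistsInVolume(pod, volume string) (bool, string, error) {
	v, ok := a.attachedVolumes[volume]
	if !ok {
		return false, "", errNotAttached
	}
	return v.mountedPods[pod], v.devicePath, nil
}

func main() {
	asw := newActualState()
	_, _, err := asw.podExistsInVolume("pod-a", "vol-1")
	fmt.Println(errors.Is(err, errNotAttached)) // true: attach has to happen first

	asw.markAttached("vol-1", "/dev/sdb")
	mounted, _, _ := asw.podExistsInVolume("pod-a", "vol-1")
	fmt.Println(mounted) // false: attached, but not mounted by pod-a yet

	_ = asw.markMounted("pod-a", "vol-1")
	mounted, dev, _ := asw.podExistsInVolume("pod-a", "vol-1")
	fmt.Println(mounted, dev) // true /dev/sdb
}
```

The three return values mirror the shape of the PodExistsInVolume call that the reconciler listing later in this article makes against the actual state.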

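desiredStateOfWorld-related structs

desiredStateOfWorld

The struct that holds the desired storage mount state.

desiredStateOfWorld: the desired storage mount state, DSW for short. It contains the volumes that should be attached to the node and, for each of them, the pods that should mount it.

Its main field is volumesToMount, a map whose key is the name of a volume that should be attached to this node and whose value holds information about that volume.

// pkg/kubelet/volumemanager/cache/desired_state_of_world.go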
type desiredStateOfWorld struct {
  // volumesToMount is a map containing the set of volumes that should be
  // attached to this node and mounted to the pods referencing it. The key in
  // the map is the name of the volume and the value is a volume object
  // containing more information about the volume.
  volumesToMount map[v1.UniqueVolumeName]volumeToMount
  // volumePluginMgr is the volume plugin manager used to create volume
  // plugin objects.
  volumePluginMgr *volume.VolumePluginMgr
  // podErrors are errors caught by desiredStateOfWorldPopulator about volumes for a given pod.
  podErrors map[types.UniquePodName]sets.String

  sync.RWMutex
}

volumeToMount

Its main field is podsToMount, a map whose key is the pod name and whose value is a podToMount object describing a pod that should mount this volume.

// pkg/kubelet/volumemanager/cache/desired_state_of_world.go
// The volume object represents a volume that should be attached to this node,
// and mounted to podsToMount.
type volumeToMount struct {
  // volumeName contains the unique identifier for this volume.
  volumeName v1.UniqueVolumeName

  // podsToMount is a map containing the set of pods that reference this
  // volume and should mount it once it is attached. The key in the map is
  // the name of the pod and the value is a pod object containing more
  // information about the pod.
  podsToMount map[types.UniquePodName]podToMount

  // pluginIsAttachable indicates that the plugin for this volume implements
  // the volume.Attacher interface
  pluginIsAttachable bool

  // pluginIsDeviceMountable indicates that the plugin for this volume implements
  // the volume.DeviceMounter interface
  pluginIsDeviceMountable bool

  // volumeGidValue contains the value of the GID annotation, if present.
  volumeGidValue string

  // reportedInUse indicates that the volume was successfully added to the
  // VolumesInUse field in the node's status.
  reportedInUse bool

  // desiredSizeLimit indicates the desired upper bound on the size of the volume
  // (if so implemented)
  desiredSizeLimit *resource.Quantity
}

podToMount

The podToMount struct mainly records information about the pod.

// pkg/kubelet/volumemanager/cache/desired_state_of_world.go
// The pod object represents a pod that references the underlying volume and
// should mount it once it is attached.
type podToMount struct {
  // podName contains the name of this pod.
  podName types.UniquePodName

  // Pod to mount the volume to. Used to create NewMounter.
  pod *v1.Pod

  // volume spec containing the specification for this volume. Used to
  // generate the volume plugin object, and passed to plugin methods.
  // For non-PVC volumes this is the same as defined in the pod object. For
  // PVC volumes it is from the dereferenced PV object.
  volumeSpec *volume.Spec

  // outerVolumeSpecName is the volume.Spec.Name() of the volume as referenced
  // directly in the pod. If the volume was referenced through a persistent
  // volume claim, this contains the volume.Spec.Name() of the persistent
  // volume claim
  outerVolumeSpecName string
}
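The desired state has the same two-level shape: a volume maps to the pods that should mount it. The sketch below is a simplified model with hypothetical names and string keys, not the kubelet implementation. It assumes that a volume entry disappears once its last pod is removed, which is what lets a later reconcile pass treat the volume as no longer wanted.

```go
package main

import "fmt"

// desiredState is a simplified model of desiredStateOfWorld:
// volume name -> set of pod names that should mount the volume.
type desiredState struct {
	volumesToMount map[string]map[string]bool
}

func newDesiredState() *desiredState {
	return &desiredState{volumesToMount: map[string]map[string]bool{}}
}

// addPodToVolume records that pod should mount volume, creating the volume
// entry on first use.
func (d *desiredState) addPodToVolume(pod, volume string) {
	pods, ok := d.volumesToMount[volume]
	if !ok {
		pods = map[string]bool{}
		d.volumesToMount[volume] = pods
	}
	pods[pod] = true
}

// deletePodFromVolume removes the pod and drops the volume entry once no pod
// references it any more.
func (d *desiredState) deletePodFromVolume(pod, volume string) {
	pods, ok := d.volumesToMount[volume]
	if !ok {
		return
	}
	delete(pods, pod)
	if len(pods) == 0 {
		delete(d.volumesToMount, volume)
	}
}

// volumeExists reports whether any pod still wants the volume.
func (d *desiredState) volumeExists(volume string) bool {
	_, ok := d.volumesToMount[volume]
	return ok
}

// podExistsInVolume reports whether pod is expected to mount volume.
func (d *desiredState) podExistsInVolume(pod, volume string) bool {
	return d.volumesToMount[volume][pod]
}

func main() {
	dsw := newDesiredState()
	dsw.addPodToVolume("pod-a", "vol-1")
	dsw.addPodToVolume("pod-b", "vol-1")

	dsw.deletePodFromVolume("pod-a", "vol-1")
	fmt.Println(dsw.volumeExists("vol-1"), dsw.podExistsInVolume("pod-a", "vol-1")) // true false

	dsw.deletePodFromVolume("pod-b", "vol-1")
	fmt.Println(dsw.volumeExists("vol-1")) // false
}
```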

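Entry point analysis

The kubelet manages volumes on the basis of two different states:

(1) DesiredStateOfWorld: how volumes are expected to be mounted, the desired state for short. It is derived from the pod objects;

(2) ActualStateOfWorld: how volumes are actually mounted, the actual state for short. It is obtained from node.Status.VolumesAttached and updated as reconciliation proceeds.

The Run method mainly starts two methods:

(1) vm.desiredStateOfWorldPopulator.Run

(2) vm.reconciler.Run

Each is analyzed in turn below.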

// pkg/kubelet/volumemanager/volume_manager.go
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
  defer runtime.HandleCrash()
    
  // Sync pod information from the apiserver to update DesiredStateOfWorld
  go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
  klog.V(2).Infof("The desired_state_of_world populator starts")

  klog.Infof("Starting Kubelet Volume Manager")
  // The reconciler between desired and actual state: it drives the actual state toward the desired state
  go vm.reconciler.Run(stopCh)

  metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)

  if vm.kubeClient != nil {
    // start informer for CSIDriver
    vm.volumePluginMgr.Run(stopCh)
  }

  <-stopCh
  klog.Infof("Shutting down Kubelet Volume Manager")
}

1 vm.desiredStateOfWorldPopulator.Run

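The main logic is to call dswp.populatorLoop(), which updates DesiredStateOfWorld from the volume information of the pods. The method is called in a loop so that DesiredStateOfWorld is refreshed continuously.

The value of dswp.loopSleepDuration is 100ms.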

// pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
  // Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
  klog.Infof("Desired state populator starts to run")
  wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
    done := sourcesReady.AllReady()
    dswp.populatorLoop()
    return done, nil
  }, stopCh)
  dswp.hasAddedPodsLock.Lock()
  dswp.hasAddedPods = true
  dswp.hasAddedPodsLock.Unlock()
  wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh)
}
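Run has two phases: it first polls until one complete populator loop has run after all sources became ready, then sets hasAddedPods and keeps looping. The sketch below replays that ordering deterministically. It is an illustration only: the populator type is a hypothetical stand-in, a tick budget replaces stopCh and the timers, and loopsAtReady is an extra field added here just to make the ordering observable.

```go
package main

import "fmt"

// populator is a hypothetical, simplified stand-in for desiredStateOfWorldPopulator.
type populator struct {
	loops        int // how many times populatorLoop ran
	loopsAtReady int // value of loops at the moment hasAddedPods was set
	hasAddedPods bool
}

func (p *populator) populatorLoop() { p.loops++ }

// run replays the two phases of Run. The ticks budget stands in for stopCh.
func (p *populator) run(allReady func() bool, ticks int) {
	tick := 0
	// Phase 1 (wait.PollUntil in the listing above): readiness is sampled
	// BEFORE the loop body, so one full loop is guaranteed to run after the
	// sources reported ready.
	for tick < ticks {
		done := allReady()
		p.populatorLoop()
		tick++
		if done {
			break
		}
	}
	// As in the listing, the flag is set as soon as phase 1 returns.
	p.hasAddedPods = true
	p.loopsAtReady = p.loops
	// Phase 2 (wait.Until in the listing above): keep looping until stopped.
	for ; tick < ticks; tick++ {
		p.populatorLoop()
	}
}

func main() {
	calls := 0
	readyOnThirdCall := func() bool { calls++; return calls >= 3 }

	p := &populator{}
	p.run(readyOnThirdCall, 5)
	fmt.Println(p.loopsAtReady, p.loops, p.hasAddedPods) // 3 5 true
}
```

The flag matters to the reconciler: as the reconciliationLoopFunc listing later in this article shows, the one-time state sync only runs after populatorHasAddedPods() reports true.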

1.1 dswp.populatorLoop()

Two key methods:

dswp.findAndAddNewPods(): adds volumes to DesiredStateOfWorld;

dswp.findAndRemoveDeletedPods(): removes volumes from DesiredStateOfWorld.

// pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
func (dswp *desiredStateOfWorldPopulator) populatorLoop() {
  dswp.findAndAddNewPods()

  // findAndRemoveDeletedPods() calls out to the container runtime to
  // determine if the containers for a given pod are terminated. This is
  // an expensive operation, therefore we limit the rate that
  // findAndRemoveDeletedPods() is called independently of the main
  // populator loop.
  if time.Since(dswp.timeOfLastGetPodStatus) < dswp.getPodStatusRetryDuration {
    klog.V(5).Infof(
      "Skipping findAndRemoveDeletedPods(). Not permitted until %v (getPodStatusRetryDuration %v).",
      dswp.timeOfLastGetPodStatus.Add(dswp.getPodStatusRetryDuration),
      dswp.getPodStatusRetryDuration)

    return
  }

  dswp.findAndRemoveDeletedPods()
}
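The populator loop runs the cheap step on every iteration and gates the expensive step behind a minimum interval. A minimal sketch of that pattern follows, with an injected clock so the behaviour is reproducible. All names are hypothetical, and the 2-second gap and 100ms tick are illustrative values chosen for this example, not values read from the kubelet source.

```go
package main

import (
	"fmt"
	"time"
)

// throttledLoop is a hypothetical stand-in for the populator loop: one cheap
// step per call, one expensive step at most once per minGap.
type throttledLoop struct {
	lastExpensive time.Time
	minGap        time.Duration
	cheapRuns     int
	expensiveRuns int
}

// run executes one iteration at the given (injected) time.
func (t *throttledLoop) run(now time.Time) {
	t.cheapRuns++ // stands in for findAndAddNewPods
	if now.Sub(t.lastExpensive) < t.minGap {
		return // too soon, skip the expensive step this time
	}
	t.expensiveRuns++ // stands in for findAndRemoveDeletedPods
	t.lastExpensive = now
}

// simulate drives the loop for a number of ticks and returns how often each
// step ran. Durations are given in milliseconds.
func simulate(ticks, tickMillis, gapMillis int) (cheap, expensive int) {
	start := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
	loop := &throttledLoop{minGap: time.Duration(gapMillis) * time.Millisecond}
	for i := 0; i < ticks; i++ {
		loop.run(start.Add(time.Duration(i*tickMillis) * time.Millisecond))
	}
	return loop.cheapRuns, loop.expensiveRuns
}

func main() {
	cheap, expensive := simulate(50, 100, 2000)
	fmt.Println(cheap, expensive) // 50 3
}
```

One difference from the listing above: there, the timestamp is refreshed inside findAndRemoveDeletedPods when the runtime pod list is fetched, whereas this sketch refreshes it in the loop itself to stay short.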

1.1.1 dswp.findAndAddNewPods()

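Main logic:

(1) If the kubelet has features.ExpandInUsePersistentVolumes enabled, build the map mountedVolumesForPod, which is used later to mark volumes that need expansion;

(2) Iterate over the pod list and call dswp.processPodVolumes to add each pod's volumes to DesiredStateOfWorld (a PVC-backed volume is added only once its PVC is bound). processPodVolumes also contains the logic that marks whether a volume needs to be expanded, which later triggers the expansion.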

// pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
// Iterate through all pods and add to desired state of world if they don't
// exist but should
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
  // Map unique pod name to outer volume name to MountedVolume.
  mountedVolumesForPod := make(map[volumetypes.UniquePodName]map[string]cache.MountedVolume)
  if utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
    for _, mountedVolume := range dswp.actualStateOfWorld.GetMountedVolumes() {
      mountedVolumes, exist := mountedVolumesForPod[mountedVolume.PodName]
      if !exist {
        mountedVolumes = make(map[string]cache.MountedVolume)
        mountedVolumesForPod[mountedVolume.PodName] = mountedVolumes
      }
      mountedVolumes[mountedVolume.OuterVolumeSpecName] = mountedVolume
    }
  }

  processedVolumesForFSResize := sets.NewString()
  for _, pod := range dswp.podManager.GetPods() {
    // If the pod has terminated, skip its volumes
    if dswp.isPodTerminated(pod) {
      // Do not (re)add volumes for terminated pods
      continue
    }
    // Add the pod's volumes to the desired state of the world
    dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize)
  }
}

func (dswp *desiredStateOfWorldPopulator) isPodTerminated(pod *v1.Pod) bool {
  podStatus, found := dswp.podStatusProvider.GetPodStatus(pod.UID)
  if !found {
    podStatus = pod.Status
  }
  return util.IsPodTerminated(pod, podStatus)
}

The following code decides whether a pod counts as terminated:

// pkg/volume/util/util.go
// IsPodTerminated checks if pod is terminated
func IsPodTerminated(pod *v1.Pod, podStatus v1.PodStatus) bool {
  return podStatus.Phase == v1.PodFailed || podStatus.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(podStatus.ContainerStatuses))
}

// notRunning returns true if every status is terminated or waiting, or the status list
// is empty.
func notRunning(statuses []v1.ContainerStatus) bool {
  for _, status := range statuses {
    if status.State.Terminated == nil && status.State.Waiting == nil {
      return false
    }
  }
  return true
}
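The condition is easier to read as a set of cases. The sketch below restates the same boolean logic with simplified, hypothetical types (a phase string, a deleting flag in place of DeletionTimestamp, and two booleans per container) and evaluates a few representative pods. Note the last case: a pod that is being deleted and has an empty status list counts as terminated.

```go
package main

import "fmt"

// containerStatus is a simplified stand-in for v1.ContainerStatus.
type containerStatus struct {
	terminated bool
	waiting    bool
}

// pod is a simplified stand-in for the pod plus its status.
type pod struct {
	phase      string // "Running", "Failed", "Succeeded", ...
	deleting   bool   // stands in for pod.DeletionTimestamp != nil
	containers []containerStatus
}

// notRunning is true if every container is terminated or waiting, or if the
// list is empty.
func notRunning(statuses []containerStatus) bool {
	for _, s := range statuses {
		if !s.terminated && !s.waiting {
			return false
		}
	}
	return true
}

// isPodTerminated mirrors the structure of the condition shown above.
func isPodTerminated(p pod) bool {
	return p.phase == "Failed" || p.phase == "Succeeded" ||
		(p.deleting && notRunning(p.containers))
}

func main() {
	running := []containerStatus{{}}
	stopped := []containerStatus{{terminated: true}}

	fmt.Println(isPodTerminated(pod{phase: "Running", containers: running}))                 // false
	fmt.Println(isPodTerminated(pod{phase: "Succeeded", containers: stopped}))               // true
	fmt.Println(isPodTerminated(pod{phase: "Running", deleting: true, containers: running})) // false
	fmt.Println(isPodTerminated(pod{phase: "Running", deleting: true, containers: stopped})) // true
	fmt.Println(isPodTerminated(pod{phase: "Running", deleting: true}))                      // true
}
```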

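Main logic of dswp.processPodVolumes():

(1) Call dswp.podPreviouslyProcessed to check whether the volumes of the given pod have already been processed; if so, return immediately;

(2) Iterate over pod.Spec.Volumes and apply steps (3) to (5) to each volume that a container in the pod actually uses;

(3) Call dswp.createVolumeSpec to build and return the volume.Spec from the PVC and PV objects (the method fetches the PVC and the PV, returns an error if the PVC is not bound to a PV, and also checks that attributes such as the PVC's volumeMode are consistent with how the pod uses the volume);

(4) Call dswp.desiredStateOfWorld.AddPodToVolume to add the pod's volume information to desiredStateOfWorld;

(5) If features.ExpandInUsePersistentVolumes is enabled, call dswp.checkVolumeFSResize to mark volumes that need to be expanded;

(6) If every volume was added successfully, call dswp.markPodProcessed to record that the pod's volumes have been processed.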

// pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
// processPodVolumes processes the volumes in the given pod and adds them to the
// desired state of the world.
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
  pod *v1.Pod,
  mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume,
  processedVolumesForFSResize sets.String) {
  if pod == nil {
    return
  }

  uniquePodName := util.GetUniquePodName(pod)
  if dswp.podPreviouslyProcessed(uniquePodName) {
    return
  }

  allVolumesAdded := true
  mounts, devices := util.GetPodVolumeNames(pod)

  expandInUsePV := utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes)
  // Process volume spec for each volume defined in pod
  for _, podVolume := range pod.Spec.Volumes {
    if !mounts.Has(podVolume.Name) && !devices.Has(podVolume.Name) {
      // Volume is not used in the pod, ignore it.
      klog.V(4).Infof("Skipping unused volume %q for pod %q", podVolume.Name, format.Pod(pod))
      continue
    }

    pvc, volumeSpec, volumeGidValue, err :=
      dswp.createVolumeSpec(podVolume, pod.Name, pod.Namespace, mounts, devices)
    if err != nil {
      klog.Errorf(
        "Error processing volume %q for pod %q: %v",
        podVolume.Name,
        format.Pod(pod),
        err)
      dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
      allVolumesAdded = false
      continue
    }

    // for local volume
    err = dswp.checkLocalVolumePV(pod, volumeSpec)
    if err != nil {
      klog.Errorf(
        "Error processing volume %q for pod %q: %v",
        podVolume.Name,
        format.Pod(pod),
        err)
      allVolumesAdded = false
      continue
    }


    // Add volume to desired state of world
    _, err = dswp.desiredStateOfWorld.AddPodToVolume(
      uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)
    if err != nil {
      klog.Errorf(
        "Failed to add volume %s (specName: %s) for pod %q to desiredStateOfWorld: %v",
        podVolume.Name,
        volumeSpec.Name(),
        uniquePodName,
        err)
      dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
      allVolumesAdded = false
    } else {
      klog.V(4).Infof(
        "Added volume %q (volSpec=%q) for pod %q to desired state.",
        podVolume.Name,
        volumeSpec.Name(),
        uniquePodName)
    }

    if expandInUsePV {
      dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec,
        uniquePodName, mountedVolumesForPod, processedVolumesForFSResize)
    }
  }

  // some of the volume additions may have failed, should not mark this pod as fully processed
  if allVolumesAdded {
    dswp.markPodProcessed(uniquePodName)
    // New pod has been synced. Re-mount all volumes that need it
    // (e.g. DownwardAPI)
    dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
    // Remove any stored errors for the pod, everything went well in this processPodVolumes
    dswp.desiredStateOfWorld.PopPodErrors(uniquePodName)
  }

}

Next, another key method, checkVolumeFSResize():

It relates to volume expansion: when pv.Spec.Capacity is larger than pvc.Status.Capacity, the volume is marked as needing a file system resize.

// pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
// checkVolumeFSResize checks whether a PVC mounted by the pod requires file
// system resize or not. If so, marks this volume as fsResizeRequired in ASW.
// - mountedVolumesForPod stores all mounted volumes in ASW, because online
//   volume resize only considers mounted volumes.
// - processedVolumesForFSResize stores all volumes we have checked in current loop,
//   because file system resize operation is a global operation for volume, so
//   we only need to check it once if more than one pod use it.
func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize(
  pod *v1.Pod,
  podVolume v1.Volume,
  pvc *v1.PersistentVolumeClaim,
  volumeSpec *volume.Spec,
  uniquePodName volumetypes.UniquePodName,
  mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume,
  processedVolumesForFSResize sets.String) {
  if podVolume.PersistentVolumeClaim == nil {
    // Only PVC supports resize operation.
    return
  }
  uniqueVolumeName, exist := getUniqueVolumeName(uniquePodName, podVolume.Name, mountedVolumesForPod)
  if !exist {
    // Volume not exist in ASW, we assume it hasn't been mounted yet. If it needs resize,
    // it will be handled as offline resize(if it indeed hasn't been mounted yet),
    // or online resize in subsequent loop(after we confirm it has been mounted).
    return
  }
  if processedVolumesForFSResize.Has(string(uniqueVolumeName)) {
    // File system resize operation is a global operation for volume,
    // so we only need to check it once if more than one pod use it.
    return
  }
  // volumeSpec.ReadOnly is the value that determines if volume could be formatted when being mounted.
  // This is the same flag that determines filesystem resizing behaviour for offline resizing and hence
  // we should use it here. This value comes from Pod.spec.volumes.persistentVolumeClaim.readOnly.
  if volumeSpec.ReadOnly {
    // This volume is used as read only by this pod, we don't perform resize for read only volumes.
    klog.V(5).Infof("Skip file system resize check for volume %s in pod %s/%s "+
      "as the volume is mounted as readonly", podVolume.Name, pod.Namespace, pod.Name)
    return
  }
  if volumeRequiresFSResize(pvc, volumeSpec.PersistentVolume) {
    dswp.actualStateOfWorld.MarkFSResizeRequired(uniqueVolumeName, uniquePodName)
  }
  processedVolumesForFSResize.Insert(string(uniqueVolumeName))
}

// Returns true when pv.Spec.Capacity is larger than pvc.Status.Capacity
func volumeRequiresFSResize(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool {
  capacity := pvc.Status.Capacity[v1.ResourceStorage]
  requested := pv.Spec.Capacity[v1.ResourceStorage]
  return requested.Cmp(capacity) > 0
}
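Taken together, checkVolumeFSResize applies four early exits before it compares capacities. The sketch below condenses that decision into one function. It is a simplified illustration: the resizeCheck type and its fields are hypothetical, and capacities are plain byte counts instead of resource.Quantity values.

```go
package main

import "fmt"

// resizeCheck gathers the inputs of the decision; all fields are hypothetical
// simplifications of what the real method inspects.
type resizeCheck struct {
	isPVC             bool  // the pod volume references a PVC
	mounted           bool  // the volume is present in the actual state for this pod
	alreadyChecked    bool  // the volume was already handled in this loop
	readOnly          bool  // the pod uses the volume read-only
	pvCapacity        int64 // pv.Spec.Capacity in bytes
	pvcStatusCapacity int64 // pvc.Status.Capacity in bytes
}

// needsFSResize returns true only when none of the early exits apply and the
// PV is already larger than what the PVC status reports.
func needsFSResize(c resizeCheck) bool {
	if !c.isPVC || !c.mounted || c.alreadyChecked || c.readOnly {
		return false
	}
	return c.pvCapacity > c.pvcStatusCapacity
}

func main() {
	const gi = int64(1) << 30
	base := resizeCheck{isPVC: true, mounted: true, pvCapacity: 20 * gi, pvcStatusCapacity: 10 * gi}
	fmt.Println(needsFSResize(base)) // true: PV grew, file system has not caught up

	ro := base
	ro.readOnly = true
	fmt.Println(needsFSResize(ro)) // false: read-only volumes are skipped

	same := base
	same.pvcStatusCapacity = 20 * gi
	fmt.Println(needsFSResize(same)) // false: capacities already match
}
```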

1.1.2 dswp.findAndRemoveDeletedPods

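Main logic:

(1) Get from dswp.desiredStateOfWorld the pods that are expected to mount volumes and look each one up in the podManager. A pod that still exists there and has not terminated is skipped. Otherwise the containerRuntime is queried to check whether all containers of the pod have terminated. Processing continues for the pod only if both conditions hold; if not, the loop moves on to the next entry;

(2) Call dswp.desiredStateOfWorld.DeletePodFromVolume to remove the pod from dsw.volumesToMount[volumeName].podsToMount[podName], that is, to remove from desiredStateOfWorld the record that the volume should be mounted to this pod.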

// pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
// Iterate through all pods in desired state of world, and remove if they no
// longer exist
func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
  var runningPods []*kubecontainer.Pod

  runningPodsFetched := false
  for _, volumeToMount := range dswp.desiredStateOfWorld.GetVolumesToMount() {
    pod, podExists := dswp.podManager.GetPodByUID(volumeToMount.Pod.UID)
    if podExists {
      // Skip running pods
      if !dswp.isPodTerminated(pod) {
        continue
      }
      if dswp.keepTerminatedPodVolumes {
        continue
      }
    }

    // Once a pod has been deleted from kubelet pod manager, do not delete
    // it immediately from volume manager. Instead, check the kubelet
    // containerRuntime to verify that all containers in the pod have been
    // terminated.
    if !runningPodsFetched {
      var getPodsErr error
      runningPods, getPodsErr = dswp.kubeContainerRuntime.GetPods(false)
      if getPodsErr != nil {
        klog.Errorf(
          "kubeContainerRuntime.findAndRemoveDeletedPods returned error %v.",
          getPodsErr)
        continue
      }

      runningPodsFetched = true
      dswp.timeOfLastGetPodStatus = time.Now()
    }

    runningContainers := false
    for _, runningPod := range runningPods {
      if runningPod.ID == volumeToMount.Pod.UID {
        if len(runningPod.Containers) > 0 {
          runningContainers = true
        }

        break
      }
    }

    if runningContainers {
      klog.V(4).Infof(
        "Pod %q still has one or more containers in the non-exited state. Therefore, it will not be removed from desired state.",
        format.Pod(volumeToMount.Pod))
      continue
    }
    exists, _, _ := dswp.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
    if !exists && podExists {
      klog.V(4).Infof(
        volumeToMount.GenerateMsgDetailed(fmt.Sprintf("Actual state has not yet has this volume mounted information and pod (%q) still exists in pod manager, skip removing volume from desired state",
          format.Pod(volumeToMount.Pod)), ""))
      continue
    }
    klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Removing volume from desired state", ""))

    dswp.desiredStateOfWorld.DeletePodFromVolume(
      volumeToMount.PodName, volumeToMount.VolumeName)
    dswp.deleteProcessedPod(volumeToMount.PodName)
  }

  podsWithError := dswp.desiredStateOfWorld.GetPodsWithErrors()
  for _, podName := range podsWithError {
    if _, podExists := dswp.podManager.GetPodByUID(types.UID(podName)); !podExists {
      dswp.desiredStateOfWorld.PopPodErrors(podName)
    }
  }
}

2 vm.reconciler.Run

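It mainly calls rc.reconcile(), which reconciles the desired and actual storage state by driving the actual state toward the desired state.

The value of rc.loopSleepDuration is 100ms.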

// pkg/kubelet/volumemanager/reconciler/reconciler.go
func (rc *reconciler) Run(stopCh <-chan struct{}) {
  wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
}

func (rc *reconciler) reconciliationLoopFunc() func() {
  return func() {
    rc.reconcile()

    // Sync the state with the reality once after all existing pods are added to the desired state from all sources.
    // Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
    // desired state of world does not contain a complete list of pods.
    if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
      klog.Infof("Reconciler: start to sync state")
      rc.sync()
    }
  }
}

2.1 rc.reconcile

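Main logic of rc.reconcile():

(1) For storage that is actually mounted: if the desired state does not contain the volume, or the list of pods expected to mount the volume does not contain the pod, that volume must be unmounted from that pod (this ends in a call to csi.NodeUnpublishVolume);

(2) Get from desiredStateOfWorld the list of volumes that need to be mounted to pods:

a. If the volume is not yet attached to the node: attach it to the node first. This step checks whether the kubelet is responsible for attaching. If it is, the kubelet creates a VolumeAttachment object and waits for its .status.attached to become true. If it is not, the kubelet waits for the AD controller to attach the volume and uses the node object's .Status.VolumesAttached field to decide whether the volume has been attached to the node;

b. If the volume is attached but not yet mounted to the pod, or needs a remount: mount the volume (this ends in calls to csi.NodeStageVolume and csi.NodePublishVolume);

c. If the volume needs to be expanded: expand it (this ends in a call to csi.NodeExpandVolume);

(3) Compare actualStateOfWorld with desiredStateOfWorld to find the volumes that need to be detached (detached means removing the storage from the node):

a. When actualStateOfWorld shows that a volume is not mounted by any pod, desiredStateOfWorld does not expect any pod to mount it, and attachedVolume.GloballyMounted is true (the mount between the device and the global mount path still exists), UnmountDevice is called. It mainly calls csi.NodeUnstageVolume to remove the mount at the global mount path on the node;

b. When actualStateOfWorld shows that a volume is not mounted by any pod, desiredStateOfWorld does not contain the volume, and attachedVolume.GloballyMounted is false (UnmountDevice has already run and the mount between the device and the global mount path is gone), DetachVolume is called. It mainly deletes the VolumeAttachment object from etcd and waits for the deletion to succeed.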

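Main methods involved in reconcile():

(1) rc.operationExecutor.UnmountVolume: called when actualStateOfWorld shows that a pod has mounted a volume but the pod is missing from the list of pods that desiredStateOfWorld expects to mount that volume (the storage is mounted to a pod that no longer exists, so the mount must be removed). It mainly calls csi.NodeUnpublishVolume to unmount the pod mount path;

(2) rc.operationExecutor.AttachVolume: called when a volume is missing from the volumes that actualStateOfWorld records as attached to the node but desiredStateOfWorld expects it to be attached (storage that should be attached to the node is not attached yet). It mainly creates a VolumeAttachment object and waits for its .status.attached to become true;

(3) rc.operationExecutor.MountVolume: called when desiredStateOfWorld expects a volume to be mounted to a pod, actualStateOfWorld shows that it is not mounted to that pod, and the volume is already attached to the node (or when the pod's volume needs a remount). It mainly calls csi.NodeStageVolume to mount the storage at the global mount path on the node and csi.NodePublishVolume to mount it from the global mount path to the pod mount path;

(4) rc.operationExecutor.ExpandInUseVolume: performs the node-side expansion after the controller-side expansion has finished (volume expansion is analyzed separately later);

(5) rc.operationExecutor.UnmountDevice: called when actualStateOfWorld shows that a volume is not mounted by any pod, desiredStateOfWorld does not expect any pod to mount it, and attachedVolume.GloballyMounted is true (the mount between the device and the global mount path still exists). It mainly calls csi.NodeUnstageVolume to remove the mount at the global mount path on the node;

(6) rc.operationExecutor.DetachVolume: called when actualStateOfWorld shows that a volume is not mounted by any pod, desiredStateOfWorld does not contain the volume, and attachedVolume.GloballyMounted is false (UnmountDevice has already run and the mount between the device and the global mount path is gone). It mainly deletes the VolumeAttachment object from etcd and waits for the deletion to succeed.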

Call flow for mounting storage to a pod: AttachVolume(csi-attacher.Attach) --> MountVolume(csi-attacher.MountDevice --> csi-mounter.SetUp)

Call flow for removing storage from a pod: UnmountVolume(csi-mounter.TearDown) --> UnmountDevice(csi-attacher.UnmountDevice) --> DetachVolume(csi-attacher.Detach)
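The ordering described above (unmount first, then attach/mount, then unmount device/detach) can be sketched as a pure function that turns one snapshot of both states into a list of operations. This is a simplified model, not the reconciler itself: the state type and reconcileOnce are hypothetical, operations are plain strings, and expansion, remounts and pending-operation tracking are left out. As described above, a volume that is not attached yet only gets an attach in this pass, and its mount follows in a later pass.

```go
package main

import (
	"fmt"
	"sort"
)

// state is a hypothetical snapshot of both worlds.
type state struct {
	desired         map[string][]string // volume -> pods that should mount it
	attached        map[string]bool     // volume -> attached to the node
	mounted         map[string][]string // volume -> pods that have mounted it
	globallyMounted map[string]bool     // volume -> global mount path still mounted
}

func contains(list []string, s string) bool {
	for _, v := range list {
		if v == s {
			return true
		}
	}
	return false
}

// sortedVolumes returns the keys in a stable order so the plan is deterministic.
func sortedVolumes(m map[string][]string) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// reconcileOnce lists the operations a single pass would start.
func reconcileOnce(s state) []string {
	var ops []string
	// 1. Unmount volumes from pods that no longer want them.
	for _, vol := range sortedVolumes(s.mounted) {
		for _, pod := range s.mounted[vol] {
			if !contains(s.desired[vol], pod) {
				ops = append(ops, "UnmountVolume "+vol+" from "+pod)
			}
		}
	}
	// 2. Attach what is missing; mount what is attached but not mounted.
	for _, vol := range sortedVolumes(s.desired) {
		if !s.attached[vol] {
			ops = append(ops, "AttachVolume "+vol)
			continue // mounting waits for a later pass
		}
		for _, pod := range s.desired[vol] {
			if !contains(s.mounted[vol], pod) {
				ops = append(ops, "MountVolume "+vol+" to "+pod)
			}
		}
	}
	// 3. Tear down attached volumes that nobody mounts or wants.
	attached := make([]string, 0, len(s.attached))
	for vol, ok := range s.attached {
		if ok {
			attached = append(attached, vol)
		}
	}
	sort.Strings(attached)
	for _, vol := range attached {
		if _, wanted := s.desired[vol]; wanted || len(s.mounted[vol]) > 0 {
			continue
		}
		if s.globallyMounted[vol] {
			ops = append(ops, "UnmountDevice "+vol)
		} else {
			ops = append(ops, "DetachVolume "+vol)
		}
	}
	return ops
}

func main() {
	s := state{
		desired:         map[string][]string{"vol-new": {"pod-b"}},
		attached:        map[string]bool{"vol-old": true, "vol-idle": true},
		mounted:         map[string][]string{"vol-old": {"pod-a"}},
		globallyMounted: map[string]bool{"vol-idle": true},
	}
	for _, op := range reconcileOnce(s) {
		fmt.Println(op)
	}
}
```

In this example vol-old is only unmounted from pod-a; because the plan is computed from a snapshot, its UnmountDevice and DetachVolume steps would appear in subsequent passes, one per pass, once the earlier step is reflected in the actual state.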

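Additional note: controllerAttachDetachEnabled
The value of this field is determined by the kubelet startup flag --enable-controller-attach-detach. Setting the flag to true lets the Attach/Detach controller perform attach/detach operations and disables them in the kubelet (the default is true). For a CSI plugin, attach/detach in fact only creates/deletes a VolumeAttachment object.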
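How this flag steers the attach step can be condensed into a small decision function. The sketch below follows the branch condition used in reconcile() (controllerAttachDetachEnabled || !PluginIsAttachable); the function name and the string results are hypothetical and only label which executor call would be started.

```go
package main

import "fmt"

// attachAction returns which operation the kubelet would start for a volume
// that is not yet attached. The returned strings are labels for illustration.
func attachAction(controllerAttachDetachEnabled, pluginIsAttachable bool) string {
	if controllerAttachDetachEnabled || !pluginIsAttachable {
		// The kubelet does not attach: it only verifies that the volume is
		// reported as attached (or treats non-attachable plugins as attached).
		return "VerifyControllerAttachedVolume"
	}
	// The kubelet attaches the volume itself.
	return "AttachVolume"
}

func main() {
	fmt.Println(attachAction(true, true))   // VerifyControllerAttachedVolume
	fmt.Println(attachAction(false, true))  // AttachVolume
	fmt.Println(attachAction(false, false)) // VerifyControllerAttachedVolume
}
```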

// pkg/kubelet/volumemanager/reconciler/reconciler.go

func (rc *reconciler) reconcile() {
  // Unmounts are triggered before mounts so that a volume that was
  // referenced by a pod that was deleted and is now referenced by another
  // pod is unmounted from the first pod before being mounted to the new
  // pod.
    
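  // For storage that is actually mounted: if the desired state does not contain the volume, or the pods expected to mount it do not include the pod, the volume must be unmounted from that pod
  // Ensure volumes that should be unmounted are unmounted.
  // Read the mount information from the actual state of the world
  for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
    // If the desired state has no record of this volume being mounted to this pod, call rc.operationExecutor.UnmountVolume to unmount the volume from the pod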
    if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
      // Volume is mounted, unmount it
      klog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
      err := rc.operationExecutor.UnmountVolume(
        mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
      if err != nil &&
        !nestedpendingoperations.IsAlreadyExists(err) &&
        !exponentialbackoff.IsExponentialBackoff(err) {
        // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
        // Log all other errors.
        klog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
      }
      if err == nil {
        klog.Infof(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
      }
    }
  }
    
  // Ensure volumes that should be attached/mounted are attached/mounted.
  // Get from desiredStateOfWorld the list of volumes that need to be mounted to pods.
  for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
    // Call rc.actualStateOfWorld.PodExistsInVolume to check whether the volume is already mounted to the pod.
    volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
    volumeToMount.DevicePath = devicePath
    
    // The volume is not attached to the node yet, so it must be attached first.
    if cache.IsVolumeNotAttachedError(err) {
      // Check whether the controller is responsible for attach/detach.
      if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
        // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
        // for controller to finish attaching volume.
        klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""))
        
        // The controller is responsible for attach/detach, so call
        // VerifyControllerAttachedVolume to check whether the controller has
        // finished attaching the volume.
        err := rc.operationExecutor.VerifyControllerAttachedVolume(
          volumeToMount.VolumeToMount,
          rc.nodeName,
          rc.actualStateOfWorld)
        if err != nil &&
          !nestedpendingoperations.IsAlreadyExists(err) &&
          !exponentialbackoff.IsExponentialBackoff(err) {
          // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
          // Log all other errors.
          klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
        }
        if err == nil {
          klog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""))
        }
      } else {
        // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
        // so attach it
        volumeToAttach := operationexecutor.VolumeToAttach{
          VolumeName: volumeToMount.VolumeName,
          VolumeSpec: volumeToMount.VolumeSpec,
          NodeName:   rc.nodeName,
        }
        klog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""))
        err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
        if err != nil &&
          !nestedpendingoperations.IsAlreadyExists(err) &&
          !exponentialbackoff.IsExponentialBackoff(err) {
          // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
          // Log all other errors.
          klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
        }
        if err == nil {
          klog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""))
        }
      }
      
    // The volume is attached but not yet mounted to the pod, or a remount is required: mount it.
    } else if !volMounted || cache.IsRemountRequiredError(err) {
      // Volume is not mounted, or is already mounted, but requires remounting
      remountingLogStr := ""
      isRemount := cache.IsRemountRequiredError(err)
      if isRemount {
        remountingLogStr = "Volume is already mounted to pod, but remount was requested."
      }
      klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr))
      err := rc.operationExecutor.MountVolume(
        rc.waitForAttachTimeout,
        volumeToMount.VolumeToMount,
        rc.actualStateOfWorld,
        isRemount)
      if err != nil &&
        !nestedpendingoperations.IsAlreadyExists(err) &&
        !exponentialbackoff.IsExponentialBackoff(err) {
        // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
        // Log all other errors.
        klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
      }
      if err == nil {
        if remountingLogStr == "" {
          klog.V(1).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr))
        } else {
          klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr))
        }
      }
    
    // The volume's file system needs to be resized: expand the in-use volume.
    } else if cache.IsFSResizeRequiredError(err) &&
      utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
      klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""))
      err := rc.operationExecutor.ExpandInUseVolume(
        volumeToMount.VolumeToMount,
        rc.actualStateOfWorld)
      if err != nil &&
        !nestedpendingoperations.IsAlreadyExists(err) &&
        !exponentialbackoff.IsExponentialBackoff(err) {
        // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
        // Log all other errors.
        klog.Errorf(volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error())
      }
      if err == nil {
        klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""))
      }
    }
  }
    
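  // Get from actualStateOfWorld the volumes that are attached but no longer
  // mounted to any pod; those that are also absent from desiredStateOfWorld
  // need their device unmounted and then detached from the node.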
  // Ensure devices that should be detached/unmounted are detached/unmounted.
  for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
    // Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
    if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
      !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) {
      
      // GloballyMounted is set to false once the device's global mount on the node has been unmounted.
      if attachedVolume.GloballyMounted {
        // Volume is globally mounted to device, unmount it
        klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
        err := rc.operationExecutor.UnmountDevice(
          attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.hostutil)
        if err != nil &&
          !nestedpendingoperations.IsAlreadyExists(err) &&
          !exponentialbackoff.IsExponentialBackoff(err) {
          // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
          // Log all other errors.
          klog.Errorf(attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
        }
        if err == nil {
          klog.Infof(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", ""))
        }
      } else {
        // Volume is attached to node, detach it
        // Kubelet not responsible for detaching or this volume has a non-attachable volume plugin.
        if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
          rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName)
          klog.Infof(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
        } else {
          // Only detach if kubelet detach is enabled
          klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
          err := rc.operationExecutor.DetachVolume(
            attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
          if err != nil &&
            !nestedpendingoperations.IsAlreadyExists(err) &&
            !exponentialbackoff.IsExponentialBackoff(err) {
            // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
            // Log all other errors.
            klog.Errorf(attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
          }
          if err == nil {
            klog.Infof(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", ""))
          }
        }
      }
    }
  }
}
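
The three passes of reconcile() above can be summarized with a toy model. This is a sketch under simplifying assumptions, not the kubelet implementation: podVolume, usesVolume, sortedUnique and plan are hypothetical names, the two states are plain maps, operations are returned as strings instead of being started asynchronously, and the choice between AttachVolume and VerifyControllerAttachedVolume as well as the remount and resize cases are left out.

```go
package main

import (
	"fmt"
	"sort"
)

// podVolume identifies one volume as used by one pod.
type podVolume struct{ pod, volume string }

// usesVolume reports whether any entry in set refers to the given volume.
func usesVolume(set map[podVolume]bool, volume string) bool {
	for pv := range set {
		if pv.volume == volume {
			return true
		}
	}
	return false
}

// sortedUnique sorts ops and drops duplicates so the output is deterministic.
func sortedUnique(ops []string) []string {
	sort.Strings(ops)
	var out []string
	for _, op := range ops {
		if len(out) == 0 || out[len(out)-1] != op {
			out = append(out, op)
		}
	}
	return out
}

// plan compares a desired state with a snapshot of the actual state and
// returns the operations in the order reconcile() considers them.
func plan(desired, mounted map[podVolume]bool, attached map[string]bool) []string {
	var ops, pass []string

	// Pass 1: unmount volumes that are mounted but no longer desired.
	for pv := range mounted {
		if !desired[pv] {
			pass = append(pass, "UnmountVolume "+pv.volume+" from "+pv.pod)
		}
	}
	ops = append(ops, sortedUnique(pass)...)

	// Pass 2: attach volumes that are not attached, mount those that are.
	pass = nil
	for pv := range desired {
		switch {
		case !attached[pv.volume]:
			pass = append(pass, "AttachVolume "+pv.volume)
		case !mounted[pv]:
			pass = append(pass, "MountVolume "+pv.volume+" for "+pv.pod)
		}
	}
	ops = append(ops, sortedUnique(pass)...)

	// Pass 3: detach attached volumes that no pod mounts or desires.
	pass = nil
	for volume := range attached {
		if !usesVolume(mounted, volume) && !usesVolume(desired, volume) {
			pass = append(pass, "DetachVolume "+volume)
		}
	}
	return append(ops, sortedUnique(pass)...)
}

func main() {
	desired := map[podVolume]bool{{"podB", "vol1"}: true, {"podB", "vol2"}: true}
	mounted := map[podVolume]bool{{"podA", "vol1"}: true}
	attached := map[string]bool{"vol1": true, "vol3": true}
	for _, op := range plan(desired, mounted, attached) {
		fmt.Println(op)
	}
}
```

Because every pass works on the same snapshot, a volume unmounted in pass 1 is not detached in the same round. In the real reconciler the detach also only becomes possible in a later iteration, after the asynchronous unmount has updated the actual state.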

2.1.1 rc.operationExecutor.VerifyControllerAttachedVolume

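Because attach/detach is performed by the AD controller in this mode, the volume manager can only learn from the node object whether a given volume has been attached; if it has, the volume manager updates actualStateOfWorld.

Main logic: read .Status.VolumesAttached from the node object to determine whether the volume is attached, then update actualStateOfWorld.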

// pkg/volume/util/operationexecutor/operation_executor.go
func (oe *operationExecutor) VerifyControllerAttachedVolume(
  volumeToMount VolumeToMount,
  nodeName types.NodeName,
  actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
  generatedOperations, err :=
    oe.operationGenerator.GenerateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld)
  if err != nil {
    return err
  }

  return oe.pendingOperations.Run(
    volumeToMount.VolumeName, "" /* podName */, generatedOperations)
}

func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
  volumeToMount VolumeToMount,
  nodeName types.NodeName,
  actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
  volumePlugin, err :=
    og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
  if err != nil || volumePlugin == nil {
    return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err)
  }

  verifyControllerAttachedVolumeFunc := func() (error, error) {
    if !volumeToMount.PluginIsAttachable {
      // If the volume does not implement the attacher interface, it is
      // assumed to be attached and the actual state of the world is
      // updated accordingly.

      addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
        volumeToMount.VolumeName, volumeToMount.VolumeSpec, nodeName, "" /* devicePath */)
      if addVolumeNodeErr != nil {
        // On failure, return error. Caller will log and retry.
        return volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr)
      }

      return nil, nil
    }

    if !volumeToMount.ReportedInUse {
      // If the given volume has not yet been added to the list of
      // VolumesInUse in the node's volume status, do not proceed, return
      // error. Caller will log and retry. The node status is updated
      // periodically by kubelet, so it may take as much as 10 seconds
      // before this clears.
      // Issue #28141 to enable on demand status updates.
      return volumeToMount.GenerateError("Volume has not been added to the list of VolumesInUse in the node's volume status", nil)
    }

    // Fetch current node object
    node, fetchErr := og.kubeClient.CoreV1().Nodes().Get(string(nodeName), metav1.GetOptions{})
    if fetchErr != nil {
      // On failure, return error. Caller will log and retry.
      return volumeToMount.GenerateError("VerifyControllerAttachedVolume failed fetching node from API server", fetchErr)
    }

    if node == nil {
      // On failure, return error. Caller will log and retry.
      return volumeToMount.GenerateError(
        "VerifyControllerAttachedVolume failed",
        fmt.Errorf("Node object retrieved from API server is nil"))
    }

    for _, attachedVolume := range node.Status.VolumesAttached {
      if attachedVolume.Name == volumeToMount.VolumeName {
        addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
          v1.UniqueVolumeName(""), volumeToMount.VolumeSpec, nodeName, attachedVolume.DevicePath)
        klog.Infof(volumeToMount.GenerateMsgDetailed("Controller attach succeeded", fmt.Sprintf("device path: %q", attachedVolume.DevicePath)))
        if addVolumeNodeErr != nil {
          // On failure, return error. Caller will log and retry.
          return volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr)
        }
        return nil, nil
      }
    }

    // Volume not attached, return error. Caller will log and retry.
    return volumeToMount.GenerateError("Volume not attached according to node status", nil)
  }

  return volumetypes.GeneratedOperations{
    OperationName:     "verify_controller_attached_volume",
    OperationFunc:     verifyControllerAttachedVolumeFunc,
    CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "verify_controller_attached_volume"),
    EventRecorderFunc: nil, // nil because we do not want to generate event on error
  }, nil

}
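
Stripped of plugin lookup and API access, the check performed by the function above comes down to two steps: refuse to proceed until the volume is reported in use, then search the node's attached-volume list. The following is a simplified sketch with local stand-in types. attachedVolume, verifyControllerAttached, the two error values and the volume name in main are all hypothetical; the real code reads node.Status.VolumesAttached through the API client.

```go
package main

import (
	"errors"
	"fmt"
)

// attachedVolume is a local stand-in for one entry of node.Status.VolumesAttached.
type attachedVolume struct {
	Name       string
	DevicePath string
}

var (
	errNotReportedInUse = errors.New("volume not yet listed in the node's VolumesInUse")
	errNotAttached      = errors.New("volume not attached according to node status")
)

// verifyControllerAttached returns the device path of the named volume if the
// controller has recorded it as attached. Both error cases are expected to be
// retried by the caller on a later reconcile iteration.
func verifyControllerAttached(name string, reportedInUse bool, volumesAttached []attachedVolume) (string, error) {
	if !reportedInUse {
		return "", errNotReportedInUse
	}
	for _, v := range volumesAttached {
		if v.Name == name {
			return v.DevicePath, nil
		}
	}
	return "", errNotAttached
}

func main() {
	// Hypothetical node status content, for illustration only.
	status := []attachedVolume{{Name: "example-volume", DevicePath: "example-device"}}

	path, err := verifyControllerAttached("example-volume", true, status)
	fmt.Println(path, err)

	_, err = verifyControllerAttached("other-volume", true, status)
	fmt.Println(err)
}
```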

2.2 rc.sync()

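When rc.sync() is called: after vm.desiredStateOfWorldPopulator.Run has added the volume information of all pods to desiredStateOfWorld, and only if the states have not been synced yet.

Main logic of rc.sync(): scan the volume directories under every pod directory on the node and use them to update desiredStateOfWorld and actualStateOfWorld.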

// pkg/kubelet/volumemanager/reconciler/reconciler.go
func (rc *reconciler) reconciliationLoopFunc() func() {
  return func() {
    rc.reconcile()

    // Sync the state with the reality once after all existing pods are added to the desired state from all sources.
    // Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
    // desired state of world does not contain a complete list of pods.
    if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
      klog.Infof("Reconciler: start to sync state")
      rc.sync()
    }
  }
}
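
The guard in the loop function above makes sync() run once: it is skipped until the populator has added all pods, and skipped again once a sync time has been recorded. A minimal model of that behaviour is sketched below. toyReconciler and its members are hypothetical names, and the assumption that "has been synced" means "the last sync time is non-zero" is a modelling choice for this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// toyReconciler models only the state needed for the run-once guard.
type toyReconciler struct {
	populatorHasAddedPods func() bool
	timeOfLastSync        time.Time
	syncCalls             int
}

// statesHasBeenSynced assumes a zero timestamp means "never synced".
func (rc *toyReconciler) statesHasBeenSynced() bool {
	return !rc.timeOfLastSync.IsZero()
}

// sync records the sync time on exit, like the deferred call in sync().
func (rc *toyReconciler) sync() {
	defer func() { rc.timeOfLastSync = time.Now() }()
	rc.syncCalls++
}

// loopOnce is one iteration of the reconciliation loop (reconcile() omitted).
func (rc *toyReconciler) loopOnce() {
	if rc.populatorHasAddedPods() && !rc.statesHasBeenSynced() {
		rc.sync()
	}
}

func main() {
	ready := false
	rc := &toyReconciler{populatorHasAddedPods: func() bool { return ready }}

	rc.loopOnce() // populator not ready yet: no sync
	ready = true
	rc.loopOnce() // first iteration after the populator is ready: sync runs
	rc.loopOnce() // already synced: no further sync

	fmt.Println("sync calls:", rc.syncCalls)
}
```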

// pkg/kubelet/volumemanager/reconciler/reconciler.go

// sync process tries to observe the real world by scanning all pods' volume directories from the disk.
// If the actual and desired state of worlds are not consistent with the observed world, it means that some
// mounted volumes are left out probably during kubelet restart. This process will reconstruct
// the volumes and update the actual and desired states. For the volumes that cannot support reconstruction,
// it will try to clean up the mount paths with operation executor.
func (rc *reconciler) sync() {
  defer rc.updateLastSyncTime()
  rc.syncStates()
}

// syncStates scans the volume directories under the given pod directory.
// If the volume is not in desired state of world, this function will reconstruct
// the volume related information and put it in both the actual and desired state of worlds.
// For some volume plugins that cannot support reconstruction, it will clean up the existing
// mount points since the volume is no long needed (removed from desired state)
func (rc *reconciler) syncStates() {
  // Get volumes information by reading the pod's directory
  podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
  if err != nil {
    klog.Errorf("Cannot get volumes from disk %v", err)
    return
  }
  volumesNeedUpdate := make(map[v1.UniqueVolumeName]*reconstructedVolume)
  volumeNeedReport := []v1.UniqueVolumeName{}
  for _, volume := range podVolumes {
    if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
      klog.V(4).Infof("Volume exists in actual state (volume.SpecName %s, pod.UID %s), skip cleaning up mounts", volume.volumeSpecName, volume.podName)
      // There is nothing to reconstruct
      continue
    }
    volumeInDSW := rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName)

    reconstructedVolume, err := rc.reconstructVolume(volume)
    if err != nil {
      if volumeInDSW {
        // Some pod needs the volume, don't clean it up and hope that
        // reconcile() calls SetUp and reconstructs the volume in ASW.
        klog.V(4).Infof("Volume exists in desired state (volume.SpecName %s, pod.UID %s), skip cleaning up mounts", volume.volumeSpecName, volume.podName)
        continue
      }
      // No pod needs the volume.
      klog.Warningf("Could not construct volume information, cleanup the mounts. (pod.UID %s, volume.SpecName %s): %v", volume.podName, volume.volumeSpecName, err)
      rc.cleanupMounts(volume)
      continue
    }
    if volumeInDSW {
      // Some pod needs the volume. And it exists on disk. Some previous
      // kubelet must have created the directory, therefore it must have
      // reported the volume as in use. Mark the volume as in use also in
      // this new kubelet so reconcile() calls SetUp and re-mounts the
      // volume if it's necessary.
      volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName)
      klog.V(4).Infof("Volume exists in desired state (volume.SpecName %s, pod.UID %s), marking as InUse", volume.volumeSpecName, volume.podName)
      continue
    }
    // There is no pod that uses the volume.
    if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName) {
      klog.Warning("Volume is in pending operation, skip cleaning up mounts")
    }
    klog.V(2).Infof(
      "Reconciler sync states: could not find pod information in desired state, update it in actual state: %+v",
      reconstructedVolume)
    volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume
  }

  if len(volumesNeedUpdate) > 0 {
    if err = rc.updateStates(volumesNeedUpdate); err != nil {
      klog.Errorf("Error occurred during reconstruct volume from disk: %v", err)
    }
  }
  if len(volumeNeedReport) > 0 {
    rc.desiredStateOfWorld.MarkVolumesReportedInUse(volumeNeedReport)
  }
}
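
For each volume directory found on disk, syncStates() ends in one of five outcomes, depending on whether the volume is already in the actual state, whether it is in the desired state, and whether it could be reconstructed. The decision can be sketched as a pure function. syncAction and classify are hypothetical names, and the returned strings are descriptive labels rather than real log messages.

```go
package main

import "fmt"

// syncAction is a hypothetical label for the outcome syncStates() reaches
// for one volume directory found on disk.
type syncAction string

const (
	skipAlreadyTracked syncAction = "skip: already in actual state"
	leaveForReconcile  syncAction = "skip: desired, let reconcile() set it up"
	cleanupMounts      syncAction = "cleanupMounts: not desired and not reconstructable"
	markReportedInUse  syncAction = "mark as reported in use in desired state"
	addToActualState   syncAction = "updateStates: add reconstructed volume to actual state"
)

// classify follows the branch order of syncStates().
func classify(inActual, inDesired, reconstructed bool) syncAction {
	switch {
	case inActual:
		return skipAlreadyTracked
	case !reconstructed && inDesired:
		return leaveForReconcile
	case !reconstructed:
		return cleanupMounts
	case inDesired:
		return markReportedInUse
	default:
		return addToActualState
	}
}

func main() {
	for _, inDesired := range []bool{true, false} {
		for _, reconstructed := range []bool{true, false} {
			fmt.Printf("inDesired=%v reconstructed=%v -> %s\n",
				inDesired, reconstructed, classify(false, inDesired, reconstructed))
		}
	}
}
```

Only a volume that no pod needs and that was reconstructed successfully is added to the actual state. Once it is there, the reconcile() passes treat it like any other mounted volume that is absent from the desired state and unmount it.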

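Summary

Role of the volume manager

The volume manager lives in kubelet and mainly manages volume attach/detach (the same job as the AD controller; the kubelet startup flag --enable-controller-attach-detach decides which component performs it) and mount/umount operations.

Call flow for mounting storage to a pod in the volume manager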

AttachVolume(csi-attacher.Attach) --> MountVolume(csi-attacher.MountDevice --> csi-mounter.SetUp)

Call flow for unmounting storage from a pod in the volume manager

UnmountVolume(csi-mounter.TearDown) --> UnmountDevice(csi-attacher.UnmountDevice) --> DetachVolume(csi-attacher.Detach)

Call chains of the methods invoked from vm.reconciler.Run in the volume manager

(1) AttachVolume

Volume is not attached to node, kubelet attach is enabled

vm.reconciler.Run --> rc.operationExecutor.AttachVolume --> oe.operationGenerator.GenerateAttachVolumeFunc --> csi-attacher.Attach (pkg/volume/csi/csi_attacher.go) --> create VolumeAttachment

(2) MountVolume

Volume is not mounted, or is already mounted, but requires remounting

vm.reconciler.Run --> rc.operationExecutor.MountVolume --> oe.operationGenerator.GenerateMountVolumeFunc --> 1. csi-attacher.WaitForAttach (waits until the VolumeAttachment's .status.attached becomes true) 2. csi-attacher.MountDevice (--> csi.NodeStageVolume) 3. csi-mounter.SetUp (--> csi.NodePublishVolume)

(3) ExpandInUseVolume

Volume is mounted, but it needs to resize

vm.reconciler.Run --> rc.operationExecutor.ExpandInUseVolume --> oe.operationGenerator.GenerateExpandInUseVolumeFunc --> og.doOnlineExpansion --> og.nodeExpandVolume --> expander.NodeExpand (pkg/volume/csi/expander.go) --> csi.NodeExpandVolume

(4) UnmountVolume

Volume is mounted, unmount it

vm.reconciler.Run --> rc.operationExecutor.UnmountVolume --> oe.operationGenerator.GenerateUnmountVolumeFunc --> csi-mounter.TearDown (pkg/volume/csi/csi_mounter.go) --> csi.NodeUnpublishVolume

(5) UnmountDevice

Volume is globally mounted to device, unmount it

vm.reconciler.Run --> rc.operationExecutor.UnmountDevice --> oe.operationGenerator.GenerateUnmountDeviceFunc --> csi-attacher.UnmountDevice --> csi.NodeUnstageVolume

(6) DetachVolume

Volume is attached to node, detach it

vm.reconciler.Run --> rc.operationExecutor.DetachVolume --> oe.operationGenerator.GenerateDetachVolumeFunc --> csi-attacher.Detach --> delete VolumeAttachment
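
The call chains above show that teardown undoes setup in reverse order: publish is undone first, then stage, then attach. The sketch below encodes the two flows as data and checks that symmetry. The step type, the inverse table and isMirror are hypothetical helpers for illustration; the operation, plugin and CSI call names are the ones listed in this summary.

```go
package main

import "fmt"

// step links a volume manager operation to the CSI plugin method it calls
// and to the resulting CSI RPC or API object change.
type step struct {
	operation  string
	pluginCall string
	effect     string
}

var mountFlow = []step{
	{"AttachVolume", "csi-attacher.Attach", "create VolumeAttachment"},
	{"MountVolume", "csi-attacher.MountDevice", "csi.NodeStageVolume"},
	{"MountVolume", "csi-mounter.SetUp", "csi.NodePublishVolume"},
}

var unmountFlow = []step{
	{"UnmountVolume", "csi-mounter.TearDown", "csi.NodeUnpublishVolume"},
	{"UnmountDevice", "csi-attacher.UnmountDevice", "csi.NodeUnstageVolume"},
	{"DetachVolume", "csi-attacher.Detach", "delete VolumeAttachment"},
}

// inverse maps each setup effect to the effect that undoes it.
var inverse = map[string]string{
	"create VolumeAttachment": "delete VolumeAttachment",
	"csi.NodeStageVolume":     "csi.NodeUnstageVolume",
	"csi.NodePublishVolume":   "csi.NodeUnpublishVolume",
}

// isMirror reports whether teardown undoes every setup step in reverse order.
func isMirror(setup, teardown []step) bool {
	if len(setup) != len(teardown) {
		return false
	}
	for i, s := range setup {
		if inverse[s.effect] != teardown[len(teardown)-1-i].effect {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println("teardown mirrors setup:", isMirror(mountFlow, unmountFlow))
}
```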