Skip to content

Commit b1bccd2

Browse files
committed
use csiNode informer to prevent hit the API server for all csiNode resource request
Signed-off-by: Mucahit Kurt <[email protected]>
1 parent 94262e8 commit b1bccd2

File tree

5 files changed

+128
-52
lines changed

5 files changed

+128
-52
lines changed

cmd/csi-provisioner/csi-provisioner.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ import (
4242
"k8s.io/klog"
4343

4444
utilfeature "k8s.io/apiserver/pkg/util/feature"
45+
"k8s.io/client-go/informers"
46+
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
4547
utilflag "k8s.io/component-base/cli/flag"
4648
csitrans "k8s.io/csi-translation-lib"
4749
)
@@ -181,11 +183,32 @@ func main() {
181183
provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
182184
}
183185

186+
var csiNodeLister storagelisters.CSINodeLister
187+
var factory informers.SharedInformerFactory
188+
if ctrl.SupportsTopology(pluginCapabilities) {
189+
// Create informer to prevent hit the API server for all resource request
190+
factory = informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
191+
csiNodeLister = factory.Storage().V1beta1().CSINodes().Lister()
192+
}
193+
184194
// Create the provisioner: it implements the Provisioner interface expected by
185195
// the controller
186-
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *operationTimeout, identity, *volumeNamePrefix,
187-
*volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities,
188-
controllerCapabilities, supportsMigrationFromInTreePluginName, *strictTopology, translator)
196+
csiProvisioner := ctrl.NewCSIProvisioner(
197+
clientset,
198+
*operationTimeout,
199+
identity,
200+
*volumeNamePrefix,
201+
*volumeNameUUIDLength,
202+
grpcClient,
203+
snapClient,
204+
provisionerName,
205+
pluginCapabilities,
206+
controllerCapabilities,
207+
supportsMigrationFromInTreePluginName,
208+
*strictTopology,
209+
translator,
210+
csiNodeLister)
211+
189212
provisionController = controller.NewProvisionController(
190213
clientset,
191214
provisionerName,
@@ -195,6 +218,17 @@ func main() {
195218
)
196219

197220
run := func(context.Context) {
221+
if factory != nil {
222+
stopCh := context.Background().Done()
223+
factory.Start(stopCh)
224+
cacheSyncResult := factory.WaitForCacheSync(stopCh)
225+
for _, v := range cacheSyncResult {
226+
if !v {
227+
klog.Fatalf("Failed to sync CsiNodeInformer!")
228+
}
229+
}
230+
}
231+
198232
provisionController.Run(wait.NeverStop)
199233
}
200234

pkg/controller/controller.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,25 +30,24 @@ import (
3030

3131
"github.com/container-storage-interface/spec/lib/go/csi"
3232
"github.com/kubernetes-csi/csi-lib-utils/connection"
33-
"github.com/kubernetes-csi/external-provisioner/pkg/features"
3433
snapapi "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
3534
snapclientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned"
3635
"sigs.k8s.io/sig-storage-lib-external-provisioner/controller"
3736
"sigs.k8s.io/sig-storage-lib-external-provisioner/util"
3837

39-
v1 "k8s.io/api/core/v1"
38+
"k8s.io/api/core/v1"
4039
storagev1 "k8s.io/api/storage/v1"
4140
"k8s.io/apimachinery/pkg/api/resource"
4241
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4342
_ "k8s.io/apimachinery/pkg/util/json"
4443
"k8s.io/apimachinery/pkg/util/sets"
4544
"k8s.io/apimachinery/pkg/util/validation"
46-
utilfeature "k8s.io/apiserver/pkg/util/feature"
4745
"k8s.io/client-go/kubernetes"
4846
"k8s.io/client-go/rest"
4947
"k8s.io/klog"
5048

5149
"google.golang.org/grpc"
50+
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
5251
)
5352

5453
//secretParamsMap provides a mapping of current as well as deprecated secret keys
@@ -115,6 +114,8 @@ const (
115114
tokenPVCNameKey = "pvc.name"
116115
tokenPVCNameSpaceKey = "pvc.namespace"
117116

117+
ResyncPeriodOfCsiNodeInformer = 1 * time.Hour
118+
118119
deleteVolumeRetryCount = 5
119120

120121
annStorageProvisioner = "volume.beta.kubernetes.io/storage-provisioner"
@@ -195,6 +196,7 @@ type csiProvisioner struct {
195196
supportsMigrationFromInTreePluginName string
196197
strictTopology bool
197198
translator ProvisionerCSITranslator
199+
csiNodeLister storagelisters.CSINodeLister
198200
}
199201

200202
var _ controller.Provisioner = &csiProvisioner{}
@@ -254,7 +256,8 @@ func NewCSIProvisioner(client kubernetes.Interface,
254256
controllerCapabilities connection.ControllerCapabilitySet,
255257
supportsMigrationFromInTreePluginName string,
256258
strictTopology bool,
257-
translator ProvisionerCSITranslator) controller.Provisioner {
259+
translator ProvisionerCSITranslator,
260+
csiNodeLister storagelisters.CSINodeLister) controller.Provisioner {
258261

259262
csiClient := csi.NewControllerClient(grpcClient)
260263
provisioner := &csiProvisioner{
@@ -272,6 +275,7 @@ func NewCSIProvisioner(client kubernetes.Interface,
272275
supportsMigrationFromInTreePluginName: supportsMigrationFromInTreePluginName,
273276
strictTopology: strictTopology,
274277
translator: translator,
278+
csiNodeLister: csiNodeLister,
275279
}
276280
return provisioner
277281
}
@@ -499,7 +503,8 @@ func (p *csiProvisioner) ProvisionExt(options controller.ProvisionOptions) (*v1.
499503
options.PVC.Name,
500504
options.StorageClass.AllowedTopologies,
501505
options.SelectedNode,
502-
p.strictTopology)
506+
p.strictTopology,
507+
p.csiNodeLister)
503508
if err != nil {
504509
return nil, controller.ProvisioningNoChange, fmt.Errorf("error generating accessibility requirements: %v", err)
505510
}
@@ -664,8 +669,7 @@ func (p *csiProvisioner) ProvisionExt(options controller.ProvisionOptions) (*v1.
664669
}
665670

666671
func (p *csiProvisioner) supportsTopology() bool {
667-
return p.pluginCapabilities[csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS] &&
668-
utilfeature.DefaultFeatureGate.Enabled(features.Topology)
672+
return SupportsTopology(p.pluginCapabilities)
669673
}
670674

671675
func removePrefixedParameters(param map[string]string) (map[string]string, error) {

pkg/controller/controller_test.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import (
3939
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
4040
"github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/fake"
4141
"google.golang.org/grpc"
42-
v1 "k8s.io/api/core/v1"
42+
"k8s.io/api/core/v1"
4343
storagev1 "k8s.io/api/storage/v1"
4444
storagev1beta1 "k8s.io/api/storage/v1beta1"
4545
"k8s.io/apimachinery/pkg/api/resource"
@@ -401,7 +401,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {
401401

402402
pluginCaps, controllerCaps := provisionCapabilities()
403403
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test",
404-
5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New())
404+
5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
405405

406406
// Requested PVC with requestedBytes storage
407407
deletePolicy := v1.PersistentVolumeReclaimDelete
@@ -1464,7 +1464,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested
14641464

14651465
pluginCaps, controllerCaps := provisionCapabilities()
14661466
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn,
1467-
nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, csitrans.New())
1467+
nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, csitrans.New(), nil)
14681468

14691469
out := &csi.CreateVolumeResponse{
14701470
Volume: &csi.Volume{
@@ -2031,7 +2031,7 @@ func TestProvisionFromSnapshot(t *testing.T) {
20312031

20322032
pluginCaps, controllerCaps := provisionFromSnapshotCapabilities()
20332033
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn,
2034-
client, driverName, pluginCaps, controllerCaps, "", false, csitrans.New())
2034+
client, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
20352035

20362036
out := &csi.CreateVolumeResponse{
20372037
Volume: &csi.Volume{
@@ -2186,7 +2186,7 @@ func TestProvisionWithTopologyEnabled(t *testing.T) {
21862186
}
21872187

21882188
nodes := buildNodes(tc.nodeLabels, k8sTopologyBetaVersion.String())
2189-
nodeInfos := buildNodeInfos(tc.topologyKeys)
2189+
csiNodes := buildCSINodes(tc.topologyKeys)
21902190

21912191
var (
21922192
pluginCaps connection.PluginCapabilitySet
@@ -2199,9 +2199,13 @@ func TestProvisionWithTopologyEnabled(t *testing.T) {
21992199
pluginCaps, controllerCaps = provisionCapabilities()
22002200
}
22012201

2202-
clientSet := fakeclientset.NewSimpleClientset(nodes, nodeInfos)
2202+
clientSet := fakeclientset.NewSimpleClientset(nodes, csiNodes)
2203+
2204+
csiNodeLister, stopChan := csiNodeLister(clientSet, t)
2205+
defer close(stopChan)
2206+
22032207
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5,
2204-
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New())
2208+
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), csiNodeLister)
22052209

22062210
pv, err := csiProvisioner.Provision(controller.ProvisionOptions{
22072211
StorageClass: &storagev1.StorageClass{},
@@ -2256,7 +2260,7 @@ func TestProvisionWithTopologyDisabled(t *testing.T) {
22562260
clientSet := fakeclientset.NewSimpleClientset()
22572261
pluginCaps, controllerCaps := provisionWithTopologyCapabilities()
22582262
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5,
2259-
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New())
2263+
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
22602264

22612265
out := &csi.CreateVolumeResponse{
22622266
Volume: &csi.Volume{
@@ -2436,7 +2440,7 @@ func runDeleteTest(t *testing.T, k string, tc deleteTestcase) {
24362440

24372441
pluginCaps, controllerCaps := provisionCapabilities()
24382442
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5,
2439-
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New())
2443+
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
24402444

24412445
err = csiProvisioner.Delete(tc.persistentVolume)
24422446
if tc.expectErr && err == nil {
@@ -3092,7 +3096,7 @@ func TestProvisionFromPVC(t *testing.T) {
30923096
}
30933097

30943098
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn,
3095-
nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New())
3099+
nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
30963100

30973101
pv, err := csiProvisioner.Provision(tc.volOpts)
30983102
if tc.expectErr && err == nil {
@@ -3171,7 +3175,7 @@ func TestProvisionWithMigration(t *testing.T) {
31713175
pluginCaps, controllerCaps := provisionCapabilities()
31723176
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner",
31733177
"test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps,
3174-
inTreePluginName, false, mockTranslator)
3178+
inTreePluginName, false, mockTranslator, nil)
31753179

31763180
// Set up return values (AnyTimes to avoid overfitting on implementation)
31773181

@@ -3331,7 +3335,7 @@ func TestDeleteMigration(t *testing.T) {
33313335
pluginCaps, controllerCaps := provisionCapabilities()
33323336
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner",
33333337
"test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "",
3334-
false, mockTranslator)
3338+
false, mockTranslator, nil)
33353339

33363340
// Set mock return values (AnyTimes to avoid overfitting on implementation details)
33373341
mockTranslator.EXPECT().IsPVMigratable(gomock.Any()).Return(tc.expectTranslation).AnyTimes()

pkg/controller/topology.go

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,16 @@ import (
2525
"strings"
2626

2727
"github.com/container-storage-interface/spec/lib/go/csi"
28-
v1 "k8s.io/api/core/v1"
28+
"github.com/kubernetes-csi/csi-lib-utils/connection"
29+
"github.com/kubernetes-csi/external-provisioner/pkg/features"
30+
"k8s.io/api/core/v1"
2931
storage "k8s.io/api/storage/v1beta1"
3032
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33+
"k8s.io/apimachinery/pkg/labels"
3134
"k8s.io/apimachinery/pkg/util/version"
35+
utilfeature "k8s.io/apiserver/pkg/util/feature"
3236
"k8s.io/client-go/kubernetes"
37+
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
3338
"k8s.io/klog"
3439
)
3540

@@ -69,12 +74,18 @@ func GenerateVolumeNodeAffinity(accessibleTopology []*csi.Topology) *v1.VolumeNo
6974
}
7075
}
7176

77+
// SupportsTopology returns whether topology is supported both for plugin and external provisioner
78+
func SupportsTopology(pluginCapabilities connection.PluginCapabilitySet) bool {
79+
return pluginCapabilities[csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS] &&
80+
utilfeature.DefaultFeatureGate.Enabled(features.Topology)
81+
}
82+
7283
// GenerateAccessibilityRequirements returns the CSI TopologyRequirement
7384
// to pass into the CSI CreateVolume request.
7485
//
7586
// This function is only called if the topology feature is enabled
7687
// in the external-provisioner and the CSI driver implements the
77-
// CSI accessbility capability. It is disabled by default.
88+
// CSI accessibility capability. It is disabled by default.
7889
//
7990
// If enabled, we require that the K8s API server is on at least
8091
// K8s 1.14, and that the K8s CSINode feature gate is enabled. In
@@ -140,7 +151,8 @@ func GenerateAccessibilityRequirements(
140151
pvcName string,
141152
allowedTopologies []v1.TopologySelectorTerm,
142153
selectedNode *v1.Node,
143-
strictTopology bool) (*csi.TopologyRequirement, error) {
154+
strictTopology bool,
155+
csiNodeLister storagelisters.CSINodeLister) (*csi.TopologyRequirement, error) {
144156
requirement := &csi.TopologyRequirement{}
145157

146158
var (
@@ -152,7 +164,7 @@ func GenerateAccessibilityRequirements(
152164

153165
// 1. Get CSINode for the selected node
154166
if selectedNode != nil {
155-
selectedCSINode, err = getSelectedCSINode(kubeClient, selectedNode)
167+
selectedCSINode, err = getSelectedCSINode(csiNodeLister, selectedNode)
156168
if err != nil {
157169
return nil, err
158170
}
@@ -207,7 +219,7 @@ func GenerateAccessibilityRequirements(
207219
requisiteTerms = flatten(allowedTopologies)
208220
} else {
209221
// Aggregate existing topologies in nodes across the entire cluster.
210-
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode)
222+
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode, csiNodeLister)
211223
if err != nil {
212224
return nil, err
213225
}
@@ -263,11 +275,10 @@ func GenerateAccessibilityRequirements(
263275

264276
// getSelectedCSINode returns the CSINode object for the given selectedNode.
265277
func getSelectedCSINode(
266-
kubeClient kubernetes.Interface,
278+
csiNodeLister storagelisters.CSINodeLister,
267279
selectedNode *v1.Node) (*storage.CSINode, error) {
268280

269-
// TODO (#144): use informers
270-
selectedNodeInfo, err := kubeClient.StorageV1beta1().CSINodes().Get(selectedNode.Name, metav1.GetOptions{})
281+
selectedCSINode, err := csiNodeLister.Get(selectedNode.Name)
271282
if err != nil {
272283
// If the Node is before 1.14, then we fallback to "topology disabled" behavior
273284
// to retain backwards compatibility in a single-topology environment with
@@ -289,37 +300,35 @@ func getSelectedCSINode(
289300
// error with the API server.
290301
return nil, fmt.Errorf("error getting CSINode for selected node %q: %v", selectedNode.Name, err)
291302
}
292-
return selectedNodeInfo, nil
303+
return selectedCSINode, nil
293304
}
294305

295306
// aggregateTopologies returns all the supported topology values in the cluster that
296307
// match the driver's topology keys.
297308
func aggregateTopologies(
298309
kubeClient kubernetes.Interface,
299310
driverName string,
300-
selectedCSINode *storage.CSINode) ([]topologyTerm, error) {
311+
selectedCSINode *storage.CSINode,
312+
csiNodeLister storagelisters.CSINodeLister) ([]topologyTerm, error) {
301313

302314
// 1. Determine topologyKeys to use for aggregation
303315
var topologyKeys []string
304316
if selectedCSINode == nil {
305317
// Immediate binding
306-
307-
// TODO (#144): use informers
308-
nodeInfos, err := kubeClient.StorageV1beta1().CSINodes().List(metav1.ListOptions{})
318+
csiNodes, err := csiNodeLister.List(labels.Everything())
309319
if err != nil {
310320
// Require CSINode beta feature on K8s apiserver to be enabled.
311321
// We don't want to fallback and provision in the wrong topology if there's some temporary
312322
// error with the API server.
313323
return nil, fmt.Errorf("error listing CSINodes: %v", err)
314324
}
315-
316-
rand.Shuffle(len(nodeInfos.Items), func(i, j int) {
317-
nodeInfos.Items[i], nodeInfos.Items[j] = nodeInfos.Items[j], nodeInfos.Items[i]
325+
rand.Shuffle(len(csiNodes), func(i, j int) {
326+
csiNodes[i], csiNodes[j] = csiNodes[j], csiNodes[i]
318327
})
319328

320329
// Pick the first node with topology keys
321-
for _, nodeInfo := range nodeInfos.Items {
322-
topologyKeys = getTopologyKeys(&nodeInfo, driverName)
330+
for _, csiNode := range csiNodes {
331+
topologyKeys = getTopologyKeys(csiNode, driverName)
323332
if topologyKeys != nil {
324333
break
325334
}
@@ -478,8 +487,8 @@ func sortAndShift(terms []topologyTerm, primary topologyTerm, shiftIndex uint32)
478487
return preferredTerms
479488
}
480489

481-
func getTopologyKeys(nodeInfo *storage.CSINode, driverName string) []string {
482-
for _, driver := range nodeInfo.Spec.Drivers {
490+
func getTopologyKeys(csiNode *storage.CSINode, driverName string) []string {
491+
for _, driver := range csiNode.Spec.Drivers {
483492
if driver.Name == driverName {
484493
return driver.TopologyKeys
485494
}

0 commit comments

Comments
 (0)