Skip to content

Commit 70c2877

Browse files
committed
use csiNode informer to prevent hit the API server for all csiNode resource request
Signed-off-by: Mucahit Kurt <[email protected]>
1 parent fd26b54 commit 70c2877

File tree

6 files changed

+129
-50
lines changed

6 files changed

+129
-50
lines changed

Gopkg.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/csi-provisioner/csi-provisioner.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ import (
4242
"k8s.io/klog"
4343

4444
utilfeature "k8s.io/apiserver/pkg/util/feature"
45+
"k8s.io/client-go/informers"
46+
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
4547
utilflag "k8s.io/component-base/cli/flag"
4648
csitrans "k8s.io/csi-translation-lib"
4749
)
@@ -181,11 +183,32 @@ func main() {
181183
provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
182184
}
183185

186+
var csiNodeLister storagelisters.CSINodeLister
187+
var factory informers.SharedInformerFactory
188+
if ctrl.SupportsTopology(pluginCapabilities) {
189+
// Create informer to prevent hit the API server for all resource request
190+
factory = informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
191+
csiNodeLister = factory.Storage().V1beta1().CSINodes().Lister()
192+
}
193+
184194
// Create the provisioner: it implements the Provisioner interface expected by
185195
// the controller
186-
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *operationTimeout, identity, *volumeNamePrefix,
187-
*volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities,
188-
controllerCapabilities, supportsMigrationFromInTreePluginName, *strictTopology, translator)
196+
csiProvisioner := ctrl.NewCSIProvisioner(
197+
clientset,
198+
*operationTimeout,
199+
identity,
200+
*volumeNamePrefix,
201+
*volumeNameUUIDLength,
202+
grpcClient,
203+
snapClient,
204+
provisionerName,
205+
pluginCapabilities,
206+
controllerCapabilities,
207+
supportsMigrationFromInTreePluginName,
208+
*strictTopology,
209+
translator,
210+
csiNodeLister)
211+
189212
provisionController = controller.NewProvisionController(
190213
clientset,
191214
provisionerName,
@@ -195,6 +218,17 @@ func main() {
195218
)
196219

197220
run := func(context.Context) {
221+
if factory != nil {
222+
stopCh := context.Background().Done()
223+
factory.Start(stopCh)
224+
cacheSyncResult := factory.WaitForCacheSync(stopCh)
225+
for _, v := range cacheSyncResult {
226+
if !v {
227+
klog.Fatalf("Failed to sync CsiNodeInformer!")
228+
}
229+
}
230+
}
231+
198232
provisionController.Run(wait.NeverStop)
199233
}
200234

pkg/controller/controller.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,25 +30,24 @@ import (
3030

3131
"github.com/container-storage-interface/spec/lib/go/csi"
3232
"github.com/kubernetes-csi/csi-lib-utils/connection"
33-
"github.com/kubernetes-csi/external-provisioner/pkg/features"
3433
snapapi "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
3534
snapclientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned"
3635
"sigs.k8s.io/sig-storage-lib-external-provisioner/controller"
3736
"sigs.k8s.io/sig-storage-lib-external-provisioner/util"
3837

39-
v1 "k8s.io/api/core/v1"
38+
"k8s.io/api/core/v1"
4039
storagev1 "k8s.io/api/storage/v1"
4140
"k8s.io/apimachinery/pkg/api/resource"
4241
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4342
_ "k8s.io/apimachinery/pkg/util/json"
4443
"k8s.io/apimachinery/pkg/util/sets"
4544
"k8s.io/apimachinery/pkg/util/validation"
46-
utilfeature "k8s.io/apiserver/pkg/util/feature"
4745
"k8s.io/client-go/kubernetes"
4846
"k8s.io/client-go/rest"
4947
"k8s.io/klog"
5048

5149
"google.golang.org/grpc"
50+
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
5251
)
5352

5453
//secretParamsMap provides a mapping of current as well as deprecated secret keys
@@ -115,6 +114,8 @@ const (
115114
tokenPVCNameKey = "pvc.name"
116115
tokenPVCNameSpaceKey = "pvc.namespace"
117116

117+
ResyncPeriodOfCsiNodeInformer = 1 * time.Hour
118+
118119
deleteVolumeRetryCount = 5
119120

120121
annStorageProvisioner = "volume.beta.kubernetes.io/storage-provisioner"
@@ -195,6 +196,7 @@ type csiProvisioner struct {
195196
supportsMigrationFromInTreePluginName string
196197
strictTopology bool
197198
translator ProvisionerCSITranslator
199+
csiNodeLister storagelisters.CSINodeLister
198200
}
199201

200202
var _ controller.Provisioner = &csiProvisioner{}
@@ -254,7 +256,8 @@ func NewCSIProvisioner(client kubernetes.Interface,
254256
controllerCapabilities connection.ControllerCapabilitySet,
255257
supportsMigrationFromInTreePluginName string,
256258
strictTopology bool,
257-
translator ProvisionerCSITranslator) controller.Provisioner {
259+
translator ProvisionerCSITranslator,
260+
csiNodeLister storagelisters.CSINodeLister) controller.Provisioner {
258261

259262
csiClient := csi.NewControllerClient(grpcClient)
260263
provisioner := &csiProvisioner{
@@ -272,6 +275,7 @@ func NewCSIProvisioner(client kubernetes.Interface,
272275
supportsMigrationFromInTreePluginName: supportsMigrationFromInTreePluginName,
273276
strictTopology: strictTopology,
274277
translator: translator,
278+
csiNodeLister: csiNodeLister,
275279
}
276280
return provisioner
277281
}
@@ -499,7 +503,8 @@ func (p *csiProvisioner) ProvisionExt(options controller.ProvisionOptions) (*v1.
499503
options.PVC.Name,
500504
options.StorageClass.AllowedTopologies,
501505
options.SelectedNode,
502-
p.strictTopology)
506+
p.strictTopology,
507+
p.csiNodeLister)
503508
if err != nil {
504509
return nil, controller.ProvisioningNoChange, fmt.Errorf("error generating accessibility requirements: %v", err)
505510
}
@@ -664,8 +669,7 @@ func (p *csiProvisioner) ProvisionExt(options controller.ProvisionOptions) (*v1.
664669
}
665670

666671
func (p *csiProvisioner) supportsTopology() bool {
667-
return p.pluginCapabilities[csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS] &&
668-
utilfeature.DefaultFeatureGate.Enabled(features.Topology)
672+
return SupportsTopology(p.pluginCapabilities)
669673
}
670674

671675
func removePrefixedParameters(param map[string]string) (map[string]string, error) {

pkg/controller/controller_test.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import (
3939
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
4040
"github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/fake"
4141
"google.golang.org/grpc"
42-
v1 "k8s.io/api/core/v1"
42+
"k8s.io/api/core/v1"
4343
storagev1 "k8s.io/api/storage/v1"
4444
storagev1beta1 "k8s.io/api/storage/v1beta1"
4545
"k8s.io/apimachinery/pkg/api/resource"
@@ -399,7 +399,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {
399399
defer driver.Stop()
400400

401401
pluginCaps, controllerCaps := provisionCapabilities()
402-
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New())
402+
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
403403

404404
// Requested PVC with requestedBytes storage
405405
deletePolicy := v1.PersistentVolumeReclaimDelete
@@ -1503,7 +1503,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested
15031503
}
15041504

15051505
pluginCaps, controllerCaps := provisionCapabilities()
1506-
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, csitrans.New())
1506+
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, csitrans.New(), nil)
15071507

15081508
out := &csi.CreateVolumeResponse{
15091509
Volume: &csi.Volume{
@@ -2069,7 +2069,7 @@ func TestProvisionFromSnapshot(t *testing.T) {
20692069
})
20702070

20712071
pluginCaps, controllerCaps := provisionFromSnapshotCapabilities()
2072-
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps, "", false, csitrans.New())
2072+
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
20732073

20742074
out := &csi.CreateVolumeResponse{
20752075
Volume: &csi.Volume{
@@ -2224,7 +2224,7 @@ func TestProvisionWithTopologyEnabled(t *testing.T) {
22242224
}
22252225

22262226
nodes := buildNodes(tc.nodeLabels, k8sTopologyBetaVersion.String())
2227-
nodeInfos := buildNodeInfos(tc.topologyKeys)
2227+
csiNodes := buildCSINodes(tc.topologyKeys)
22282228

22292229
var (
22302230
pluginCaps connection.PluginCapabilitySet
@@ -2237,8 +2237,12 @@ func TestProvisionWithTopologyEnabled(t *testing.T) {
22372237
pluginCaps, controllerCaps = provisionCapabilities()
22382238
}
22392239

2240-
clientSet := fakeclientset.NewSimpleClientset(nodes, nodeInfos)
2241-
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New())
2240+
clientSet := fakeclientset.NewSimpleClientset(nodes, csiNodes)
2241+
2242+
csiNodeLister, stopChan := csiNodeLister(clientSet, t)
2243+
defer close(stopChan)
2244+
2245+
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), csiNodeLister)
22422246

22432247
pv, err := csiProvisioner.Provision(controller.ProvisionOptions{
22442248
StorageClass: &storagev1.StorageClass{},
@@ -2292,7 +2296,7 @@ func TestProvisionWithTopologyDisabled(t *testing.T) {
22922296

22932297
clientSet := fakeclientset.NewSimpleClientset()
22942298
pluginCaps, controllerCaps := provisionWithTopologyCapabilities()
2295-
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New())
2299+
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
22962300

22972301
out := &csi.CreateVolumeResponse{
22982302
Volume: &csi.Volume{
@@ -2471,7 +2475,7 @@ func runDeleteTest(t *testing.T, k string, tc deleteTestcase) {
24712475
}
24722476

24732477
pluginCaps, controllerCaps := provisionCapabilities()
2474-
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New())
2478+
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
24752479

24762480
err = csiProvisioner.Delete(tc.persistentVolume)
24772481
if tc.expectErr && err == nil {
@@ -3126,7 +3130,7 @@ func TestProvisionFromPVC(t *testing.T) {
31263130
controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Return(nil, errors.New("source volume size is bigger than requested volume size")).Times(1)
31273131
}
31283132

3129-
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New())
3133+
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
31303134

31313135
pv, err := csiProvisioner.Provision(tc.volOpts)
31323136
if tc.expectErr && err == nil {

pkg/controller/topology.go

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,16 @@ import (
2525
"strings"
2626

2727
"github.com/container-storage-interface/spec/lib/go/csi"
28-
v1 "k8s.io/api/core/v1"
28+
"github.com/kubernetes-csi/csi-lib-utils/connection"
29+
"github.com/kubernetes-csi/external-provisioner/pkg/features"
30+
"k8s.io/api/core/v1"
2931
storage "k8s.io/api/storage/v1beta1"
3032
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33+
"k8s.io/apimachinery/pkg/labels"
3134
"k8s.io/apimachinery/pkg/util/version"
35+
utilfeature "k8s.io/apiserver/pkg/util/feature"
3236
"k8s.io/client-go/kubernetes"
37+
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
3338
"k8s.io/klog"
3439
)
3540

@@ -69,12 +74,18 @@ func GenerateVolumeNodeAffinity(accessibleTopology []*csi.Topology) *v1.VolumeNo
6974
}
7075
}
7176

77+
// SupportsTopology returns whether topology is supported both for plugin and external provisioner
78+
func SupportsTopology(pluginCapabilities connection.PluginCapabilitySet) bool {
79+
return pluginCapabilities[csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS] &&
80+
utilfeature.DefaultFeatureGate.Enabled(features.Topology)
81+
}
82+
7283
// GenerateAccessibilityRequirements returns the CSI TopologyRequirement
7384
// to pass into the CSI CreateVolume request.
7485
//
7586
// This function is only called if the topology feature is enabled
7687
// in the external-provisioner and the CSI driver implements the
77-
// CSI accessbility capability. It is disabled by default.
88+
// CSI accessibility capability. It is disabled by default.
7889
//
7990
// If enabled, we require that the K8s API server is on at least
8091
// K8s 1.14, and that the K8s CSINode feature gate is enabled. In
@@ -140,7 +151,8 @@ func GenerateAccessibilityRequirements(
140151
pvcName string,
141152
allowedTopologies []v1.TopologySelectorTerm,
142153
selectedNode *v1.Node,
143-
strictTopology bool) (*csi.TopologyRequirement, error) {
154+
strictTopology bool,
155+
csiNodeLister storagelisters.CSINodeLister) (*csi.TopologyRequirement, error) {
144156
requirement := &csi.TopologyRequirement{}
145157

146158
var (
@@ -152,7 +164,7 @@ func GenerateAccessibilityRequirements(
152164

153165
// 1. Get CSINode for the selected node
154166
if selectedNode != nil {
155-
selectedCSINode, err = getSelectedCSINode(kubeClient, selectedNode)
167+
selectedCSINode, err = getSelectedCSINode(csiNodeLister, selectedNode)
156168
if err != nil {
157169
return nil, err
158170
}
@@ -207,7 +219,7 @@ func GenerateAccessibilityRequirements(
207219
requisiteTerms = flatten(allowedTopologies)
208220
} else {
209221
// Aggregate existing topologies in nodes across the entire cluster.
210-
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode)
222+
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode, csiNodeLister)
211223
if err != nil {
212224
return nil, err
213225
}
@@ -263,11 +275,10 @@ func GenerateAccessibilityRequirements(
263275

264276
// getSelectedCSINode returns the CSINode object for the given selectedNode.
265277
func getSelectedCSINode(
266-
kubeClient kubernetes.Interface,
278+
csiNodeLister storagelisters.CSINodeLister,
267279
selectedNode *v1.Node) (*storage.CSINode, error) {
268280

269-
// TODO (#144): use informers
270-
selectedNodeInfo, err := kubeClient.StorageV1beta1().CSINodes().Get(selectedNode.Name, metav1.GetOptions{})
281+
selectedCSINode, err := csiNodeLister.Get(selectedNode.Name)
271282
if err != nil {
272283
// If the Node is before 1.14, then we fallback to "topology disabled" behavior
273284
// to retain backwards compatibility in a single-topology environment with
@@ -289,37 +300,35 @@ func getSelectedCSINode(
289300
// error with the API server.
290301
return nil, fmt.Errorf("error getting CSINode for selected node %q: %v", selectedNode.Name, err)
291302
}
292-
return selectedNodeInfo, nil
303+
return selectedCSINode, nil
293304
}
294305

295306
// aggregateTopologies returns all the supported topology values in the cluster that
296307
// match the driver's topology keys.
297308
func aggregateTopologies(
298309
kubeClient kubernetes.Interface,
299310
driverName string,
300-
selectedCSINode *storage.CSINode) ([]topologyTerm, error) {
311+
selectedCSINode *storage.CSINode,
312+
csiNodeLister storagelisters.CSINodeLister) ([]topologyTerm, error) {
301313

302314
// 1. Determine topologyKeys to use for aggregation
303315
var topologyKeys []string
304316
if selectedCSINode == nil {
305317
// Immediate binding
306-
307-
// TODO (#144): use informers
308-
nodeInfos, err := kubeClient.StorageV1beta1().CSINodes().List(metav1.ListOptions{})
318+
csiNodes, err := csiNodeLister.List(labels.Everything())
309319
if err != nil {
310320
// Require CSINode beta feature on K8s apiserver to be enabled.
311321
// We don't want to fallback and provision in the wrong topology if there's some temporary
312322
// error with the API server.
313323
return nil, fmt.Errorf("error listing CSINodes: %v", err)
314324
}
315-
316-
rand.Shuffle(len(nodeInfos.Items), func(i, j int) {
317-
nodeInfos.Items[i], nodeInfos.Items[j] = nodeInfos.Items[j], nodeInfos.Items[i]
325+
rand.Shuffle(len(csiNodes), func(i, j int) {
326+
csiNodes[i], csiNodes[j] = csiNodes[j], csiNodes[i]
318327
})
319328

320329
// Pick the first node with topology keys
321-
for _, nodeInfo := range nodeInfos.Items {
322-
topologyKeys = getTopologyKeys(&nodeInfo, driverName)
330+
for _, csiNode := range csiNodes {
331+
topologyKeys = getTopologyKeys(csiNode, driverName)
323332
if topologyKeys != nil {
324333
break
325334
}
@@ -478,8 +487,8 @@ func sortAndShift(terms []topologyTerm, primary topologyTerm, shiftIndex uint32)
478487
return preferredTerms
479488
}
480489

481-
func getTopologyKeys(nodeInfo *storage.CSINode, driverName string) []string {
482-
for _, driver := range nodeInfo.Spec.Drivers {
490+
func getTopologyKeys(csiNode *storage.CSINode, driverName string) []string {
491+
for _, driver := range csiNode.Spec.Drivers {
483492
if driver.Name == driverName {
484493
return driver.TopologyKeys
485494
}

0 commit comments

Comments
 (0)