Skip to content

Commit e473610

Browse files
committed
use csiNode informer to prevent hit the API server for all csiNode resource request
Signed-off-by: Mucahit Kurt <[email protected]>
1 parent 9f588ea commit e473610

File tree

6 files changed

+121
-48
lines changed

6 files changed

+121
-48
lines changed

Gopkg.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/csi-provisioner/csi-provisioner.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ import (
4141
"k8s.io/client-go/util/workqueue"
4242
"k8s.io/klog"
4343

44+
"github.com/kubernetes-csi/external-provisioner/pkg/features"
4445
utilfeature "k8s.io/apiserver/pkg/util/feature"
46+
"k8s.io/client-go/informers"
47+
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
4548
utilflag "k8s.io/component-base/cli/flag"
4649
csitranslationlib "k8s.io/csi-translation-lib"
4750
)
@@ -179,9 +182,31 @@ func main() {
179182
provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
180183
}
181184

185+
var csiNodeLister storagelisters.CSINodeLister
186+
var factory informers.SharedInformerFactory
187+
if utilfeature.DefaultFeatureGate.Enabled(features.Topology) {
188+
// Create informer to prevent hit the API server for all resource request
189+
factory = informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
190+
csiNodeLister = factory.Storage().V1beta1().CSINodes().Lister()
191+
}
192+
182193
// Create the provisioner: it implements the Provisioner interface expected by
183194
// the controller
184-
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities, supportsMigrationFromInTreePluginName, *strictTopology)
195+
csiProvisioner := ctrl.NewCSIProvisioner(
196+
clientset,
197+
*operationTimeout,
198+
identity,
199+
*volumeNamePrefix,
200+
*volumeNameUUIDLength,
201+
grpcClient,
202+
snapClient,
203+
provisionerName,
204+
pluginCapabilities,
205+
controllerCapabilities,
206+
supportsMigrationFromInTreePluginName,
207+
*strictTopology,
208+
csiNodeLister)
209+
185210
provisionController = controller.NewProvisionController(
186211
clientset,
187212
provisionerName,
@@ -191,6 +216,17 @@ func main() {
191216
)
192217

193218
run := func(context.Context) {
219+
if factory != nil {
220+
stopCh := context.Background().Done()
221+
factory.Start(stopCh)
222+
cacheSyncResult := factory.WaitForCacheSync(stopCh)
223+
for _, v := range cacheSyncResult {
224+
if !v {
225+
klog.Fatalf("Failed to sync CsiNodeInformer!")
226+
}
227+
}
228+
}
229+
194230
provisionController.Run(wait.NeverStop)
195231
}
196232

pkg/controller/controller.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import (
3636
"sigs.k8s.io/sig-storage-lib-external-provisioner/controller"
3737
"sigs.k8s.io/sig-storage-lib-external-provisioner/util"
3838

39-
v1 "k8s.io/api/core/v1"
39+
"k8s.io/api/core/v1"
4040
"k8s.io/apimachinery/pkg/api/resource"
4141
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4242
_ "k8s.io/apimachinery/pkg/util/json"
@@ -49,6 +49,7 @@ import (
4949
"k8s.io/klog"
5050

5151
"google.golang.org/grpc"
52+
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
5253
)
5354

5455
//secretParamsMap provides a mapping of current as well as deprecated secret keys
@@ -114,6 +115,8 @@ const (
114115
tokenPVNameKey = "pv.name"
115116
tokenPVCNameKey = "pvc.name"
116117
tokenPVCNameSpaceKey = "pvc.namespace"
118+
119+
ResyncPeriodOfCsiNodeInformer = 1 * time.Hour
117120
)
118121

119122
var (
@@ -178,6 +181,7 @@ type csiProvisioner struct {
178181
controllerCapabilities connection.ControllerCapabilitySet
179182
supportsMigrationFromInTreePluginName string
180183
strictTopology bool
184+
csiNodeLister storagelisters.CSINodeLister
181185
}
182186

183187
var _ controller.Provisioner = &csiProvisioner{}
@@ -235,7 +239,8 @@ func NewCSIProvisioner(client kubernetes.Interface,
235239
pluginCapabilities connection.PluginCapabilitySet,
236240
controllerCapabilities connection.ControllerCapabilitySet,
237241
supportsMigrationFromInTreePluginName string,
238-
strictTopology bool) controller.Provisioner {
242+
strictTopology bool,
243+
csiNodeLister storagelisters.CSINodeLister) controller.Provisioner {
239244

240245
csiClient := csi.NewControllerClient(grpcClient)
241246
provisioner := &csiProvisioner{
@@ -252,6 +257,7 @@ func NewCSIProvisioner(client kubernetes.Interface,
252257
controllerCapabilities: controllerCapabilities,
253258
supportsMigrationFromInTreePluginName: supportsMigrationFromInTreePluginName,
254259
strictTopology: strictTopology,
260+
csiNodeLister: csiNodeLister,
255261
}
256262
return provisioner
257263
}
@@ -471,7 +477,8 @@ func (p *csiProvisioner) ProvisionExt(options controller.ProvisionOptions) (*v1.
471477
options.PVC.Name,
472478
options.StorageClass.AllowedTopologies,
473479
options.SelectedNode,
474-
p.strictTopology)
480+
p.strictTopology,
481+
p.csiNodeLister)
475482
if err != nil {
476483
return nil, controller.ProvisioningNoChange, fmt.Errorf("error generating accessibility requirements: %v", err)
477484
}

pkg/controller/controller_test.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import (
3939
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
4040
"github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/fake"
4141
"google.golang.org/grpc"
42-
v1 "k8s.io/api/core/v1"
42+
"k8s.io/api/core/v1"
4343
storagev1 "k8s.io/api/storage/v1"
4444
storagev1beta1 "k8s.io/api/storage/v1beta1"
4545
"k8s.io/apimachinery/pkg/api/resource"
@@ -396,7 +396,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {
396396
defer driver.Stop()
397397

398398
pluginCaps, controllerCaps := provisionCapabilities()
399-
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)
399+
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, nil)
400400

401401
// Requested PVC with requestedBytes storage
402402
deletePolicy := v1.PersistentVolumeReclaimDelete
@@ -1450,7 +1450,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested
14501450
}
14511451

14521452
pluginCaps, controllerCaps := provisionCapabilities()
1453-
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)
1453+
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, nil)
14541454

14551455
out := &csi.CreateVolumeResponse{
14561456
Volume: &csi.Volume{
@@ -1829,7 +1829,7 @@ func TestProvisionFromSnapshot(t *testing.T) {
18291829
})
18301830

18311831
pluginCaps, controllerCaps := provisionFromSnapshotCapabilities()
1832-
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps, "", false)
1832+
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps, "", false, nil)
18331833

18341834
out := &csi.CreateVolumeResponse{
18351835
Volume: &csi.Volume{
@@ -1966,7 +1966,7 @@ func TestProvisionWithTopologyEnabled(t *testing.T) {
19661966
}
19671967

19681968
nodes := buildNodes(tc.nodeLabels, k8sTopologyBetaVersion.String())
1969-
nodeInfos := buildNodeInfos(tc.topologyKeys)
1969+
csiNodes := buildCSINodes(tc.topologyKeys)
19701970

19711971
var (
19721972
pluginCaps connection.PluginCapabilitySet
@@ -1979,8 +1979,12 @@ func TestProvisionWithTopologyEnabled(t *testing.T) {
19791979
pluginCaps, controllerCaps = provisionCapabilities()
19801980
}
19811981

1982-
clientSet := fakeclientset.NewSimpleClientset(nodes, nodeInfos)
1983-
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)
1982+
clientSet := fakeclientset.NewSimpleClientset(nodes, csiNodes)
1983+
1984+
csiNodeLister, stopChan := csiNodeLister(clientSet, t)
1985+
defer close(stopChan)
1986+
1987+
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csiNodeLister)
19841988

19851989
pv, err := csiProvisioner.Provision(controller.ProvisionOptions{
19861990
StorageClass: &storagev1.StorageClass{},
@@ -2034,7 +2038,7 @@ func TestProvisionWithTopologyDisabled(t *testing.T) {
20342038

20352039
clientSet := fakeclientset.NewSimpleClientset()
20362040
pluginCaps, controllerCaps := provisionWithTopologyCapabilities()
2037-
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)
2041+
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, nil)
20382042

20392043
out := &csi.CreateVolumeResponse{
20402044
Volume: &csi.Volume{
@@ -2213,7 +2217,7 @@ func runDeleteTest(t *testing.T, k string, tc deleteTestcase) {
22132217
}
22142218

22152219
pluginCaps, controllerCaps := provisionCapabilities()
2216-
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)
2220+
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, nil)
22172221

22182222
err = csiProvisioner.Delete(tc.persistentVolume)
22192223
if tc.expectErr && err == nil {
@@ -2627,7 +2631,7 @@ func TestProvisionFromPVC(t *testing.T) {
26272631
controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Return(nil, errors.New("source volume size is bigger than requested volume size")).Times(1)
26282632
}
26292633

2630-
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)
2634+
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, nil)
26312635

26322636
pv, err := csiProvisioner.Provision(tc.volOpts)
26332637
if tc.expectErr && err == nil {

pkg/controller/topology.go

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ import (
2525
"strings"
2626

2727
"github.com/container-storage-interface/spec/lib/go/csi"
28-
v1 "k8s.io/api/core/v1"
28+
"k8s.io/api/core/v1"
2929
storage "k8s.io/api/storage/v1beta1"
3030
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31+
"k8s.io/apimachinery/pkg/labels"
3132
"k8s.io/apimachinery/pkg/util/version"
3233
"k8s.io/client-go/kubernetes"
34+
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
3335
"k8s.io/klog"
3436
)
3537

@@ -74,7 +76,7 @@ func GenerateVolumeNodeAffinity(accessibleTopology []*csi.Topology) *v1.VolumeNo
7476
//
7577
// This function is only called if the topology feature is enabled
7678
// in the external-provisioner and the CSI driver implements the
77-
// CSI accessbility capability. It is disabled by default.
79+
// CSI accessibility capability. It is disabled by default.
7880
//
7981
// If enabled, we require that the K8s API server is on at least
8082
// K8s 1.14, and that the K8s CSINode feature gate is enabled. In
@@ -140,7 +142,8 @@ func GenerateAccessibilityRequirements(
140142
pvcName string,
141143
allowedTopologies []v1.TopologySelectorTerm,
142144
selectedNode *v1.Node,
143-
strictTopology bool) (*csi.TopologyRequirement, error) {
145+
strictTopology bool,
146+
csiNodeLister storagelisters.CSINodeLister) (*csi.TopologyRequirement, error) {
144147
requirement := &csi.TopologyRequirement{}
145148

146149
var (
@@ -152,7 +155,7 @@ func GenerateAccessibilityRequirements(
152155

153156
// 1. Get CSINode for the selected node
154157
if selectedNode != nil {
155-
selectedCSINode, err = getSelectedCSINode(kubeClient, selectedNode)
158+
selectedCSINode, err = getSelectedCSINode(csiNodeLister, selectedNode)
156159
if err != nil {
157160
return nil, err
158161
}
@@ -207,7 +210,7 @@ func GenerateAccessibilityRequirements(
207210
requisiteTerms = flatten(allowedTopologies)
208211
} else {
209212
// Aggregate existing topologies in nodes across the entire cluster.
210-
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode)
213+
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode, csiNodeLister)
211214
if err != nil {
212215
return nil, err
213216
}
@@ -263,11 +266,10 @@ func GenerateAccessibilityRequirements(
263266

264267
// getSelectedCSINode returns the CSINode object for the given selectedNode.
265268
func getSelectedCSINode(
266-
kubeClient kubernetes.Interface,
269+
csiNodeLister storagelisters.CSINodeLister,
267270
selectedNode *v1.Node) (*storage.CSINode, error) {
268271

269-
// TODO (#144): use informers
270-
selectedNodeInfo, err := kubeClient.StorageV1beta1().CSINodes().Get(selectedNode.Name, metav1.GetOptions{})
272+
selectedCSINode, err := csiNodeLister.Get(selectedNode.Name)
271273
if err != nil {
272274
// If the Node is before 1.14, then we fallback to "topology disabled" behavior
273275
// to retain backwards compatibility in a single-topology environment with
@@ -289,37 +291,33 @@ func getSelectedCSINode(
289291
// error with the API server.
290292
return nil, fmt.Errorf("error getting CSINode for selected node %q: %v", selectedNode.Name, err)
291293
}
292-
return selectedNodeInfo, nil
294+
return selectedCSINode, nil
293295
}
294296

295297
// aggregateTopologies returns all the supported topology values in the cluster that
296298
// match the driver's topology keys.
297299
func aggregateTopologies(
298300
kubeClient kubernetes.Interface,
299301
driverName string,
300-
selectedCSINode *storage.CSINode) ([]topologyTerm, error) {
302+
selectedCSINode *storage.CSINode,
303+
csiNodeLister storagelisters.CSINodeLister) ([]topologyTerm, error) {
301304

302305
// 1. Determine topologyKeys to use for aggregation
303306
var topologyKeys []string
304307
if selectedCSINode == nil {
305308
// Immediate binding
306-
307-
// TODO (#144): use informers
308-
nodeInfos, err := kubeClient.StorageV1beta1().CSINodes().List(metav1.ListOptions{})
309+
csiNodes, err := csiNodeLister.List(labels.Everything())
309310
if err != nil {
310-
// Require CSINode beta feature on K8s apiserver to be enabled.
311-
// We don't want to fallback and provision in the wrong topology if there's some temporary
312-
// error with the API server.
313-
return nil, fmt.Errorf("error listing CSINodes: %v", err)
311+
klog.Warningf("No csi nodes found")
312+
return nil, nil
314313
}
315-
316-
rand.Shuffle(len(nodeInfos.Items), func(i, j int) {
317-
nodeInfos.Items[i], nodeInfos.Items[j] = nodeInfos.Items[j], nodeInfos.Items[i]
314+
rand.Shuffle(len(csiNodes), func(i, j int) {
315+
csiNodes[i], csiNodes[j] = csiNodes[j], csiNodes[i]
318316
})
319317

320318
// Pick the first node with topology keys
321-
for _, nodeInfo := range nodeInfos.Items {
322-
topologyKeys = getTopologyKeys(&nodeInfo, driverName)
319+
for _, csiNode := range csiNodes {
320+
topologyKeys = getTopologyKeys(csiNode, driverName)
323321
if topologyKeys != nil {
324322
break
325323
}
@@ -478,8 +476,8 @@ func sortAndShift(terms []topologyTerm, primary topologyTerm, shiftIndex uint32)
478476
return preferredTerms
479477
}
480478

481-
func getTopologyKeys(nodeInfo *storage.CSINode, driverName string) []string {
482-
for _, driver := range nodeInfo.Spec.Drivers {
479+
func getTopologyKeys(csiNode *storage.CSINode, driverName string) []string {
480+
for _, driver := range csiNode.Spec.Drivers {
483481
if driver.Name == driverName {
484482
return driver.TopologyKeys
485483
}

0 commit comments

Comments
 (0)