Skip to content

Commit 704a706

Browse files
committed
use csiNode informer to prevent hit the API server for all csiNode resource request
Signed-off-by: Mucahit Kurt <[email protected]>
1 parent 272f2c1 commit 704a706

File tree

6 files changed

+120
-44
lines changed

6 files changed

+120
-44
lines changed

Gopkg.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/csi-provisioner/csi-provisioner.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@ import (
4141
"k8s.io/client-go/util/workqueue"
4242
"k8s.io/klog"
4343

44+
"github.com/container-storage-interface/spec/lib/go/csi"
45+
"github.com/kubernetes-csi/external-provisioner/pkg/features"
4446
utilfeature "k8s.io/apiserver/pkg/util/feature"
47+
"k8s.io/client-go/informers"
48+
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
4549
utilflag "k8s.io/component-base/cli/flag"
4650
csitranslationlib "k8s.io/csi-translation-lib"
4751
)
@@ -179,9 +183,31 @@ func main() {
179183
provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
180184
}
181185

186+
var csiNodeLister storagelisters.CSINodeLister
187+
var factory informers.SharedInformerFactory
188+
if utilfeature.DefaultFeatureGate.Enabled(features.Topology) && pluginCapabilities[csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS] {
189+
// Create informer to prevent hit the API server for all resource request
190+
factory = informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
191+
csiNodeLister = factory.Storage().V1beta1().CSINodes().Lister()
192+
}
193+
182194
// Create the provisioner: it implements the Provisioner interface expected by
183195
// the controller
184-
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities, supportsMigrationFromInTreePluginName, *strictTopology)
196+
csiProvisioner := ctrl.NewCSIProvisioner(
197+
clientset,
198+
*operationTimeout,
199+
identity,
200+
*volumeNamePrefix,
201+
*volumeNameUUIDLength,
202+
grpcClient,
203+
snapClient,
204+
provisionerName,
205+
pluginCapabilities,
206+
controllerCapabilities,
207+
supportsMigrationFromInTreePluginName,
208+
*strictTopology,
209+
csiNodeLister)
210+
185211
provisionController = controller.NewProvisionController(
186212
clientset,
187213
provisionerName,
@@ -191,6 +217,17 @@ func main() {
191217
)
192218

193219
run := func(context.Context) {
220+
if factory != nil {
221+
stopCh := context.Background().Done()
222+
factory.Start(stopCh)
223+
cacheSyncResult := factory.WaitForCacheSync(stopCh)
224+
for _, v := range cacheSyncResult {
225+
if !v {
226+
klog.Fatalf("Failed to sync CsiNodeInformer!")
227+
}
228+
}
229+
}
230+
194231
provisionController.Run(wait.NeverStop)
195232
}
196233

pkg/controller/controller.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import (
3636
"sigs.k8s.io/sig-storage-lib-external-provisioner/controller"
3737
"sigs.k8s.io/sig-storage-lib-external-provisioner/util"
3838

39-
v1 "k8s.io/api/core/v1"
39+
"k8s.io/api/core/v1"
4040
"k8s.io/apimachinery/pkg/api/resource"
4141
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4242
_ "k8s.io/apimachinery/pkg/util/json"
@@ -49,6 +49,7 @@ import (
4949
"k8s.io/klog"
5050

5151
"google.golang.org/grpc"
52+
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
5253
)
5354

5455
//secretParamsMap provides a mapping of current as well as deprecated secret keys
@@ -115,6 +116,8 @@ const (
115116
tokenPVCNameKey = "pvc.name"
116117
tokenPVCNameSpaceKey = "pvc.namespace"
117118

119+
ResyncPeriodOfCsiNodeInformer = 1 * time.Hour
120+
118121
deleteVolumeRetryCount = 5
119122

120123
annStorageProvisioner = "volume.beta.kubernetes.io/storage-provisioner"
@@ -182,6 +185,7 @@ type csiProvisioner struct {
182185
controllerCapabilities connection.ControllerCapabilitySet
183186
supportsMigrationFromInTreePluginName string
184187
strictTopology bool
188+
csiNodeLister storagelisters.CSINodeLister
185189
}
186190

187191
var _ controller.Provisioner = &csiProvisioner{}
@@ -239,7 +243,8 @@ func NewCSIProvisioner(client kubernetes.Interface,
239243
pluginCapabilities connection.PluginCapabilitySet,
240244
controllerCapabilities connection.ControllerCapabilitySet,
241245
supportsMigrationFromInTreePluginName string,
242-
strictTopology bool) controller.Provisioner {
246+
strictTopology bool,
247+
csiNodeLister storagelisters.CSINodeLister) controller.Provisioner {
243248

244249
csiClient := csi.NewControllerClient(grpcClient)
245250
provisioner := &csiProvisioner{
@@ -256,6 +261,7 @@ func NewCSIProvisioner(client kubernetes.Interface,
256261
controllerCapabilities: controllerCapabilities,
257262
supportsMigrationFromInTreePluginName: supportsMigrationFromInTreePluginName,
258263
strictTopology: strictTopology,
264+
csiNodeLister: csiNodeLister,
259265
}
260266
return provisioner
261267
}
@@ -483,7 +489,8 @@ func (p *csiProvisioner) ProvisionExt(options controller.ProvisionOptions) (*v1.
483489
options.PVC.Name,
484490
options.StorageClass.AllowedTopologies,
485491
options.SelectedNode,
486-
p.strictTopology)
492+
p.strictTopology,
493+
p.csiNodeLister)
487494
if err != nil {
488495
return nil, controller.ProvisioningNoChange, fmt.Errorf("error generating accessibility requirements: %v", err)
489496
}

pkg/controller/controller_test.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import (
3939
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
4040
"github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/fake"
4141
"google.golang.org/grpc"
42-
v1 "k8s.io/api/core/v1"
42+
"k8s.io/api/core/v1"
4343
storagev1 "k8s.io/api/storage/v1"
4444
storagev1beta1 "k8s.io/api/storage/v1beta1"
4545
"k8s.io/apimachinery/pkg/api/resource"
@@ -398,7 +398,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {
398398
defer driver.Stop()
399399

400400
pluginCaps, controllerCaps := provisionCapabilities()
401-
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)
401+
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, nil)
402402

403403
// Requested PVC with requestedBytes storage
404404
deletePolicy := v1.PersistentVolumeReclaimDelete
@@ -1502,7 +1502,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested
15021502
}
15031503

15041504
pluginCaps, controllerCaps := provisionCapabilities()
1505-
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false)
1505+
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, nil)
15061506

15071507
out := &csi.CreateVolumeResponse{
15081508
Volume: &csi.Volume{
@@ -1919,7 +1919,7 @@ func TestProvisionFromSnapshot(t *testing.T) {
19191919
})
19201920

19211921
pluginCaps, controllerCaps := provisionFromSnapshotCapabilities()
1922-
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps, "", false)
1922+
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps, "", false, nil)
19231923

19241924
out := &csi.CreateVolumeResponse{
19251925
Volume: &csi.Volume{
@@ -2074,7 +2074,7 @@ func TestProvisionWithTopologyEnabled(t *testing.T) {
20742074
}
20752075

20762076
nodes := buildNodes(tc.nodeLabels, k8sTopologyBetaVersion.String())
2077-
nodeInfos := buildNodeInfos(tc.topologyKeys)
2077+
csiNodes := buildCSINodes(tc.topologyKeys)
20782078

20792079
var (
20802080
pluginCaps connection.PluginCapabilitySet
@@ -2087,8 +2087,12 @@ func TestProvisionWithTopologyEnabled(t *testing.T) {
20872087
pluginCaps, controllerCaps = provisionCapabilities()
20882088
}
20892089

2090-
clientSet := fakeclientset.NewSimpleClientset(nodes, nodeInfos)
2091-
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)
2090+
clientSet := fakeclientset.NewSimpleClientset(nodes, csiNodes)
2091+
2092+
csiNodeLister, stopChan := csiNodeLister(clientSet, t)
2093+
defer close(stopChan)
2094+
2095+
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csiNodeLister)
20922096

20932097
pv, err := csiProvisioner.Provision(controller.ProvisionOptions{
20942098
StorageClass: &storagev1.StorageClass{},
@@ -2142,7 +2146,7 @@ func TestProvisionWithTopologyDisabled(t *testing.T) {
21422146

21432147
clientSet := fakeclientset.NewSimpleClientset()
21442148
pluginCaps, controllerCaps := provisionWithTopologyCapabilities()
2145-
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)
2149+
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, nil)
21462150

21472151
out := &csi.CreateVolumeResponse{
21482152
Volume: &csi.Volume{
@@ -2321,7 +2325,7 @@ func runDeleteTest(t *testing.T, k string, tc deleteTestcase) {
23212325
}
23222326

23232327
pluginCaps, controllerCaps := provisionCapabilities()
2324-
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)
2328+
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, nil)
23252329

23262330
err = csiProvisioner.Delete(tc.persistentVolume)
23272331
if tc.expectErr && err == nil {
@@ -2751,7 +2755,7 @@ func TestProvisionFromPVC(t *testing.T) {
27512755
controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Return(nil, errors.New("source volume size is bigger than requested volume size")).Times(1)
27522756
}
27532757

2754-
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)
2758+
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, nil)
27552759

27562760
pv, err := csiProvisioner.Provision(tc.volOpts)
27572761
if tc.expectErr && err == nil {

pkg/controller/topology.go

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ import (
2525
"strings"
2626

2727
"github.com/container-storage-interface/spec/lib/go/csi"
28-
v1 "k8s.io/api/core/v1"
28+
"k8s.io/api/core/v1"
2929
storage "k8s.io/api/storage/v1beta1"
3030
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31+
"k8s.io/apimachinery/pkg/labels"
3132
"k8s.io/apimachinery/pkg/util/version"
3233
"k8s.io/client-go/kubernetes"
34+
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
3335
"k8s.io/klog"
3436
)
3537

@@ -74,7 +76,7 @@ func GenerateVolumeNodeAffinity(accessibleTopology []*csi.Topology) *v1.VolumeNo
7476
//
7577
// This function is only called if the topology feature is enabled
7678
// in the external-provisioner and the CSI driver implements the
77-
// CSI accessbility capability. It is disabled by default.
79+
// CSI accessibility capability. It is disabled by default.
7880
//
7981
// If enabled, we require that the K8s API server is on at least
8082
// K8s 1.14, and that the K8s CSINode feature gate is enabled. In
@@ -140,7 +142,8 @@ func GenerateAccessibilityRequirements(
140142
pvcName string,
141143
allowedTopologies []v1.TopologySelectorTerm,
142144
selectedNode *v1.Node,
143-
strictTopology bool) (*csi.TopologyRequirement, error) {
145+
strictTopology bool,
146+
csiNodeLister storagelisters.CSINodeLister) (*csi.TopologyRequirement, error) {
144147
requirement := &csi.TopologyRequirement{}
145148

146149
var (
@@ -152,7 +155,7 @@ func GenerateAccessibilityRequirements(
152155

153156
// 1. Get CSINode for the selected node
154157
if selectedNode != nil {
155-
selectedCSINode, err = getSelectedCSINode(kubeClient, selectedNode)
158+
selectedCSINode, err = getSelectedCSINode(csiNodeLister, selectedNode)
156159
if err != nil {
157160
return nil, err
158161
}
@@ -207,7 +210,7 @@ func GenerateAccessibilityRequirements(
207210
requisiteTerms = flatten(allowedTopologies)
208211
} else {
209212
// Aggregate existing topologies in nodes across the entire cluster.
210-
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode)
213+
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode, csiNodeLister)
211214
if err != nil {
212215
return nil, err
213216
}
@@ -263,11 +266,10 @@ func GenerateAccessibilityRequirements(
263266

264267
// getSelectedCSINode returns the CSINode object for the given selectedNode.
265268
func getSelectedCSINode(
266-
kubeClient kubernetes.Interface,
269+
csiNodeLister storagelisters.CSINodeLister,
267270
selectedNode *v1.Node) (*storage.CSINode, error) {
268271

269-
// TODO (#144): use informers
270-
selectedNodeInfo, err := kubeClient.StorageV1beta1().CSINodes().Get(selectedNode.Name, metav1.GetOptions{})
272+
selectedCSINode, err := csiNodeLister.Get(selectedNode.Name)
271273
if err != nil {
272274
// If the Node is before 1.14, then we fallback to "topology disabled" behavior
273275
// to retain backwards compatibility in a single-topology environment with
@@ -289,37 +291,35 @@ func getSelectedCSINode(
289291
// error with the API server.
290292
return nil, fmt.Errorf("error getting CSINode for selected node %q: %v", selectedNode.Name, err)
291293
}
292-
return selectedNodeInfo, nil
294+
return selectedCSINode, nil
293295
}
294296

295297
// aggregateTopologies returns all the supported topology values in the cluster that
296298
// match the driver's topology keys.
297299
func aggregateTopologies(
298300
kubeClient kubernetes.Interface,
299301
driverName string,
300-
selectedCSINode *storage.CSINode) ([]topologyTerm, error) {
302+
selectedCSINode *storage.CSINode,
303+
csiNodeLister storagelisters.CSINodeLister) ([]topologyTerm, error) {
301304

302305
// 1. Determine topologyKeys to use for aggregation
303306
var topologyKeys []string
304307
if selectedCSINode == nil {
305308
// Immediate binding
306-
307-
// TODO (#144): use informers
308-
nodeInfos, err := kubeClient.StorageV1beta1().CSINodes().List(metav1.ListOptions{})
309+
csiNodes, err := csiNodeLister.List(labels.Everything())
309310
if err != nil {
310311
// Require CSINode beta feature on K8s apiserver to be enabled.
311312
// We don't want to fallback and provision in the wrong topology if there's some temporary
312313
// error with the API server.
313314
return nil, fmt.Errorf("error listing CSINodes: %v", err)
314315
}
315-
316-
rand.Shuffle(len(nodeInfos.Items), func(i, j int) {
317-
nodeInfos.Items[i], nodeInfos.Items[j] = nodeInfos.Items[j], nodeInfos.Items[i]
316+
rand.Shuffle(len(csiNodes), func(i, j int) {
317+
csiNodes[i], csiNodes[j] = csiNodes[j], csiNodes[i]
318318
})
319319

320320
// Pick the first node with topology keys
321-
for _, nodeInfo := range nodeInfos.Items {
322-
topologyKeys = getTopologyKeys(&nodeInfo, driverName)
321+
for _, csiNode := range csiNodes {
322+
topologyKeys = getTopologyKeys(csiNode, driverName)
323323
if topologyKeys != nil {
324324
break
325325
}
@@ -478,8 +478,8 @@ func sortAndShift(terms []topologyTerm, primary topologyTerm, shiftIndex uint32)
478478
return preferredTerms
479479
}
480480

481-
func getTopologyKeys(nodeInfo *storage.CSINode, driverName string) []string {
482-
for _, driver := range nodeInfo.Spec.Drivers {
481+
func getTopologyKeys(csiNode *storage.CSINode, driverName string) []string {
482+
for _, driver := range csiNode.Spec.Drivers {
483483
if driver.Name == driverName {
484484
return driver.TopologyKeys
485485
}

0 commit comments

Comments
 (0)