Skip to content

Commit 83d9d1f

Browse files
committed
nas: pass along context to rate limiter
and use new wrap package
1 parent 0f11e20 commit 83d9d1f

7 files changed

Lines changed: 123 additions & 135 deletions

File tree

pkg/nas/accesspoint_controller.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@ package nas
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"path"
78
"path/filepath"
89
"strconv"
910

1011
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/metadata"
12+
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/wrap"
1113
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/nas/interfaces"
1214

1315
sdk "github.com/alibabacloud-go/nas-20170626/v4/client"
@@ -98,7 +100,7 @@ func (c *accesspointController) CreateVolume(ctx context.Context, req *csi.Creat
98100
quota := (capacity + GiB - 1) >> 30
99101
resp.Volume.CapacityBytes = quota << 30
100102
resp.Volume.VolumeContext["volumeCapacity"] = "true"
101-
if err := c.nasClient.SetDirQuota(&sdk.SetDirQuotaRequest{
103+
if err := c.nasClient.SetDirQuota(ctx, &sdk.SetDirQuotaRequest{
102104
FileSystemId: &cnfs.Status.FsAttributes.FilesystemID,
103105
Path: new(path.Join(basePath, req.Name)),
104106
SizeLimit: &quota,
@@ -181,7 +183,7 @@ func (c *accesspointController) createAccesspoint(ctx context.Context, name, bas
181183
return nil, status.Errorf(codes.InvalidArgument, "storageclass parameters.%s is invalid: %q", key, parameters[key])
182184
}
183185

184-
resp, err := c.nasClient.CreateAccesspoint(req)
186+
resp, err := c.nasClient.CreateAccesspoint(ctx, req)
185187
if err != nil {
186188
return nil, status.Errorf(codes.Internal, "nas:CreateAccesspoint: %v", err)
187189
}
@@ -209,15 +211,15 @@ func (c *accesspointController) DeleteVolume(ctx context.Context, req *csi.Delet
209211

210212
// cancel dir quota
211213
if attributes["volumeCapacity"] == "true" {
212-
apInfo, err := c.nasClient.DescribeAccesspoint(filesystemId, accesspointId)
214+
apInfo, err := c.nasClient.DescribeAccesspoint(ctx, filesystemId, accesspointId)
213215
if err != nil {
214-
if cloud.IsAccessPointNotFoundError(err) {
216+
if errors.Is(err, wrap.ErrorCode("NotFound")) {
215217
klog.Infof("accesspoint %s already deleted", accesspointId)
216218
return &csi.DeleteVolumeResponse{}, nil
217219
}
218220
return nil, status.Errorf(codes.Internal, "nas:DescribeAccesspoint failed: %v", err)
219221
}
220-
if err := c.nasClient.CancelDirQuota(&sdk.CancelDirQuotaRequest{
222+
if err := c.nasClient.CancelDirQuota(ctx, &sdk.CancelDirQuotaRequest{
221223
FileSystemId: &filesystemId,
222224
Path: apInfo.Body.AccessPoint.RootPath,
223225
UserType: new("AllUsers"),
@@ -227,7 +229,7 @@ func (c *accesspointController) DeleteVolume(ctx context.Context, req *csi.Delet
227229
}
228230

229231
// delete accesspoint
230-
err = c.nasClient.DeleteAccesspoint(filesystemId, accesspointId)
232+
err = c.nasClient.DeleteAccesspoint(ctx, filesystemId, accesspointId)
231233
if err != nil {
232234
return nil, status.Errorf(codes.Internal, "nas:DeleteAccesspoint failed: %v", err)
233235
}
@@ -256,11 +258,11 @@ func (c *accesspointController) ControllerExpandVolume(ctx context.Context, req
256258
capacity := req.GetCapacityRange().GetRequiredBytes()
257259
if attributes["volumeCapacity"] == "true" {
258260
quota := (capacity + GiB - 1) >> 30
259-
apInfo, err := c.nasClient.DescribeAccesspoint(filesystemId, accesspointId)
261+
apInfo, err := c.nasClient.DescribeAccesspoint(ctx, filesystemId, accesspointId)
260262
if err != nil {
261263
return nil, status.Errorf(codes.Internal, "nas:DescribeAccesspoint failed: %v", err)
262264
}
263-
if err := c.nasClient.SetDirQuota(&sdk.SetDirQuotaRequest{
265+
if err := c.nasClient.SetDirQuota(ctx, &sdk.SetDirQuotaRequest{
264266
FileSystemId: &filesystemId,
265267
Path: apInfo.Body.AccessPoint.RootPath,
266268
SizeLimit: &quota,

pkg/nas/cloud/nas_client_v2.go

Lines changed: 65 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ import (
55
"errors"
66
"fmt"
77
"os"
8+
"time"
89

910
openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client"
1011
sdk "github.com/alibabacloud-go/nas-20170626/v4/client"
1112
"github.com/alibabacloud-go/tea/tea"
1213
alicred_old "github.com/aliyun/credentials-go/credentials"
14+
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud"
15+
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/wrap"
1316
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/credentials"
14-
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/nas/interfaces"
1517
utilshttp "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils/http"
1618
"golang.org/x/time/rate"
1719
"k8s.io/klog/v2"
@@ -65,125 +67,114 @@ type NasClientV2 struct {
6567
client cloud.NasInterface
6668
}
6769

68-
func (c *NasClientV2) CreateDir(req *sdk.CreateDirRequest) error {
69-
if err := c.limiter.Wait(context.TODO()); err != nil {
70+
var (
71+
// longThrottleLatency defines threshold for logging requests. All requests being
72+
// throttled (via the provided rateLimiter) for more than longThrottleLatency will
73+
// be logged.
74+
longThrottleLatency = 250 * time.Millisecond
75+
)
76+
77+
func (c *NasClientV2) wait(ctx context.Context, logger klog.Logger) error {
78+
t0 := time.Now()
79+
if err := c.limiter.Wait(ctx); err != nil {
7080
return fmt.Errorf("error while waiting for rate limiter: %w", err)
7181
}
72-
resp, err := c.client.CreateDir(req)
73-
logger := klog.Background().WithValues("request", req, "response", resp)
74-
if err == nil {
75-
logger.V(2).Info("nas:CreateDir succeeded")
76-
} else {
77-
logger.Error(err, "nas:CreateDir failed")
82+
t := time.Since(t0)
83+
if t > longThrottleLatency {
84+
logger.V(3).Info("throttled NAS request", "elapsed", t)
85+
}
86+
return nil
87+
}
88+
89+
func (c *NasClientV2) CreateDir(ctx context.Context, req *sdk.CreateDirRequest) error {
90+
logger := klog.FromContext(ctx)
91+
if err := c.wait(ctx, logger); err != nil {
92+
return err
7893
}
94+
_, err := wrap.V2(logger, c.client.CreateDir)(req)
7995
return err
8096
}
8197

82-
func (c *NasClientV2) SetDirQuota(req *sdk.SetDirQuotaRequest) error {
83-
if err := c.limiter.Wait(context.TODO()); err != nil {
84-
return fmt.Errorf("error while waiting for rate limiter: %w", err)
98+
var ErrNotSuccess = errors.New("response indicates a failure")
99+
100+
func (c *NasClientV2) SetDirQuota(ctx context.Context, req *sdk.SetDirQuotaRequest) error {
101+
logger := klog.FromContext(ctx)
102+
if err := c.wait(ctx, logger); err != nil {
103+
return err
85104
}
86-
resp, err := c.client.SetDirQuota(req)
105+
resp, err := wrap.V2(logger, c.client.SetDirQuota)(req)
87106
if err == nil && resp.Body != nil && !tea.BoolValue(resp.Body.Success) {
88-
err = errors.New("response indicates a failure")
89-
}
90-
logger := klog.Background().WithValues("request", req, "response", resp)
91-
if err == nil {
92-
logger.V(2).Info("nas:SetDirQuota succeeded")
93-
} else {
94-
logger.Error(err, "nas:SetDirQuota failed")
107+
err = ErrNotSuccess
95108
}
96109
return err
97110
}
98111

99-
func (c *NasClientV2) CancelDirQuota(req *sdk.CancelDirQuotaRequest) error {
100-
if err := c.limiter.Wait(context.TODO()); err != nil {
101-
return fmt.Errorf("error while waiting for rate limiter: %w", err)
112+
func (c *NasClientV2) CancelDirQuota(ctx context.Context, req *sdk.CancelDirQuotaRequest) error {
113+
logger := klog.FromContext(ctx)
114+
if err := c.wait(ctx, logger); err != nil {
115+
return err
102116
}
103-
resp, err := c.client.CancelDirQuota(req)
117+
resp, err := wrap.V2(logger, c.client.CancelDirQuota)(req)
104118
if err == nil {
105119
if !tea.BoolValue(resp.Body.Success) {
106-
err = errors.New("response indicates a failure")
120+
err = ErrNotSuccess
107121
}
108122
} else {
109-
_err, ok := err.(*tea.SDKError)
110-
if ok && tea.StringValue(_err.Code) == "InvalidParameter.QuotaNotExistOnPath" {
123+
if errors.Is(err, wrap.ErrorCode("InvalidParameter.QuotaNotExistOnPath")) {
111124
// ignore err if quota not exists
112125
err = nil
113126
}
114127
}
115-
logger := klog.Background().WithValues("request", req, "response", resp)
116-
if err == nil {
117-
logger.V(2).Info("nas:CancelDirQuota succeeded")
118-
} else {
119-
logger.Error(err, "nas:CancelDirQuota failed")
120-
}
121128
return err
122129
}
123130

124-
func (c *NasClientV2) GetRecycleBinAttribute(filesystemId string) (*sdk.GetRecycleBinAttributeResponse, error) {
125-
if err := c.limiter.Wait(context.TODO()); err != nil {
126-
return nil, fmt.Errorf("error while waiting for rate limiter: %w", err)
131+
func (c *NasClientV2) GetRecycleBinAttribute(ctx context.Context, filesystemId string) (*sdk.GetRecycleBinAttributeResponse, error) {
132+
logger := klog.FromContext(ctx)
133+
if err := c.wait(ctx, logger); err != nil {
134+
return nil, err
127135
}
128136
req := &sdk.GetRecycleBinAttributeRequest{FileSystemId: &filesystemId}
129-
return c.client.GetRecycleBinAttribute(req)
137+
return wrap.V2(logger, c.client.GetRecycleBinAttribute)(req)
130138
}
131139

132-
func (c *NasClientV2) CreateAccesspoint(req *sdk.CreateAccessPointRequest) (*sdk.CreateAccessPointResponse, error) {
133-
if err := c.limiter.Wait(context.TODO()); err != nil {
134-
return nil, fmt.Errorf("error while waiting for rate limiter: %w", err)
140+
func (c *NasClientV2) CreateAccesspoint(ctx context.Context, req *sdk.CreateAccessPointRequest) (*sdk.CreateAccessPointResponse, error) {
141+
logger := klog.FromContext(ctx)
142+
if err := c.wait(ctx, logger); err != nil {
143+
return nil, err
135144
}
136-
resp, err := c.client.CreateAccessPoint(req)
137-
logger := klog.Background().WithValues("request", req, "response", resp)
138-
if err == nil {
139-
logger.V(2).Info("nas:CreateAccessPoint succeeded")
140-
} else {
141-
logger.Error(err, "nas:CreateAccessPoint failed")
142-
}
143-
return resp, err
145+
return wrap.V2(logger, c.client.CreateAccessPoint)(req)
144146
}
145147

146-
func (c *NasClientV2) DeleteAccesspoint(filesystemId, accessPointId string) error {
147-
if err := c.limiter.Wait(context.TODO()); err != nil {
148-
return fmt.Errorf("error while waiting for rate limiter: %w", err)
148+
func (c *NasClientV2) DeleteAccesspoint(ctx context.Context, filesystemId, accessPointId string) error {
149+
logger := klog.FromContext(ctx)
150+
if err := c.wait(ctx, logger); err != nil {
151+
return nil
149152
}
150153
req := &sdk.DeleteAccessPointRequest{
151154
AccessPointId: &accessPointId,
152155
FileSystemId: &filesystemId,
153156
}
154-
resp, err := c.client.DeleteAccessPoint(req)
155-
logger := klog.Background().WithValues("request", req, "response", resp)
156-
if err == nil {
157-
logger.V(2).Info("nas:DeleteAccessPoint succeeded")
158-
} else {
159-
logger.Error(err, "nas:DeleteAccessPoint failed")
160-
}
157+
_, err := wrap.V2(logger, c.client.DeleteAccessPoint)(req)
161158
return err
162159
}
163160

164-
func (c *NasClientV2) DescribeAccesspoint(filesystemId, accessPointId string) (*sdk.DescribeAccessPointResponse, error) {
165-
if err := c.limiter.Wait(context.TODO()); err != nil {
166-
return nil, fmt.Errorf("error while waiting for rate limiter: %w", err)
161+
func (c *NasClientV2) DescribeAccesspoint(ctx context.Context, filesystemId, accessPointId string) (*sdk.DescribeAccessPointResponse, error) {
162+
logger := klog.FromContext(ctx)
163+
if err := c.wait(ctx, logger); err != nil {
164+
return nil, err
167165
}
168-
return c.client.DescribeAccessPoint(&sdk.DescribeAccessPointRequest{
166+
return wrap.V2(logger, c.client.DescribeAccessPoint)(&sdk.DescribeAccessPointRequest{
169167
AccessPointId: &accessPointId,
170168
FileSystemId: &filesystemId,
171169
})
172170
}
173171

174-
func (c *NasClientV2) DescribeFileSystems(filesystemID string) (*sdk.DescribeFileSystemsResponse, error) {
175-
if err := c.limiter.Wait(context.TODO()); err != nil {
176-
return nil, fmt.Errorf("error while waiting for rate limiter: %w", err)
172+
func (c *NasClientV2) DescribeFileSystems(ctx context.Context, filesystemID string) (*sdk.DescribeFileSystemsResponse, error) {
173+
logger := klog.FromContext(ctx)
174+
if err := c.wait(ctx, logger); err != nil {
175+
return nil, err
177176
}
178-
return c.client.DescribeFileSystems(&sdk.DescribeFileSystemsRequest{
177+
return wrap.V2(logger, c.client.DescribeFileSystems)(&sdk.DescribeFileSystemsRequest{
179178
FileSystemId: &filesystemID,
180179
})
181180
}
182-
183-
func IsAccessPointNotFoundError(err error) bool {
184-
if err == nil {
185-
return false
186-
}
187-
sdkErr, ok := err.(*tea.SDKError)
188-
return ok && tea.StringValue(sdkErr.Code) == "NotFound"
189-
}

0 commit comments

Comments
 (0)