Skip to content

Commit ccadcaa

Browse files
stonezdjOrlinVasilev
authored andcommitted
Add max_upstream_conn parameter for each proxy_cache project (goharbor#22348)
limit the proxy connection to upstream registry Signed-off-by: stonezdj <stonezdj@gmail.com>
1 parent f1dde53 commit ccadcaa

9 files changed

Lines changed: 299 additions & 8 deletions

File tree

api/v2.0/swagger.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7321,6 +7321,10 @@ definitions:
73217321
type: string
73227322
description: 'The bandwidth limit of proxy cache, in Kbps (kilobits per second). It limits the communication between Harbor and the upstream registry, not the client and the Harbor.'
73237323
x-nullable: true
7324+
max_upstream_conn:
7325+
type: string
7326+
description: 'The max connection per artifact to the upstream registry in current proxy cache project, if it is -1, no limit to upstream registry connections'
7327+
x-nullable: true
73247328
ProjectSummary:
73257329
type: object
73267330
properties:

src/pkg/project/models/pro_meta.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@ const (
2525
ProMetaReuseSysCVEAllowlist = "reuse_sys_cve_allowlist"
2626
ProMetaAutoSBOMGen = "auto_sbom_generation"
2727
ProMetaProxySpeed = "proxy_speed_kb"
28+
ProMetaMaxUpstreamConn = "max_upstream_conn"
2829
)

src/pkg/project/models/project.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"strings"
2222
"time"
2323

24+
"github.com/goharbor/harbor/src/lib/log"
2425
"github.com/goharbor/harbor/src/lib/orm"
2526
allowlist "github.com/goharbor/harbor/src/pkg/allowlist/models"
2627
)
@@ -169,6 +170,20 @@ func (p *Project) ProxyCacheSpeed() int32 {
169170
return int32(speedInt)
170171
}
171172

173+
// MaxUpstreamConnection ...
174+
func (p *Project) MaxUpstreamConnection() int {
175+
countVal, exist := p.GetMetadata(ProMetaMaxUpstreamConn)
176+
if !exist {
177+
return 0
178+
}
179+
cnt, err := strconv.ParseInt(countVal, 10, 32)
180+
if err != nil {
181+
log.Warningf("failed th parse the max_upstream_conn, val:%s error %v", countVal, err)
182+
return 0
183+
}
184+
return int(cnt)
185+
}
186+
172187
// FilterByPublic returns orm.QuerySeter with public filter
173188
func (p *Project) FilterByPublic(_ context.Context, qs orm.QuerySeter, _ string, value any) orm.QuerySeter {
174189
subQuery := `SELECT project_id FROM project_metadata WHERE name = 'public' AND value = '%s'`

src/pkg/proxy/connection/limit.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Copyright Project Harbor Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package connection
16+
17+
import (
18+
"context"
19+
"fmt"
20+
21+
"github.com/go-redis/redis/v8"
22+
23+
"github.com/goharbor/harbor/src/lib/log"
24+
)
25+
26+
// ConLimiter is used to limit the number of connections to the upstream service
27+
type ConnLimiter struct {
28+
}
29+
30+
// Limiter is a global connection limiter instance
31+
var Limiter = &ConnLimiter{}
32+
33+
// Used to compare and increase connection number in redis
34+
//
35+
// KEYS[1]: key of max_conn_upstream
36+
// ARGV[1]: max connection limit
37+
var increaseWithLimitText = `
38+
local current = tonumber(redis.call('GET', KEYS[1]) or '0')
39+
local max = tonumber(ARGV[1])
40+
41+
if current + 1 <= max then
42+
redis.call('INCRBY', KEYS[1], 1)
43+
redis.call('EXPIRE', KEYS[1], 3600) -- set expire to avoid always lock
44+
return 1
45+
else
46+
return 0
47+
end
48+
`
49+
50+
var acquireScript = redis.NewScript(increaseWithLimitText)
51+
52+
// Acquire tries to acquire a connection, returns true if successful
53+
func (c *ConnLimiter) Acquire(ctx context.Context, rdb *redis.Client, key string, limit int) bool {
54+
result, err := acquireScript.Run(ctx, rdb, []string{key}, fmt.Sprintf("%v", limit)).Int()
55+
if err != nil {
56+
log.Errorf("failed to get the connection lock in redis, error %v", err)
57+
return false
58+
}
59+
log.Debugf("Acquire script result is %d", result)
60+
return result == 1
61+
}
62+
63+
var decreaseText = `
64+
local val = tonumber(redis.call("GET", KEYS[1]) or "0")
65+
if val > 0 then
66+
redis.call("DECR", KEYS[1])
67+
end
68+
return 0
69+
`
70+
71+
var decreaseScript = redis.NewScript(decreaseText)
72+
73+
// Release releases a connection in redis
74+
func (c *ConnLimiter) Release(ctx context.Context, rdb *redis.Client, key string) {
75+
_, err := decreaseScript.Run(ctx, rdb, []string{key}).Int()
76+
if err != nil {
77+
log.Infof("release connection failed:%v", err)
78+
}
79+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright Project Harbor Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package connection
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"os"
21+
"testing"
22+
23+
"github.com/go-redis/redis/v8"
24+
"github.com/stretchr/testify/assert"
25+
)
26+
27+
func TestConnLimiter_Acquire_Release(t *testing.T) {
28+
redisAddress := os.Getenv("REDIS_HOST")
29+
redisHost := "localhost"
30+
if len(redisAddress) > 0 {
31+
redisHost = redisAddress
32+
}
33+
34+
ctx := context.Background()
35+
rdb := redis.NewClient(&redis.Options{
36+
Addr: fmt.Sprintf("%s:6379", redisHost), // Redis server address
37+
Password: "", // No password set
38+
DB: 0, // Use default DB
39+
})
40+
key := "test_max_connection_key"
41+
maxConn := 10
42+
for range 10 {
43+
result := Limiter.Acquire(ctx, rdb, key, maxConn)
44+
assert.True(t, result)
45+
}
46+
// after max connection reached, it should be false
47+
result2 := Limiter.Acquire(ctx, rdb, key, maxConn)
48+
assert.False(t, result2)
49+
50+
for range 10 {
51+
Limiter.Release(ctx, rdb, key)
52+
}
53+
54+
// connection in redis should be 0 finally
55+
n, err := rdb.Get(ctx, key).Int()
56+
assert.Nil(t, err)
57+
assert.Equal(t, 0, n)
58+
59+
}

src/server/middleware/repoproxy/proxy.go

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"io"
2121
"net/http"
22+
"os"
2223
"strings"
2324
"time"
2425

@@ -33,18 +34,21 @@ import (
3334
httpLib "github.com/goharbor/harbor/src/lib/http"
3435
"github.com/goharbor/harbor/src/lib/log"
3536
"github.com/goharbor/harbor/src/lib/orm"
37+
"github.com/goharbor/harbor/src/lib/redis"
3638
proModels "github.com/goharbor/harbor/src/pkg/project/models"
39+
"github.com/goharbor/harbor/src/pkg/proxy/connection"
3740
"github.com/goharbor/harbor/src/pkg/reg/model"
3841
"github.com/goharbor/harbor/src/server/middleware"
3942
)
4043

4144
const (
42-
contentLength = "Content-Length"
43-
contentType = "Content-Type"
44-
dockerContentDigest = "Docker-Content-Digest"
45-
etag = "Etag"
46-
ensureTagInterval = 10 * time.Second
47-
ensureTagMaxRetry = 60
45+
contentLength = "Content-Length"
46+
contentType = "Content-Type"
47+
dockerContentDigest = "Docker-Content-Digest"
48+
etag = "Etag"
49+
ensureTagInterval = 10 * time.Second
50+
ensureTagMaxRetry = 60
51+
upstreamRegistryLimitOnProject = "UPSTREAM_REGISTRY_LIMIT_ON_PROJECT" // if UPSTREAM_REGISTRY_LIMIT_ON_PROJECT is true, the upstream registry connection is based on project level, by default it is artifact level
4852
)
4953

5054
var tooManyRequestsError = errors.New("too many requests to upstream registry").WithCode(errors.RateLimitCode)
@@ -99,6 +103,22 @@ func handleBlob(w http.ResponseWriter, r *http.Request, next http.Handler) error
99103
next.ServeHTTP(w, r)
100104
return nil
101105
}
106+
107+
if p.MaxUpstreamConnection() > 0 {
108+
client, err := redis.GetHarborClient()
109+
if err != nil {
110+
return errors.NewErrs(err)
111+
}
112+
key := upstreamRegistryConnectionKey(art)
113+
log.Debugf("handle blob, upstream registry connection limit key: %s", key)
114+
if !connection.Limiter.Acquire(ctx, client, key, p.MaxUpstreamConnection()) {
115+
log.Infof("current connection exceed max connections to upstream registry")
116+
// send http code 429 to client
117+
return tooManyRequestsError
118+
}
119+
defer connection.Limiter.Release(context.Background(), client, key) // use background context in defer to avoid been canceled
120+
}
121+
102122
size, reader, err := proxyCtl.ProxyBlob(ctx, p, art)
103123
if err != nil {
104124
return err
@@ -173,6 +193,15 @@ func defaultBlobURL(projectName string, name string, digest string) string {
173193
return fmt.Sprintf("/v2/%s/library/%s/blobs/%s", projectName, name, digest)
174194
}
175195

196+
// upstreamRegistryConnectionKey get upstream registry connection key
197+
func upstreamRegistryConnectionKey(art lib.ArtifactInfo) string {
198+
limitOnProject := os.Getenv(upstreamRegistryLimitOnProject)
199+
if strings.EqualFold("true", limitOnProject) {
200+
return fmt.Sprintf("{upstream_registry_connection}:%s", art.ProjectName)
201+
}
202+
return fmt.Sprintf("{upstream_registry_connection}:%s:%s", art.Repository, art.Digest)
203+
}
204+
176205
func handleManifest(w http.ResponseWriter, r *http.Request, next http.Handler) error {
177206
ctx := r.Context()
178207
art, p, proxyCtl, err := preCheck(ctx, true)
@@ -219,6 +248,20 @@ func handleManifest(w http.ResponseWriter, r *http.Request, next http.Handler) e
219248
next.ServeHTTP(w, r)
220249
return nil
221250
}
251+
if p.MaxUpstreamConnection() > 0 {
252+
client, err := redis.GetHarborClient()
253+
if err != nil {
254+
return errors.NewErrs(err)
255+
}
256+
key := upstreamRegistryConnectionKey(art)
257+
log.Debugf("handle manifest key %v", key)
258+
if !connection.Limiter.Acquire(ctx, client, key, p.MaxUpstreamConnection()) {
259+
log.Infof("current connection exceed max connections to upstream registry")
260+
// send http code 429 to client
261+
return tooManyRequestsError
262+
}
263+
defer connection.Limiter.Release(context.Background(), client, key) // use background context in defer to avoid been canceled
264+
}
222265

223266
log.Debugf("the tag is %v, digest is %v", art.Tag, art.Digest)
224267
if r.Method == http.MethodHead {

src/server/v2.0/handler/project.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,10 @@ func (a *projectAPI) CreateProject(ctx context.Context, params operation.CreateP
163163
}
164164
}
165165

166-
// ignore metadata.proxy_speed_kb for non-proxy-cache project
166+
// ignore metadata.proxy_speed_kb and metadata.max_upstream_conn for non-proxy-cache project
167167
if req.RegistryID == nil {
168168
req.Metadata.ProxySpeedKb = nil
169+
req.Metadata.MaxUpstreamConn = nil
169170
}
170171

171172
// ignore enable_content_trust metadata for proxy cache project
@@ -566,9 +567,10 @@ func (a *projectAPI) UpdateProject(ctx context.Context, params operation.UpdateP
566567
}
567568
}
568569

569-
// ignore metadata.proxy_speed_kb for non-proxy-cache project
570+
// ignore metadata.proxy_speed_kb and metadata.max_upstream_conn for non-proxy-cache project
570571
if params.Project.Metadata != nil && !p.IsProxy() {
571572
params.Project.Metadata.ProxySpeedKb = nil
573+
params.Project.Metadata.MaxUpstreamConn = nil
572574
}
573575

574576
// ignore enable_content_trust metadata for proxy cache project
@@ -818,6 +820,12 @@ func (a *projectAPI) validateProjectReq(ctx context.Context, req *models.Project
818820
return errors.BadRequestError(nil).WithMessagef("metadata.proxy_speed_kb should by an int32, but got: '%s', err: %s", *ps, err)
819821
}
820822
}
823+
824+
if cnt := req.Metadata.MaxUpstreamConn; cnt != nil {
825+
if _, err := strconv.ParseInt(*cnt, 10, 32); err != nil {
826+
return errors.BadRequestError(nil).WithMessagef("metadata.max_upstream_conn should be an int, but got '%s', err: %s", *cnt, err)
827+
}
828+
}
821829
}
822830

823831
if req.StorageLimit != nil {

src/server/v2.0/handler/project_metadata.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,12 @@ func (p *projectMetadataAPI) validate(metas map[string]string) (map[string]strin
161161
return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessagef("invalid value: %s", value)
162162
}
163163
metas[proModels.ProMetaProxySpeed] = strconv.FormatInt(v, 10)
164+
case proModels.ProMetaMaxUpstreamConn:
165+
v, err := strconv.ParseInt(value, 10, 32)
166+
if err != nil {
167+
return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessagef("invalid value: %s", value)
168+
}
169+
metas[proModels.ProMetaMaxUpstreamConn] = strconv.FormatInt(v, 10)
164170
default:
165171
return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessagef("invalid key: %s", key)
166172
}

0 commit comments

Comments
 (0)