Skip to content

Commit 84f4b5e

Browse files
committed
added semaphores to implement limits for max number of network connections
1 parent 842b6cc commit 84f4b5e

File tree

6 files changed

+157
-30
lines changed

6 files changed

+157
-30
lines changed

engine/plugins/service_discovery/http_probes/plugin.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,11 @@ func (hp *httpProbing) query(e *et.Event, entity *dbt.Entity, target string, por
105105
ctx, cancel := context.WithTimeout(e.Session.Ctx(), 5*time.Second)
106106
defer cancel()
107107

108-
if resp, err := http.RequestWebPage(ctx, &http.Request{URL: target}); err == nil && resp != nil {
108+
e.Session.NetSem().Acquire()
109+
resp, err := http.RequestWebPage(ctx, &http.Request{URL: target})
110+
e.Session.NetSem().Release()
111+
112+
if err == nil && resp != nil {
109113
findings = append(findings, hp.store(e, resp, entity, port)...)
110114
}
111115
return findings

engine/sessions/manager.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func NewManager(l *slog.Logger, reg et.Registry) et.SessionManager {
4141
}
4242

4343
func (r *manager) NewSession(cfg *config.Config) (et.Session, error) {
44-
s, err := CreateSession(r.registry, cfg)
44+
s, err := CreateSession(r, r.registry, cfg)
4545
if err == nil {
4646
err = r.AddSession(s)
4747

@@ -120,6 +120,13 @@ func (r *manager) GetSession(id uuid.UUID) et.Session {
120120
return nil
121121
}
122122

123+
func (r *manager) NumOfSessions() int {
124+
r.RLock()
125+
defer r.RUnlock()
126+
127+
return len(r.sessions)
128+
}
129+
123130
// Shutdown: cleans all sessions from a session storage and shutdown the session storage.
124131
func (r *manager) Shutdown() {
125132
var list []uuid.UUID

engine/sessions/session.go

Lines changed: 105 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@ import (
1212
"os"
1313
"path/filepath"
1414
"strings"
15+
"sync"
1516
"time"
1617

1718
"github.com/google/uuid"
1819
"github.com/owasp-amass/amass/v5/config"
1920
"github.com/owasp-amass/amass/v5/engine/pubsub"
2021
"github.com/owasp-amass/amass/v5/engine/sessions/scope"
2122
et "github.com/owasp-amass/amass/v5/engine/types"
23+
amassnet "github.com/owasp-amass/amass/v5/internal/net"
2224
assetdb "github.com/owasp-amass/asset-db"
2325
"github.com/owasp-amass/asset-db/repository"
2426
"github.com/owasp-amass/asset-db/repository/neo4j"
@@ -29,47 +31,54 @@ import (
2931
)
3032

3133
type Session struct {
32-
id uuid.UUID
33-
ctx context.Context
34-
cancel context.CancelFunc
35-
log *slog.Logger
36-
ps *pubsub.Logger
37-
cfg *config.Config
38-
scope et.Scope
39-
start time.Time
40-
db repository.Repository
41-
backlog *sessionBacklog
42-
pipelines et.SessionPipelines
43-
dsn string
44-
dbtype string
45-
ranger cidranger.Ranger
46-
tmpdir string
47-
stats *et.SessionStats
48-
done chan struct{}
49-
finished bool
34+
id uuid.UUID
35+
mgr *manager
36+
ctx context.Context
37+
cancel context.CancelFunc
38+
log *slog.Logger
39+
ps *pubsub.Logger
40+
cfg *config.Config
41+
scope et.Scope
42+
start time.Time
43+
db repository.Repository
44+
backlog *sessionBacklog
45+
pipelines et.SessionPipelines
46+
dsn string
47+
dbtype string
48+
ranger cidranger.Ranger
49+
tmpdir string
50+
stats *et.SessionStats
51+
done chan struct{}
52+
finished bool
53+
numOfSess int
54+
netSemaphore *sessSemaphore
5055
}
5156

5257
// CreateSession initializes a new Session object based on the provided configuration.
5358
// The session object represents the state of an active engine enumeration.
54-
func CreateSession(reg et.Registry, cfg *config.Config) (et.Session, error) {
59+
func CreateSession(mgr *manager, reg et.Registry, cfg *config.Config) (et.Session, error) {
5560
// Use default configuration if none is provided
5661
if cfg == nil {
5762
cfg = config.NewConfig()
5863
}
5964

6065
startTime := time.Now()
66+
numOfSessions := mgr.NumOfSessions() + 1
6167
ctx, cancel := context.WithCancel(context.Background())
6268
// Create a new session object
6369
s := &Session{
64-
id: uuid.New(),
65-
ctx: ctx,
66-
cancel: cancel,
67-
cfg: cfg,
68-
start: startTime,
69-
ranger: NewAmassRanger(),
70-
ps: pubsub.NewLogger(),
71-
stats: new(et.SessionStats),
72-
done: make(chan struct{}),
70+
id: uuid.New(),
71+
mgr: mgr,
72+
ctx: ctx,
73+
cancel: cancel,
74+
cfg: cfg,
75+
start: startTime,
76+
ranger: NewAmassRanger(),
77+
ps: pubsub.NewLogger(),
78+
stats: new(et.SessionStats),
79+
done: make(chan struct{}),
80+
numOfSess: numOfSessions,
81+
netSemaphore: NewSessSemaphore(amassnet.MaxNetworkConns / numOfSessions),
7382
}
7483
s.scope = scope.CreateFromConfigScope(s)
7584
s.log = slog.New(slog.NewJSONHandler(s.ps, nil)).With("session", s.id)
@@ -102,6 +111,7 @@ func CreateSession(reg et.Registry, cfg *config.Config) (et.Session, error) {
102111
s.log.Info("Temporary directory created", slog.String("dir", s.tmpdir))
103112
s.log.Info("Database connection established", slog.String("dsn", s.dsn))
104113
go s.updateStats()
114+
go s.updateSessionSemaphore()
105115
return s, nil
106116
}
107117

@@ -121,6 +131,10 @@ func (s *Session) PubSub() *pubsub.Logger {
121131
return s.ps
122132
}
123133

134+
func (s *Session) NetSem() et.SessionSemaphone {
135+
return s.netSemaphore
136+
}
137+
124138
func (s *Session) Config() *config.Config {
125139
return s.cfg
126140
}
@@ -298,3 +312,66 @@ func (s *Session) calculateStats() {
298312
ss.WorkItemsCompleted = completed
299313
ss.Unlock()
300314
}
315+
316+
type sessSemaphore struct {
317+
sync.Mutex
318+
sem amassnet.Semaphore
319+
}
320+
321+
func NewSessSemaphore(limit int) *sessSemaphore {
322+
return &sessSemaphore{sem: amassnet.NewSemaphore(limit)}
323+
}
324+
325+
func (ss *sessSemaphore) Acquire() {
326+
ss.Lock()
327+
defer ss.Unlock()
328+
329+
ss.sem.Acquire()
330+
}
331+
332+
func (ss *sessSemaphore) Release() {
333+
ss.Lock()
334+
defer ss.Unlock()
335+
336+
ss.sem.Release()
337+
}
338+
339+
func (s *Session) updateSessionSemaphore() {
340+
tick := time.NewTicker(10 * time.Second)
341+
defer tick.Stop()
342+
343+
for {
344+
select {
345+
case <-s.done:
346+
return
347+
case <-tick.C:
348+
if num := s.mgr.NumOfSessions(); num != s.numOfSess {
349+
s.numOfSess = num
350+
s.buildNewSessionSemaphore()
351+
}
352+
}
353+
}
354+
}
355+
356+
func (s *Session) buildNewSessionSemaphore() {
357+
s.netSemaphore.Lock()
358+
defer s.netSemaphore.Unlock()
359+
360+
limit := amassnet.MaxNetworkConns / s.numOfSess
361+
sem := amassnet.NewSemaphore(limit)
362+
loop:
363+
for range limit {
364+
select {
365+
case s.netSemaphore.sem <- struct{}{}:
366+
select {
367+
case <-sem:
368+
default:
369+
break loop
370+
}
371+
default:
372+
break loop
373+
}
374+
}
375+
376+
s.netSemaphore.sem = sem
377+
}

engine/types/sessions.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,17 @@ import (
2626

2727
type SessionPipelines map[oam.AssetType]*AssetPipeline
2828

29+
type SessionSemaphone interface {
30+
Acquire()
31+
Release()
32+
}
33+
2934
type Session interface {
3035
ID() uuid.UUID
3136
Ctx() context.Context
3237
Log() *slog.Logger
3338
PubSub() *pubsub.Logger
39+
NetSem() SessionSemaphone
3440
Config() *config.Config
3541
Scope() Scope
3642
StartTime() time.Time

internal/net/http/http.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,9 @@ func RequestWebPage(ctx context.Context, r *Request) (*Response, error) {
201201
}
202202
}
203203

204+
amassnet.MaxNetConnSem.Acquire()
204205
resp, err := DefaultClient.Do(req)
206+
amassnet.MaxNetConnSem.Release()
205207
if err != nil {
206208
return nil, err
207209
}

internal/net/network.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ import (
1414
"time"
1515
)
1616

17+
// Maximum number of connections allowed by each process.
18+
const MaxNetworkConns = 500
19+
1720
// IPv4RE is a regular expression that will match an IPv4 address.
1821
const IPv4RE = "((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)[.]){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)"
1922

@@ -48,12 +51,40 @@ var ReservedCIDRs = []string{
4851
// The reserved network address ranges
4952
var reservedAddrRanges []*net.IPNet
5053

54+
type Semaphore chan struct{}
55+
56+
var MaxNetConnSem Semaphore
57+
5158
func init() {
5259
for _, cidr := range ReservedCIDRs {
5360
if _, ipnet, err := net.ParseCIDR(cidr); err == nil {
5461
reservedAddrRanges = append(reservedAddrRanges, ipnet)
5562
}
5663
}
64+
65+
// setup the semaphore for limiting max network connections
66+
MaxNetConnSem = NewSemaphore(MaxNetworkConns)
67+
}
68+
69+
func NewSemaphore(cap int) Semaphore {
70+
sem := make(Semaphore, cap)
71+
72+
for range cap {
73+
sem <- struct{}{}
74+
}
75+
76+
return sem
77+
}
78+
79+
func (r Semaphore) Acquire() {
80+
<-r
81+
}
82+
83+
func (r Semaphore) Release() {
84+
select {
85+
case r <- struct{}{}:
86+
default:
87+
}
5788
}
5889

5990
// DialContext performs the dial using global variables (e.g. LocalAddr).

0 commit comments

Comments
 (0)