Skip to content

Commit 65ab0da

Browse files
authored
Security: add flags for TCP connection limits and timeouts (#7518)
* add flags for TCP connection limits and timeouts Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * ensure semaphore slot is held for the lifetime of stream connections Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * modernize Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * change oversize packet log level to debug Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * add metrics Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * add stream to activeConnections to make accurate tracking Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * add histogram for tracking inbound packet sizes in bytes Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> --------- Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 29b6167 commit 65ab0da

5 files changed

Lines changed: 513 additions & 6 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
* [FEATURE] Querier: Implement Resource Based Throttling in Querier. #7442
1313
* [ENHANCEMENT] Upgrade prometheus alertmanager version to v0.32.1. #7462
1414
* [ENHANCEMENT] Tenant Federation: Avoid purging the regex resolver LRU cache on user-sync ticks when the set of known users has not changed. #7489
15+
* [ENHANCEMENT] Memberlist: Add `-memberlist.packet-read-timeout`, `-memberlist.max-packet-size`, and `-memberlist.max-concurrent-connections` flags to bound inbound gossip TCP connections, preventing slow-read, OOM, and connection-flood attacks on the gossip port. #7518
1516
* [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455
1617
* [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420
1718
* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401

docs/configuration/config-file-reference.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4693,6 +4693,18 @@ The `memberlist_config` configures the Gossip memberlist.
46934693
# CLI flag: -memberlist.packet-write-timeout
46944694
[packet_write_timeout: <duration> | default = 5s]
46954695
4696+
# Timeout for reading packet data from inbound connections. 0 = no limit.
4697+
# CLI flag: -memberlist.packet-read-timeout
4698+
[packet_read_timeout: <duration> | default = 5s]
4699+
4700+
# Maximum size in bytes of an inbound gossip packet. 0 = no limit.
4701+
# CLI flag: -memberlist.max-packet-size
4702+
[max_packet_size: <int> | default = 1048576]
4703+
4704+
# Maximum number of concurrent inbound TCP connections. 0 = no limit.
4705+
# CLI flag: -memberlist.max-concurrent-connections
4706+
[max_concurrent_connections: <int> | default = 100]
4707+
46964708
# Enable TLS on the memberlist transport layer.
46974709
# CLI flag: -memberlist.tls-enabled
46984710
[tls_enabled: <boolean> | default = false]

pkg/ring/kv/memberlist/tcp_transport.go

Lines changed: 148 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/prometheus/client_golang/prometheus"
2121
"github.com/prometheus/client_golang/prometheus/promauto"
2222
"go.uber.org/atomic"
23+
"golang.org/x/sync/semaphore"
2324

2425
"github.com/cortexproject/cortex/pkg/util/flagext"
2526
cortextls "github.com/cortexproject/cortex/pkg/util/tls"
@@ -50,6 +51,15 @@ type TCPTransportConfig struct {
5051
// Timeout for writing packet data. Zero = no timeout.
5152
PacketWriteTimeout time.Duration `yaml:"packet_write_timeout"`
5253

54+
// Timeout for reading inbound packet data. Zero = no timeout.
55+
PacketReadTimeout time.Duration `yaml:"packet_read_timeout"`
56+
57+
// Maximum size in bytes of a single inbound packet. Zero = no limit.
58+
MaxPacketSize int64 `yaml:"max_packet_size"`
59+
60+
// Maximum number of concurrent inbound TCP connections. Zero = no limit.
61+
MaxConcurrentConnections int `yaml:"max_concurrent_connections"`
62+
5363
// Transport logs lot of messages at debug level, so it deserves an extra flag for turning it on
5464
TransportDebug bool `yaml:"-"`
5565

@@ -72,6 +82,9 @@ func (cfg *TCPTransportConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix s
7282
f.IntVar(&cfg.BindPort, prefix+"memberlist.bind-port", 7946, "Port to listen on for gossip messages.")
7383
f.DurationVar(&cfg.PacketDialTimeout, prefix+"memberlist.packet-dial-timeout", 5*time.Second, "Timeout used when connecting to other nodes to send packet.")
7484
f.DurationVar(&cfg.PacketWriteTimeout, prefix+"memberlist.packet-write-timeout", 5*time.Second, "Timeout for writing 'packet' data.")
85+
f.DurationVar(&cfg.PacketReadTimeout, prefix+"memberlist.packet-read-timeout", 5*time.Second, "Timeout for reading packet data from inbound connections. 0 = no limit.")
86+
f.Int64Var(&cfg.MaxPacketSize, prefix+"memberlist.max-packet-size", 1*1024*1024 /*1MB*/, "Maximum size in bytes of an inbound gossip packet. 0 = no limit.")
87+
f.IntVar(&cfg.MaxConcurrentConnections, prefix+"memberlist.max-concurrent-connections", 100, "Maximum number of concurrent inbound TCP connections. 0 = no limit.")
7588
f.BoolVar(&cfg.TransportDebug, prefix+"memberlist.transport-debug", false, "Log debug transport messages. Note: global log.level must be at debug level as well.")
7689

7790
f.BoolVar(&cfg.TLSEnabled, prefix+"memberlist.tls-enabled", false, "Enable TLS on the memberlist transport layer.")
@@ -90,6 +103,9 @@ type TCPTransport struct {
90103
tcpListeners []net.Listener
91104
tlsConfig *tls.Config
92105

106+
// connSemaphore limits the number of concurrent inbound TCP connections.
107+
connSemaphore *semaphore.Weighted
108+
93109
shutdown atomic.Int32
94110

95111
advertiseMu sync.RWMutex
@@ -107,6 +123,10 @@ type TCPTransport struct {
107123
sentPacketsBytes prometheus.Counter
108124
sentPacketsErrors prometheus.Counter
109125
unknownConnections prometheus.Counter
126+
rejectedConnections prometheus.Counter
127+
activeConnections prometheus.Gauge
128+
packetReceiveDuration prometheus.Histogram
129+
packetReceiveBytes prometheus.Histogram
110130
}
111131

112132
// NewTCPTransport returns a new tcp-based transport with the given configuration. On
@@ -125,6 +145,10 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger) (*TCPTranspor
125145
connCh: make(chan net.Conn),
126146
}
127147

148+
if config.MaxConcurrentConnections > 0 {
149+
t.connSemaphore = semaphore.NewWeighted(int64(config.MaxConcurrentConnections))
150+
}
151+
128152
var err error
129153
if config.TLSEnabled {
130154
t.tlsConfig, err = config.TLS.GetTLSConfig()
@@ -222,7 +246,30 @@ func (t *TCPTransport) tcpListen(tcpLn net.Listener) {
222246
// No error, reset loop delay
223247
loopDelay = 0
224248

225-
go t.handleConnection(conn)
249+
// Enforce concurrent connection via semaphore.
250+
if t.connSemaphore != nil {
251+
if !t.connSemaphore.TryAcquire(1) {
252+
t.rejectedConnections.Inc()
253+
level.Debug(t.logger).Log("msg", "max concurrent connections reached, closing connection", "remote", conn.RemoteAddr())
254+
_ = conn.Close()
255+
continue
256+
}
257+
}
258+
259+
t.activeConnections.Inc()
260+
go func() {
261+
// handleConnection returns true when it wrapped the conn in a
262+
// semaphoreConn and transferred ownership of the slot (and the
263+
// activeConnections gauge) to that wrapper (stream path).
264+
// In that case we must not release here.
265+
semTransferred := t.handleConnection(conn)
266+
if !semTransferred {
267+
if t.connSemaphore != nil {
268+
t.connSemaphore.Release(1)
269+
}
270+
t.activeConnections.Dec()
271+
}
272+
}()
226273
}
227274
}
228275

@@ -235,7 +282,7 @@ func (t *TCPTransport) debugLog() log.Logger {
235282
return noopLogger
236283
}
237284

238-
func (t *TCPTransport) handleConnection(conn net.Conn) {
285+
func (t *TCPTransport) handleConnection(conn net.Conn) (semTransferred bool) {
239286
t.debugLog().Log("msg", "New connection", "addr", conn.RemoteAddr())
240287

241288
closeConn := true
@@ -245,6 +292,15 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {
245292
}
246293
}()
247294

295+
// Apply a read deadline for the entire packet receive so that a slow or
296+
// adversarial peer cannot hold the goroutine open indefinitely.
297+
if t.cfg.PacketReadTimeout > 0 {
298+
if err := conn.SetReadDeadline(time.Now().Add(t.cfg.PacketReadTimeout)); err != nil {
299+
level.Warn(t.logger).Log("msg", "failed to set read deadline", "err", err, "remote", conn.RemoteAddr())
300+
return
301+
}
302+
}
303+
248304
// let's read first byte, and determine what to do about this connection
249305
msgType := []byte{0}
250306
_, err := io.ReadFull(conn, msgType)
@@ -256,13 +312,28 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {
256312
if messageType(msgType[0]) == stream {
257313
t.incomingStreams.Inc()
258314

259-
// hand over this connection to memberlist
315+
// Stream connections are handed off to memberlist which manages them
316+
// independently – clear the deadline so memberlist can use its own
317+
// timeouts, then pass the connection over.
318+
if t.cfg.PacketReadTimeout > 0 {
319+
_ = conn.SetReadDeadline(time.Time{})
320+
}
321+
322+
// hand over this connection to memberlist.
323+
// If the semaphore is active, wrap the conn so that the slot is held
324+
// for the real lifetime of the stream. The memberlist will close it.
260325
closeConn = false
261-
t.connCh <- conn
326+
if t.connSemaphore != nil {
327+
t.connCh <- &semaphoreConn{Conn: conn, sem: t.connSemaphore, activeGauge: t.activeConnections}
328+
} else {
329+
t.connCh <- &semaphoreConn{Conn: conn, activeGauge: t.activeConnections}
330+
}
331+
semTransferred = true
262332
} else if messageType(msgType[0]) == packet {
263333
// it's a memberlist "packet", which contains an address and data.
264334
t.receivedPackets.Inc()
265335

336+
packetStart := time.Now()
266337
// before reading packet, read the address
267338
addrLengthBuf := []byte{0}
268339
_, err := io.ReadFull(conn, addrLengthBuf)
@@ -280,14 +351,27 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {
280351
return
281352
}
282353

283-
// read the rest to buffer -- this is the "packet" itself
284-
buf, err := io.ReadAll(conn)
354+
var reader io.Reader = conn
355+
if t.cfg.MaxPacketSize > 0 {
356+
// Read one byte beyond the limit so we can detect oversized packets.
357+
reader = io.LimitReader(conn, t.cfg.MaxPacketSize+1)
358+
}
359+
buf, err := io.ReadAll(reader)
360+
t.packetReceiveDuration.Observe(time.Since(packetStart).Seconds())
361+
t.packetReceiveBytes.Observe(float64(len(buf)))
285362
if err != nil {
286363
t.receivedPacketsErrors.Inc()
287364
level.Warn(t.logger).Log("msg", "error while reading packet data", "err", err, "remote", conn.RemoteAddr())
288365
return
289366
}
290367

368+
// Reject oversized packets
369+
if t.cfg.MaxPacketSize > 0 && int64(len(buf)) > t.cfg.MaxPacketSize {
370+
t.receivedPacketsErrors.Inc()
371+
level.Debug(t.logger).Log("msg", "packet too large, dropping", "size", len(buf), "max", t.cfg.MaxPacketSize, "remote", conn.RemoteAddr())
372+
return
373+
}
374+
291375
if len(buf) < md5.Size {
292376
t.receivedPacketsErrors.Inc()
293377
level.Warn(t.logger).Log("msg", "not enough data received", "data_length", len(buf), "remote", conn.RemoteAddr())
@@ -318,6 +402,7 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {
318402
t.unknownConnections.Inc()
319403
level.Error(t.logger).Log("msg", "unknown message type", "msgType", msgType, "remote", conn.RemoteAddr())
320404
}
405+
return
321406
}
322407

323408
type addr string
@@ -330,6 +415,27 @@ func (a addr) String() string {
330415
return string(a)
331416
}
332417

418+
// semaphoreConn wraps a net.Conn and releases a semaphore slot (if set) and
419+
// decrements the active-connections gauge exactly once when the connection is
420+
// closed. It is used on the stream path to keep both the semaphore slot and
421+
// the gauge accurate for the real lifetime of the connection.
422+
type semaphoreConn struct {
423+
net.Conn
424+
sem *semaphore.Weighted
425+
activeGauge prometheus.Gauge
426+
once sync.Once
427+
}
428+
429+
func (c *semaphoreConn) Close() error {
430+
c.once.Do(func() {
431+
if c.sem != nil {
432+
c.sem.Release(1)
433+
}
434+
c.activeGauge.Dec()
435+
})
436+
return c.Conn.Close()
437+
}
438+
333439
func (t *TCPTransport) getConnection(addr string, timeout time.Duration) (net.Conn, error) {
334440
if t.cfg.TLSEnabled {
335441
return tls.DialWithDialer(&net.Dialer{Timeout: timeout}, "tcp", addr, t.tlsConfig)
@@ -634,4 +740,40 @@ func (t *TCPTransport) registerMetrics(registerer prometheus.Registerer) {
634740
Name: "unknown_connections_total",
635741
Help: "Number of unknown TCP connections (not a packet or stream)",
636742
})
743+
744+
t.rejectedConnections = promauto.With(registerer).NewCounter(prometheus.CounterOpts{
745+
Namespace: t.cfg.MetricsNamespace,
746+
Subsystem: subsystem,
747+
Name: "rejected_connections_total",
748+
Help: "Number of inbound TCP connections rejected because the concurrent connection limit was reached",
749+
})
750+
751+
t.activeConnections = promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
752+
Namespace: t.cfg.MetricsNamespace,
753+
Subsystem: subsystem,
754+
Name: "active_connections",
755+
Help: "Current number of active inbound TCP connections.",
756+
})
757+
758+
t.packetReceiveDuration = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
759+
Namespace: t.cfg.MetricsNamespace,
760+
Subsystem: subsystem,
761+
Name: "packet_receive_duration_seconds",
762+
Help: "Duration (in seconds) of inbound packet-type message reads.",
763+
Buckets: prometheus.DefBuckets,
764+
NativeHistogramBucketFactor: 1.1,
765+
NativeHistogramMaxBucketNumber: 100,
766+
NativeHistogramMinResetDuration: 1 * time.Hour,
767+
})
768+
769+
t.packetReceiveBytes = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
770+
Namespace: t.cfg.MetricsNamespace,
771+
Subsystem: subsystem,
772+
Name: "packet_receive_bytes",
773+
Help: "Distribution of inbound packet sizes in bytes.",
774+
Buckets: prometheus.ExponentialBuckets(64, 4, 8), // 64, 256, 1K, 4K, 16K, 64K, 256K, 1M
775+
NativeHistogramBucketFactor: 1.1,
776+
NativeHistogramMaxBucketNumber: 100,
777+
NativeHistogramMinResetDuration: 1 * time.Hour,
778+
})
637779
}

0 commit comments

Comments
 (0)