Commit 3ec6f89

Make it possible to configure buffer sizes.
Document how backpressure and buffers work. Refs #170.
1 parent a41c831 commit 3ec6f89

4 files changed: 103 additions and 10 deletions.

docs/deployment.rst

Lines changed: 73 additions & 1 deletion
@@ -1,3 +1,76 @@
+Deployment
+==========
+
+Backpressure
+------------
+
+.. note::
+
+    This section discusses the concept of backpressure from the perspective of
+    a server but the concepts also apply to clients. The issue is symmetrical.
+
+With a naive implementation, if a server receives inputs faster than it can
+process them, or if it generates outputs faster than it can send them, data
+accumulates in buffers, eventually causing the server to run out of memory and
+crash.
+
+The solution to this problem is backpressure. Any part of the server that
+receives inputs faster than it can process them and send the outputs
+must propagate that information back to the previous part in the chain.
+
+``websockets`` is designed to make it easy to get backpressure right.
+
+For incoming data, ``websockets`` builds upon :class:`~asyncio.StreamReader`
+which propagates backpressure to its own buffer and to the TCP stream. Frames
+are parsed from the input stream and added to a bounded queue. If the queue
+fills up, parsing halts until the application reads a frame.
+
+For outgoing data, ``websockets`` builds upon :class:`~asyncio.StreamWriter`
+which implements flow control. If the output buffers grow too large, it waits
+until they're drained. That's why all APIs that write frames are asynchronous
+in websockets (since version 2.0).
+
+Of course, it's still possible for an application to create its own unbounded
+buffers and break the backpressure. Be careful with queues.
+
+Buffers
+-------
+
+An asynchronous system works best when its buffers are almost always empty.
+
+For example, if a client sends frames too fast for a server, the queue of
+incoming frames will be constantly full. The server will always be 32 frames
+(by default) behind the client. This consumes memory and adds latency for no
+good reason.
+
+If buffers are almost always full and that problem cannot be solved by adding
+capacity (typically because the system is bottlenecked by the output and
+constantly regulated by backpressure), reducing the size of buffers minimizes
+negative consequences.
+
+By default ``websockets`` has rather high limits. You can decrease them
+according to your application's characteristics.
+
+Bufferbloat can happen at every level in the stack where there is a buffer.
+The receiving side contains these buffers:
+
+- OS buffers: you shouldn't need to tune them in general.
+- :class:`~asyncio.StreamReader` bytes buffer: the default limit is 64kB.
+  You can set another limit by passing a ``read_limit`` keyword argument to
+  :func:`~websockets.client.connect` or :func:`~websockets.server.serve`.
+- ``websockets`` frame buffer: its size depends both on the size and the
+  number of frames it contains. By default the maximum size is 1MB and the
+  maximum number is 32. You can adjust these limits by setting the
+  ``max_size`` and ``max_queue`` keyword arguments of
+  :func:`~websockets.client.connect` or :func:`~websockets.server.serve`.
+
+The sending side contains these buffers:
+
+- :class:`~asyncio.StreamWriter` bytes buffer: the default size is 64kB.
+  You can set another limit by passing a ``write_limit`` keyword argument to
+  :func:`~websockets.client.connect` or :func:`~websockets.server.serve`.
+- OS buffers: you shouldn't need to tune them in general.
+
 Deployment
 ----------

@@ -34,7 +107,6 @@ Here's a full example (Unix-only):

 .. literalinclude:: ../example/shutdown.py

-
 It's more difficult to achieve the same effect on Windows. Some third-party
 projects try to help with this problem.
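The bounded-queue behavior described in the new Backpressure section can be sketched with the standard library alone. This is a minimal illustration, not the code of ``websockets``: the names ``producer``, ``consumer``, and ``run_demo`` are hypothetical, and the queue size of 32 only mirrors the ``max_queue`` default shown in this commit.

```python
import asyncio


async def producer(queue, count, depths):
    """Put items as fast as possible; put() suspends while the queue is full."""
    for item in range(count):
        await queue.put(item)          # waits here whenever the consumer lags
        depths.append(queue.qsize())   # record how many items are buffered


async def consumer(queue, count, received):
    """Take items one at a time, yielding to the event loop between items."""
    for _ in range(count):
        item = await queue.get()
        await asyncio.sleep(0)         # stand-in for real per-item work
        received.append(item)


async def run_demo(count=200, max_queue=32):
    # Hypothetical helper: runs both sides and reports the deepest backlog.
    queue = asyncio.Queue(maxsize=max_queue)
    depths, received = [], []
    await asyncio.gather(
        producer(queue, count, depths),
        consumer(queue, count, received),
    )
    return max(depths), received


max_depth, received = asyncio.run(run_demo())
```

Because the producer awaits ``put()``, the backlog cannot grow past the queue bound however fast it produces. Swapping in an unbounded queue would remove that guarantee, which is the failure mode the documentation warns about.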

websockets/client.py

Lines changed: 5 additions & 3 deletions
@@ -136,6 +136,7 @@ def handshake(self, wsuri,
 def connect(uri, *,
             klass=WebSocketClientProtocol,
             timeout=10, max_size=2 ** 20, max_queue=2 ** 5,
+            read_limit=2 ** 16, write_limit=2 ** 16,
             loop=None, legacy_recv=False,
             origin=None, subprotocols=None, extra_headers=None,
             **kwds):
@@ -154,9 +155,9 @@ def connect(uri, *,
     a ``wss://`` URI, if this argument isn't provided explicitly, it's set to
     ``True``, which means Python's default :class:`~ssl.SSLContext` is used.

-    The behavior of the ``timeout``, ``max_size``, and ``max_queue`` optional
-    arguments is described the documentation of
-    :class:`~websockets.protocol.WebSocketCommonProtocol`.
+    The behavior of the ``timeout``, ``max_size``, ``max_queue``,
+    ``read_limit``, and ``write_limit`` optional arguments is described in the
+    documentation of :class:`~websockets.protocol.WebSocketCommonProtocol`.

     :func:`connect` also accepts the following optional arguments:

@@ -186,6 +187,7 @@ def connect(uri, *,
     factory = lambda: klass(
         host=wsuri.host, port=wsuri.port, secure=wsuri.secure,
         timeout=timeout, max_size=max_size, max_queue=max_queue,
+        read_limit=read_limit, write_limit=write_limit,
         loop=loop, legacy_recv=legacy_recv,
     )

websockets/protocol.py

Lines changed: 20 additions & 3 deletions
@@ -78,6 +78,16 @@ class WebSocketCommonProtocol(asyncio.StreamReaderProtocol):
     this is 128MB. You may want to lower the limits, depending on your
     application's requirements.

+    The ``read_limit`` argument sets the high-water limit of the buffer for
+    incoming bytes. The low-water limit is half the high-water limit. The
+    default value is 64kB, half of asyncio's default (based on the current
+    implementation of :class:`~asyncio.StreamReader`).
+
+    The ``write_limit`` argument sets the high-water limit of the buffer for
+    outgoing bytes. The low-water limit is a quarter of the high-water limit.
+    The default value is 64kB, equal to asyncio's default (based on the
+    current implementation of ``_FlowControlMixin``).
+
     As soon as the HTTP request and response in the opening handshake are
     processed, the request path is available in the :attr:`path` attribute,
     and the request and response HTTP headers are available:
@@ -105,22 +115,27 @@ class WebSocketCommonProtocol(asyncio.StreamReaderProtocol):
     def __init__(self, *,
                  host=None, port=None, secure=None,
                  timeout=10, max_size=2 ** 20, max_queue=2 ** 5,
+                 read_limit=2 ** 16, write_limit=2 ** 16,
                  loop=None, legacy_recv=False):
         self.host = host
         self.port = port
         self.secure = secure
-
         self.timeout = timeout
         self.max_size = max_size
+        self.max_queue = max_queue
+        self.read_limit = read_limit
+        self.write_limit = write_limit
+
         # Store a reference to loop to avoid relying on self._loop, a private
-        # attribute of StreamReaderProtocol, inherited from FlowControlMixin.
+        # attribute of StreamReaderProtocol, inherited from _FlowControlMixin.
         if loop is None:
             loop = asyncio.get_event_loop()
         self.loop = loop

         self.legacy_recv = legacy_recv

-        stream_reader = asyncio.StreamReader(loop=loop)
+        # This limit is both the line length limit and half the buffer limit.
+        stream_reader = asyncio.StreamReader(limit=read_limit // 2, loop=loop)
         super().__init__(stream_reader, self.client_connected, loop)

         self.reader = None
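The comment "half the buffer limit" in the hunk above can be checked with a small experiment. The sketch below assumes the current CPython behavior of ``asyncio.StreamReader`` (reading pauses once the buffer exceeds twice ``limit`` and resumes once it drops back to ``limit``); ``RecordingTransport`` and ``demo`` are hypothetical names, and the stub only records calls instead of touching a socket.

```python
import asyncio


class RecordingTransport:
    """Hypothetical stand-in for a real transport; it only records calls."""

    def __init__(self):
        self.calls = []

    def pause_reading(self):
        self.calls.append("pause")

    def resume_reading(self):
        self.calls.append("resume")


async def demo(read_limit=2 ** 16):
    transport = RecordingTransport()
    # Same construction as in the diff, minus the explicit loop argument.
    reader = asyncio.StreamReader(limit=read_limit // 2)
    reader.set_transport(transport)

    reader.feed_data(b"x" * read_limit)   # buffer sits exactly at the high-water mark
    at_limit = list(transport.calls)
    reader.feed_data(b"x")                # one byte over: the reader pauses the transport
    over_limit = list(transport.calls)

    # Consume enough to bring the buffer down to the low-water mark.
    await reader.readexactly(read_limit // 2 + 1)
    return at_limit, over_limit, list(transport.calls)


at_limit, over_limit, after_read = asyncio.run(demo())
```

With ``read_limit=2 ** 16`` the reader is created with ``limit=2 ** 15``, so the high-water mark is 64kB and the low-water mark is 32kB, matching the docstring added in this commit.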
@@ -636,6 +651,8 @@ def fail_connection(self, code=1011, reason=''):
     def client_connected(self, reader, writer):
         self.reader = reader
         self.writer = writer
+        # Configure write buffer limit.
+        self.writer._transport.set_write_buffer_limits(self.write_limit)
         # Start the task that handles incoming messages.
         self.worker_task = asyncio_ensure_future(self.run(), loop=self.loop)
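The effect of ``set_write_buffer_limits`` on the low-water mark can be observed directly. The following is a sketch under stated assumptions, not part of the commit: it uses a connected socket pair instead of a real WebSocket connection, the function name ``demo`` is hypothetical, and it goes through the public ``writer.transport`` property rather than the private ``_transport`` attribute used in the diff.

```python
import asyncio
import socket


async def demo(write_limit=2 ** 16):
    # A connected socket pair avoids the need for a listening server.
    left, right = socket.socketpair()
    reader, writer = await asyncio.open_connection(sock=left)
    try:
        # Only the high-water mark is given; asyncio derives the low-water mark.
        writer.transport.set_write_buffer_limits(write_limit)
        return writer.transport.get_write_buffer_limits()
    finally:
        writer.close()
        await writer.wait_closed()
        right.close()


low, high = asyncio.run(demo())
```

``get_write_buffer_limits()`` returns a ``(low, high)`` tuple. When only ``high`` is passed, asyncio sets ``low`` to a quarter of it, which is the ratio stated in the ``write_limit`` docstring.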

websockets/server.py

Lines changed: 5 additions & 3 deletions
@@ -388,6 +388,7 @@ def wait_closed(self):
 def serve(ws_handler, host=None, port=None, *,
           klass=WebSocketServerProtocol,
           timeout=10, max_size=2 ** 20, max_queue=2 ** 5,
+          read_limit=2 ** 16, write_limit=2 ** 16,
           loop=None, legacy_recv=False,
           origins=None, subprotocols=None, extra_headers=None,
           **kwds):
@@ -412,9 +413,9 @@ def serve(ws_handler, host=None, port=None, *,
     For example, you can set the ``ssl`` keyword argument to a
     :class:`~ssl.SSLContext` to enable TLS.

-    The behavior of the ``timeout``, ``max_size``, and ``max_queue`` optional
-    arguments is described the documentation of
-    :class:`~websockets.protocol.WebSocketCommonProtocol`.
+    The behavior of the ``timeout``, ``max_size``, ``max_queue``,
+    ``read_limit``, and ``write_limit`` optional arguments is described in the
+    documentation of :class:`~websockets.protocol.WebSocketCommonProtocol`.

     :func:`serve` also accepts the following optional arguments:

@@ -451,6 +452,7 @@ def serve(ws_handler, host=None, port=None, *,
         ws_handler, ws_server,
         host=host, port=port, secure=secure,
         timeout=timeout, max_size=max_size, max_queue=max_queue,
+        read_limit=read_limit, write_limit=write_limit,
         loop=loop, legacy_recv=legacy_recv,
         origins=origins, subprotocols=subprotocols,
         extra_headers=extra_headers,
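To get a feel for what lowering these keyword arguments buys, the per-connection buffer budget can be estimated with plain arithmetic. This is a rough sketch, not a strict bound: the helper ``worst_case_bytes`` and the tuned values are hypothetical, the factor of 4 bytes per character is an assumption about how the "128MB" figure in the protocol docstring arises, and the write buffer can temporarily exceed its high-water mark.

```python
# Hypothetical limits for a memory-constrained server; tune for your workload.
limits = {
    "max_size": 2 ** 18,     # largest accepted payload: 256 KiB
    "max_queue": 8,          # at most 8 parsed items waiting per connection
    "read_limit": 2 ** 14,   # high-water mark for incoming bytes: 16 KiB
    "write_limit": 2 ** 14,  # high-water mark for outgoing bytes: 16 KiB
}


def worst_case_bytes(max_size, max_queue, read_limit, write_limit,
                     bytes_per_char=4):
    """Rough estimate of buffered bytes for one connection (assumed model)."""
    frame_buffer = bytes_per_char * max_size * max_queue
    return frame_buffer + read_limit + write_limit


# Defaults taken from the signatures in this commit.
defaults = worst_case_bytes(2 ** 20, 2 ** 5, 2 ** 16, 2 ** 16)
tuned = worst_case_bytes(**limits)

# With the library installed, the same dictionary could be passed as:
#     websockets.serve(handler, "localhost", 8765, **limits)
# where ``handler`` is your own connection handler coroutine.
```

Under these assumptions the frame buffer dominates: the byte-level ``read_limit`` and ``write_limit`` add only 128 KiB to the default estimate, so ``max_size`` and ``max_queue`` are the first values to revisit when memory is tight.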
