Skip to content

Commit ddcffa3

Browse files
committed
src: some minor quic stream cleanups
1 parent eee1a95 commit ddcffa3

File tree

3 files changed

+127
-42
lines changed

3 files changed

+127
-42
lines changed

src/quic/defs.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -309,15 +309,16 @@ enum class DatagramStatus : uint8_t {
309309
CC_ALGOS(V)
310310
#undef V
311311

312+
using error_code = uint64_t;
313+
using stream_id = int64_t;
314+
using datagram_id = uint64_t;
315+
312316
constexpr size_t kDefaultMaxPacketLength = NGTCP2_MAX_UDP_PAYLOAD_SIZE;
313317
constexpr uint64_t kMaxSizeT = std::numeric_limits<size_t>::max();
314318
constexpr uint64_t kMaxSafeJsInteger = 9007199254740991;
315319
constexpr auto kSocketAddressInfoTimeout = 60 * NGTCP2_SECONDS;
316320
constexpr size_t kMaxVectorCount = 16;
317-
318-
using error_code = uint64_t;
319-
using stream_id = int64_t;
320-
using datagram_id = uint64_t;
321+
constexpr size_t kMaxStreamId = std::numeric_limits<stream_id>::max();
321322

322323
class DebugIndentScope final {
323324
public:

src/quic/streams.cc

Lines changed: 76 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ using v8::Value;
3535
namespace quic {
3636

3737
#define STREAM_STATE(V) \
38-
V(ID, id, int64_t) \
38+
V(ID, id, stream_id) \
3939
V(PENDING, pending, uint8_t) \
4040
V(FIN_SENT, fin_sent, uint8_t) \
4141
V(FIN_RECEIVED, fin_received, uint8_t) \
@@ -107,7 +107,7 @@ PendingStream::~PendingStream() {
107107
}
108108
}
109109

110-
void PendingStream::fulfill(int64_t id) {
110+
void PendingStream::fulfill(stream_id id) {
111111
CHECK(waiting_);
112112
waiting_ = false;
113113
stream_->NotifyStreamOpened(id);
@@ -145,31 +145,76 @@ Maybe<std::shared_ptr<DataQueue>> Stream::GetDataQueueFromSource(
145145
DCHECK_IMPLIES(!value->IsUndefined(), value->IsObject());
146146
std::vector<std::unique_ptr<DataQueue::Entry>> entries;
147147
if (value->IsUndefined()) {
148+
// Return an empty DataQueue.
148149
return Just(std::shared_ptr<DataQueue>());
149150
} else if (value->IsArrayBuffer()) {
151+
// DataQueue is created from an ArrayBuffer.
150152
auto buffer = value.As<ArrayBuffer>();
153+
// We require that the ArrayBuffer be detachable. This ensures that the
154+
// underlying memory can be transferred to the DataQueue without risk
155+
// of the memory being modified by JavaScript code while it is owned
156+
// by the DataQueue.
157+
if (!buffer->IsDetachable()) {
158+
THROW_ERR_INVALID_ARG_TYPE(env, "Data source not detachable");
159+
return Nothing<std::shared_ptr<DataQueue>>();
160+
}
161+
auto backing = buffer->GetBackingStore();
162+
uint64_t offset = 0;
163+
uint64_t length = buffer->ByteLength();
164+
if (buffer->Detach(Local<Value>()).IsNothing()) {
165+
THROW_ERR_INVALID_ARG_TYPE(env, "Data source not detachable");
166+
return Nothing<std::shared_ptr<DataQueue>>();
167+
}
151168
entries.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(
152-
buffer->GetBackingStore(), 0, buffer->ByteLength()));
169+
std::move(backing), offset, length));
153170
return Just(DataQueue::CreateIdempotent(std::move(entries)));
154171
} else if (value->IsSharedArrayBuffer()) {
155-
auto buffer = value.As<SharedArrayBuffer>();
156-
entries.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(
157-
buffer->GetBackingStore(), 0, buffer->ByteLength()));
158-
return Just(DataQueue::CreateIdempotent(std::move(entries)));
172+
// We aren't going to allow use of SharedArrayBuffer as a data source.
173+
// The reason is that SharedArrayBuffer memory is possibly shared with
174+
// other JavaScript code and we cannot detach it, making it impossible
175+
// for us to guarantee that the memory will not be modified while it
176+
// is owned by the DataQueue.
177+
THROW_ERR_INVALID_ARG_TYPE(env, "SharedArrayBuffer is not allowed");
178+
return Nothing<std::shared_ptr<DataQueue>>();
159179
} else if (value->IsArrayBufferView()) {
160-
auto entry =
161-
DataQueue::CreateInMemoryEntryFromView(value.As<ArrayBufferView>());
162-
if (!entry) {
180+
auto view = value.As<ArrayBufferView>();
181+
auto buffer = view->Buffer();
182+
if (buffer->IsSharedArrayBuffer()) {
183+
// We aren't going to allow use of SharedArrayBuffer as a data source.
184+
// The reason is that SharedArrayBuffer memory is possibly shared with
185+
// other JavaScript code and we cannot detach it, making it impossible
186+
// for us to guarantee that the memory will not be modified while it
187+
// is owned by the DataQueue.
188+
THROW_ERR_INVALID_ARG_TYPE(env, "SharedArrayBuffer is not allowed");
189+
return Nothing<std::shared_ptr<DataQueue>>();
190+
}
191+
if (!buffer->IsDetachable()) {
163192
THROW_ERR_INVALID_ARG_TYPE(env, "Data source not detachable");
164193
return Nothing<std::shared_ptr<DataQueue>>();
165194
}
166-
entries.push_back(std::move(entry));
195+
if (buffer->Detach(Local<Value>()).IsNothing()) {
196+
THROW_ERR_INVALID_ARG_TYPE(env, "Data source not detachable");
197+
return Nothing<std::shared_ptr<DataQueue>>();
198+
}
199+
auto backing = buffer->GetBackingStore();
200+
auto offset = view->ByteOffset();
201+
auto length = view->ByteLength();
202+
entries.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(
203+
std::move(backing), offset, length));
167204
return Just(DataQueue::CreateIdempotent(std::move(entries)));
168205
} else if (Blob::HasInstance(env, value)) {
169206
Blob* blob;
170207
ASSIGN_OR_RETURN_UNWRAP(
171208
&blob, value, Nothing<std::shared_ptr<DataQueue>>());
172209
return Just(blob->getDataQueue().slice(0));
210+
} else if (value->IsString()) {
211+
Utf8Value str(env->isolate(), value);
212+
JS_TRY_ALLOCATE_BACKING_OR_RETURN(
213+
env, backing, str.length(), Nothing<std::shared_ptr<DataQueue>>());
214+
memcpy(backing->Data(), *str, str.length());
215+
entries.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(
216+
std::move(backing), 0, backing->ByteLength()));
217+
return Just(DataQueue::CreateIdempotent(std::move(entries)));
173218
}
174219
// TODO(jasnell): Add streaming sources...
175220
THROW_ERR_INVALID_ARG_TYPE(env, "Invalid data source type");
@@ -182,15 +227,16 @@ struct Stream::Impl {
182227
// Attaches an outbound data source to the stream.
183228
JS_METHOD(AttachSource) {
184229
Environment* env = Environment::GetCurrent(args);
230+
Stream* stream;
231+
ASSIGN_OR_RETURN_UNWRAP(&stream, args.This());
185232

186233
std::shared_ptr<DataQueue> dataqueue;
187234
if (GetDataQueueFromSource(env, args[0]).To(&dataqueue)) {
188-
Stream* stream;
189-
ASSIGN_OR_RETURN_UNWRAP(&stream, args.This());
190235
stream->set_outbound(std::move(dataqueue));
191236
}
192237
}
193238

239+
// Immediately and forcefully destroys the stream.
194240
JS_METHOD(Destroy) {
195241
Stream* stream;
196242
ASSIGN_OR_RETURN_UNWRAP(&stream, args.This());
@@ -204,6 +250,10 @@ struct Stream::Impl {
204250
}
205251
}
206252

253+
// Sends a block of headers to the peer. If the stream is not yet open,
254+
// the headers will be queued and sent immediately when the stream is
255+
// opened. If the application does not support sending headers on streams,
256+
// they will be ignored and dropped on the floor.
207257
JS_METHOD(SendHeaders) {
208258
Stream* stream;
209259
ASSIGN_OR_RETURN_UNWRAP(&stream, args.This());
@@ -233,7 +283,7 @@ struct Stream::Impl {
233283
JS_METHOD(StopSending) {
234284
Stream* stream;
235285
ASSIGN_OR_RETURN_UNWRAP(&stream, args.This());
236-
uint64_t code = 0;
286+
error_code code = 0;
237287
CHECK_IMPLIES(!args[0]->IsUndefined(), args[0]->IsBigInt());
238288
if (!args[0]->IsUndefined()) {
239289
bool unused = false; // not used but still necessary.
@@ -258,7 +308,7 @@ struct Stream::Impl {
258308
JS_METHOD(ResetStream) {
259309
Stream* stream;
260310
ASSIGN_OR_RETURN_UNWRAP(&stream, args.This());
261-
uint64_t code = 0;
311+
error_code code = 0;
262312
CHECK_IMPLIES(!args[0]->IsUndefined(), args[0]->IsBigInt());
263313
if (!args[0]->IsUndefined()) {
264314
bool lossless = false; // not used but still necessary.
@@ -315,6 +365,8 @@ struct Stream::Impl {
315365
args.GetReturnValue().Set(static_cast<uint32_t>(priority));
316366
}
317367

368+
// Returns a Blob::Reader that can be used to read data that has been
369+
// received on the stream.
318370
JS_METHOD(GetReader) {
319371
Stream* stream;
320372
ASSIGN_OR_RETURN_UNWRAP(&stream, args.This());
@@ -758,7 +810,7 @@ Stream* Stream::From(void* stream_user_data) {
758810
}
759811

760812
BaseObjectPtr<Stream> Stream::Create(Session* session,
761-
int64_t id,
813+
stream_id id,
762814
std::shared_ptr<DataQueue> source) {
763815
DCHECK_GE(id, 0);
764816
DCHECK_NOT_NULL(session);
@@ -778,7 +830,7 @@ BaseObjectPtr<Stream> Stream::Create(Session* session,
778830

779831
Stream::Stream(BaseObjectWeakPtr<Session> session,
780832
Local<Object> object,
781-
int64_t id,
833+
stream_id id,
782834
std::shared_ptr<DataQueue> source)
783835
: AsyncWrap(session->env(), object, PROVIDER_QUIC_STREAM),
784836
stats_(env()->isolate()),
@@ -787,6 +839,7 @@ Stream::Stream(BaseObjectWeakPtr<Session> session,
787839
inbound_(DataQueue::Create()),
788840
headers_(env()->isolate()) {
789841
MakeWeak();
842+
DCHECK(id < kMaxStreamId);
790843
state_->id = id;
791844
state_->pending = 0;
792845
// Allows us to be notified when data is actually read from the
@@ -818,7 +871,7 @@ Stream::Stream(BaseObjectWeakPtr<Session> session,
818871
std::make_unique<PendingStream>(direction, this, session_)),
819872
headers_(env()->isolate()) {
820873
MakeWeak();
821-
state_->id = -1;
874+
state_->id = kMaxStreamId;
822875
state_->pending = 1;
823876

824877
// Allows us to be notified when data is actually read from the
@@ -841,8 +894,9 @@ Stream::~Stream() {
841894
DCHECK_NE(stats_->destroyed_at, 0);
842895
}
843896

844-
void Stream::NotifyStreamOpened(int64_t id) {
897+
void Stream::NotifyStreamOpened(stream_id id) {
845898
CHECK(is_pending());
899+
DCHECK(id < kMaxStreamId);
846900
Debug(this, "Pending stream opened with id %" PRIi64, id);
847901
state_->pending = 0;
848902
state_->id = id;
@@ -886,13 +940,13 @@ void Stream::NotifyStreamOpened(int64_t id) {
886940
if (outbound_) session().ResumeStream(id);
887941
}
888942

889-
void Stream::NotifyReadableEnded(uint64_t code) {
943+
void Stream::NotifyReadableEnded(error_code code) {
890944
CHECK(!is_pending());
891945
Session::SendPendingDataScope send_scope(&session());
892946
ngtcp2_conn_shutdown_stream_read(session(), 0, id(), code);
893947
}
894948

895-
void Stream::NotifyWritableEnded(uint64_t code) {
949+
void Stream::NotifyWritableEnded(error_code code) {
896950
CHECK(!is_pending());
897951
Session::SendPendingDataScope send_scope(&session());
898952
ngtcp2_conn_shutdown_stream_write(session(), 0, id(), code);
@@ -910,7 +964,7 @@ bool Stream::is_pending() const {
910964
return state_->pending;
911965
}
912966

913-
int64_t Stream::id() const {
967+
stream_id Stream::id() const {
914968
return state_->id;
915969
}
916970

src/quic/streams.h

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,13 @@ class PendingStream final {
105105
// data is buffered in memory makes it essential that the flow control for the
106106
// session and the stream are properly handled. For now, we are largely relying
107107
// on ngtcp2's default flow control mechanisms which generally should be doing
108-
// the right thing.
108+
// the right thing. From the JavaScript side, the application pushes data into
109+
// the stream's outbound queue and ngtcp2 pulls data from that queue as it is
110+
// able. The stream outbound has a high watermark. The JS side can choose to
111+
// continue writing data even after the high watermark is reached but this
112+
// risks using up large amounts of memory if the session is slow to send data
113+
// or the peer is slow to acknowledge receipt. The JavaScript side needs to
114+
// be aware of this risk and pay proper attention to the backpressure signals.
109115
//
110116
// A Stream may be in a fully closed state (No longer readable nor writable)
111117
// state but still have unacknowledged data in both the inbound and outbound
@@ -115,23 +121,29 @@ class PendingStream final {
115121
// (b) all queued data has been acknowledged.
116122
//
117123
// The Stream may be forcefully closed immediately using destroy(err). This
118-
// causes all queued outbound data and pending JavaScript writes are abandoned,
119-
// and causes the Stream to be immediately closed at the ngtcp2 level without
120-
// waiting for any outstanding acknowledgements. Keep in mind, however, that the
121-
// peer is not notified that the stream is destroyed and may attempt to continue
122-
// sending data and acknowledgements.
124+
// causes all queued outbound data to be cleared, pending JavaScript writes
125+
// to be abandoned, the Stream to be immediately closed at the ngtcp2 level
126+
// without waiting for any outstanding acknowledgements. Keep in mind, however,
127+
// that the peer is not notified that the stream is destroyed and may attempt
128+
// to continue sending data and acknowledgements until it is able to determine
129+
// that the stream is gone. Any data that has already been received and is in
130+
// the inbound queue is preserved and may be read by the application.
123131
//
124132
// QUIC streams in general do not have headers. Some QUIC applications, however,
125-
// may associate headers with the stream (HTTP/3 for instance).
133+
// may associate headers with the stream (HTTP/3 for instance). As a
134+
// convenience, the Stream class will hold onto these headers for the
135+
// application.
126136
//
127137
// Streams may be created in a pending state. This means that while the Stream
128138
// object is created, it has not yet been opened in ngtcp2 and therefore has
129139
// no official status yet. Certain operations can still be performed on the
130-
// stream object such as providing data and headers, and destroying the stream.
140+
// stream object such as providing data, adding headers, or destroying the
141+
// stream.
131142
//
132143
// When a stream is created the data source for the stream must be given.
133144
// If no data source is given, then the stream is assumed to not have any
134-
// outbound data. The data source can be fixed length or may support
145+
// outbound data. If the stream was created as bidirectional, the outbound
146+
// side will be closed. The data source can be fixed length or may support
135147
// streaming. What this means practically is, when a stream is opened,
136148
// you must already have a sense of whether that will provide data or
137149
// not. When in doubt, specify a streaming data source, which can produce
@@ -142,15 +154,21 @@ class Stream final : public AsyncWrap,
142154
public:
143155
using Header = NgHeaderBase<BindingData>;
144156

157+
// Acquire a DataQueue from the given value if it is valid. The return
158+
// follows the typical V8 rules for Maybe types. If an error occurs,
159+
// the Maybe will be empty and an exception will be set on the isolate.
145160
static v8::Maybe<std::shared_ptr<DataQueue>> GetDataQueueFromSource(
146161
Environment* env, v8::Local<v8::Value> value);
147162

163+
// The stream_user_data field is from ngtcp2 and will point to the
164+
// Stream instance associated with the stream_id.
148165
static Stream* From(void* stream_user_data);
149166

150167
JS_CONSTRUCTOR(Stream);
151168
JS_BINDING_INIT_BOILERPLATE();
152169

153-
// Creates a new non-pending stream.
170+
// Creates a new non-pending stream. The directionality of the stream
171+
// is inferred from the stream id.
154172
static BaseObjectPtr<Stream> Create(
155173
Session* session,
156174
stream_id id,
@@ -179,7 +197,8 @@ class Stream final : public AsyncWrap,
179197
DISALLOW_COPY_AND_MOVE(Stream)
180198
~Stream() override;
181199

182-
// While the stream is still pending, the id will be -1.
200+
// While the stream is still pending, the id will be kMaxStreamId,
201+
// inidicating the maximum possible stream id is kMaxStreamId - 1.
183202
stream_id id() const;
184203

185204
// While the stream is still pending, the origin will be invalid.
@@ -208,13 +227,23 @@ class Stream final : public AsyncWrap,
208227
// Called by the session/application to indicate that the specified number
209228
// of bytes have been acknowledged by the peer.
210229
void Acknowledge(size_t datalen);
230+
231+
// Called by the session/application to indicate that the specified number
232+
// of bytes have been transmitted to the peer. This is an initial
233+
// indication occuring the first time data is sent. It does not indicate
234+
// that the data has been retransmitted due to loss or has been
235+
// acknowledged to have been received by the peer.
211236
void Commit(size_t datalen);
212237

213238
void EndWritable();
214239
void EndReadable(std::optional<uint64_t> maybe_final_size = std::nullopt);
215240
void EntryRead(size_t amount) override;
216241

217242
// Pulls data from the internal outbound DataQueue configured for this stream.
243+
// This is called by the session/application when it is preparing to send
244+
// data to the peer. There is no guarantee that the requested amount of data
245+
// will actually be sent. The amount of data actually sent is indicated
246+
// by the datalen argument to the Commit() method.
218247
int DoPull(bob::Next<ngtcp2_vec> next,
219248
int options,
220249
ngtcp2_vec* data,
@@ -282,7 +311,8 @@ class Stream final : public AsyncWrap,
282311
void EmitReset(const QuicError& error);
283312

284313
// Notifies the JavaScript side that the application is ready to receive
285-
// trailing headers.
314+
// trailing headers. Any trailing headers must be sent immediately, and
315+
// synchronously when this callback is triggered.
286316
void EmitWantTrailers();
287317

288318
// Notifies the JavaScript side that sending data on the stream has been
@@ -292,8 +322,8 @@ class Stream final : public AsyncWrap,
292322
// Delivers the set of inbound headers that have been collected.
293323
void EmitHeaders();
294324

295-
void NotifyReadableEnded(uint64_t code);
296-
void NotifyWritableEnded(uint64_t code);
325+
void NotifyReadableEnded(error_code code);
326+
void NotifyWritableEnded(error_code code);
297327

298328
// When a pending stream is finally opened, the NotifyStreamOpened method
299329
// will be called and the id will be assigned.
@@ -314,8 +344,8 @@ class Stream final : public AsyncWrap,
314344
std::optional<std::unique_ptr<PendingStream>> maybe_pending_stream_ =
315345
std::nullopt;
316346
std::vector<std::unique_ptr<PendingHeaders>> pending_headers_queue_;
317-
uint64_t pending_close_read_code_ = 0;
318-
uint64_t pending_close_write_code_ = 0;
347+
error_code pending_close_read_code_ = 0;
348+
error_code pending_close_write_code_ = 0;
319349

320350
struct PendingPriority {
321351
StreamPriority priority;

0 commit comments

Comments
 (0)