Skip to content

Commit d121ffd

Browse files
committed
quic: refactoring sendpendingdata
Refactoring SendPendingData for both default and http3 applications. PR-URL: nodejs#275 Reviewed-By: Anna Henningsen <[email protected]>
1 parent 9311dc5 commit d121ffd

12 files changed

+940
-825
lines changed

node.gyp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -876,6 +876,7 @@
876876
'src/quic/node_quic_session-inl.h',
877877
'src/quic/node_quic_socket.h',
878878
'src/quic/node_quic_stream.h',
879+
'src/quic/node_quic_stream-inl.h',
879880
'src/quic/node_quic_util.h',
880881
'src/quic/node_quic_util-inl.h',
881882
'src/quic/node_quic_state.h',

src/quic/node_quic.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
#include "node_quic_crypto.h"
1010
#include "node_quic_session-inl.h"
1111
#include "node_quic_socket.h"
12-
#include "node_quic_stream.h"
12+
#include "node_quic_stream-inl.h"
1313
#include "node_quic_state.h"
1414
#include "node_quic_util-inl.h"
1515
#include "node_sockaddr-inl.h"

src/quic/node_quic_default_application.cc

Lines changed: 99 additions & 192 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#include "node_quic_default_application.h"
44
#include "node_quic_session-inl.h"
55
#include "node_quic_socket.h"
6-
#include "node_quic_stream.h"
6+
#include "node_quic_stream-inl.h"
77
#include "node_quic_util-inl.h"
88
#include "node_sockaddr-inl.h"
99
#include <ngtcp2/ngtcp2.h>
@@ -13,16 +13,69 @@
1313
namespace node {
1414
namespace quic {
1515

16+
namespace {
17+
void Consume(ngtcp2_vec** pvec, size_t* pcnt, size_t len) {
18+
ngtcp2_vec* v = *pvec;
19+
size_t cnt = *pcnt;
20+
21+
for (; cnt > 0; --cnt, ++v) {
22+
if (v->len > len) {
23+
v->len -= len;
24+
v->base += len;
25+
break;
26+
}
27+
len -= v->len;
28+
}
29+
30+
*pvec = v;
31+
*pcnt = cnt;
32+
}
33+
34+
int IsEmpty(const ngtcp2_vec* vec, size_t cnt) {
35+
size_t i;
36+
for (i = 0; i < cnt && vec[i].len == 0; ++i) {}
37+
return i == cnt;
38+
}
39+
} // anonymous namespace
40+
1641
DefaultApplication::DefaultApplication(
1742
QuicSession* session) :
1843
QuicApplication(session) {}
1944

2045
bool DefaultApplication::Initialize() {
21-
if (!needs_init())
22-
return false;
23-
Debug(session(), "Default QUIC Application Initialized");
24-
set_init_done();
25-
return true;
46+
if (needs_init()) {
47+
Debug(session(), "Default QUIC Application Initialized");
48+
set_init_done();
49+
}
50+
return needs_init();
51+
}
52+
53+
void DefaultApplication::ScheduleStream(int64_t stream_id) {
54+
QuicStream* stream = session()->FindStream(stream_id);
55+
Debug(session(), "Scheduling stream %" PRIu64, stream_id);
56+
if (stream != nullptr)
57+
stream->Schedule(&stream_queue_);
58+
}
59+
60+
void DefaultApplication::UnscheduleStream(int64_t stream_id) {
61+
QuicStream* stream = session()->FindStream(stream_id);
62+
Debug(session(), "Unscheduling stream %" PRIu64, stream_id);
63+
if (stream != nullptr)
64+
stream->Unschedule();
65+
}
66+
67+
void DefaultApplication::StreamClose(
68+
int64_t stream_id,
69+
uint64_t app_error_code) {
70+
if (app_error_code == 0)
71+
app_error_code = NGTCP2_APP_NOERROR;
72+
UnscheduleStream(stream_id);
73+
QuicApplication::StreamClose(stream_id, app_error_code);
74+
}
75+
76+
void DefaultApplication::ResumeStream(int64_t stream_id) {
77+
Debug(session(), "Stream %" PRId64 " has data to send");
78+
ScheduleStream(stream_id);
2679
}
2780

2881
bool DefaultApplication::ReceiveStreamData(
@@ -59,197 +112,51 @@ bool DefaultApplication::ReceiveStreamData(
59112
return true;
60113
}
61114

62-
void DefaultApplication::AcknowledgeStreamData(
63-
int64_t stream_id,
64-
uint64_t offset,
65-
size_t datalen) {
66-
QuicStream* stream = session()->FindStream(stream_id);
67-
Debug(session(), "Default QUIC Application acknowledging stream data");
68-
// It's possible that the stream has already been destroyed and
69-
// removed. If so, just silently ignore the ack
70-
if (stream != nullptr)
71-
stream->AckedDataOffset(offset, datalen);
115+
int DefaultApplication::GetStreamData(StreamData* stream_data) {
116+
QuicStream* stream = stream_queue_.PopFront();
117+
// If stream is nullptr, there are no streams with data pending.
118+
if (stream == nullptr)
119+
return 0;
120+
121+
stream_data->remaining =
122+
stream->DrainInto(
123+
&stream_data->data,
124+
&stream_data->count,
125+
MAX_VECTOR_COUNT);
126+
127+
stream_data->stream.reset(stream);
128+
stream_data->id = stream->id();
129+
stream_data->fin = stream->is_writable() ? 0 : 1;
130+
131+
// Schedule the stream again only if there is data to write. There
132+
// might not actually be any more data to write but we can't know
133+
// that yet as it depends entirely on how much data actually gets
134+
// serialized by ngtcp2.
135+
if (stream_data->count > 0)
136+
stream->Schedule(&stream_queue_);
137+
138+
Debug(session(), "Selected %" PRId64 " buffers for stream %" PRId64 "%s",
139+
stream_data->count,
140+
stream_data->id,
141+
stream_data->fin == 1 ? " (fin)" : "");
142+
return 0;
72143
}
73144

74-
bool DefaultApplication::SendPendingData() {
75-
// Right now this iterates through the streams in the order they
76-
// were created. Later, we might want to implement a prioritization
77-
// scheme to allow higher priority streams to be serialized first.
78-
// Prioritization is left entirely up to the application layer in QUIC.
79-
// HTTP/3, for instance, drops prioritization entirely.
80-
Debug(session(), "Default QUIC Application sending pending data");
81-
for (const auto& stream : session()->streams()) {
82-
if (!SendStreamData(stream.second.get()))
83-
return false;
84-
85-
// Check to make sure QuicSession state did not change in this iteration
86-
if (session()->is_in_draining_period() ||
87-
session()->is_in_closing_period() ||
88-
session()->is_destroyed()) {
89-
break;
90-
}
91-
}
92-
145+
bool DefaultApplication::StreamCommit(
146+
StreamData* stream_data,
147+
size_t datalen) {
148+
CHECK(stream_data->stream);
149+
stream_data->remaining -= datalen;
150+
Consume(&stream_data->buf, &stream_data->count, datalen);
151+
stream_data->stream->Commit(datalen);
93152
return true;
94153
}
95154

96-
namespace {
97-
void Consume(ngtcp2_vec** pvec, size_t* pcnt, size_t len) {
98-
ngtcp2_vec* v = *pvec;
99-
size_t cnt = *pcnt;
100-
101-
for (; cnt > 0; --cnt, ++v) {
102-
if (v->len > len) {
103-
v->len -= len;
104-
v->base += len;
105-
break;
106-
}
107-
len -= v->len;
108-
}
109-
110-
*pvec = v;
111-
*pcnt = cnt;
112-
}
113-
114-
int IsEmpty(const ngtcp2_vec* vec, size_t cnt) {
115-
size_t i;
116-
for (i = 0; i < cnt && vec[i].len == 0; ++i) {}
117-
return i == cnt;
118-
}
119-
} // anonymous namespace
120-
121-
bool DefaultApplication::SendStreamData(QuicStream* stream) {
122-
ssize_t ndatalen = 0;
123-
QuicPathStorage path;
124-
Debug(session(), "Default QUIC Application sending stream %" PRId64 " data",
125-
stream->GetID());
126-
127-
std::vector<ngtcp2_vec> vec;
128-
129-
// remaining is the total number of bytes stored in the vector
130-
// that are remaining to be serialized.
131-
size_t remaining = stream->DrainInto(&vec);
132-
Debug(stream, "Sending %d bytes of stream data. Still writable? %s",
133-
remaining,
134-
stream->is_writable() ? "yes" : "no");
135-
136-
// c and v are used to track the current serialization position
137-
// for each iteration of the for(;;) loop below.
138-
size_t c = vec.size();
139-
ngtcp2_vec* v = vec.data();
140-
141-
// If there is no stream data and we're not sending fin,
142-
// Just return without doing anything.
143-
if (c == 0 && stream->is_writable()) {
144-
Debug(stream, "There is no stream data to send");
145-
return true;
146-
}
147-
148-
std::unique_ptr<QuicPacket> packet = CreateStreamDataPacket();
149-
size_t packet_offset = 0;
150-
151-
for (;;) {
152-
Debug(stream, "Starting packet serialization. Remaining? %d", remaining);
153-
154-
// If packet was sent on the previous iteration, it will have been reset
155-
if (!packet)
156-
packet = CreateStreamDataPacket();
157-
158-
ssize_t nwrite =
159-
ngtcp2_conn_writev_stream(
160-
session()->connection(),
161-
&path.path,
162-
packet->data() + packet_offset,
163-
session()->max_packet_length(),
164-
&ndatalen,
165-
remaining > 0 ?
166-
NGTCP2_WRITE_STREAM_FLAG_MORE :
167-
NGTCP2_WRITE_STREAM_FLAG_NONE,
168-
stream->GetID(),
169-
stream->is_writable() ? 0 : 1,
170-
reinterpret_cast<const ngtcp2_vec*>(v),
171-
c,
172-
uv_hrtime());
173-
174-
if (nwrite <= 0) {
175-
switch (nwrite) {
176-
case 0:
177-
// If zero is returned, we've hit congestion limits. We need to stop
178-
// serializing data and try again later to empty the queue once the
179-
// congestion window has expanded.
180-
Debug(stream, "Congestion limit reached");
181-
return true;
182-
case NGTCP2_ERR_PKT_NUM_EXHAUSTED:
183-
// There is a finite number of packets that can be sent
184-
// per connection. Once those are exhausted, there's
185-
// absolutely nothing we can do except immediately
186-
// and silently tear down the QuicSession. This has
187-
// to be silent because we can't even send a
188-
// CONNECTION_CLOSE since even those require a
189-
// packet number.
190-
session()->SilentClose();
191-
return false;
192-
case NGTCP2_ERR_STREAM_DATA_BLOCKED:
193-
Debug(stream, "Stream data blocked");
194-
session()->StreamDataBlocked(stream->GetID());
195-
return true;
196-
case NGTCP2_ERR_STREAM_SHUT_WR:
197-
Debug(stream, "Stream writable side is closed");
198-
return true;
199-
case NGTCP2_ERR_STREAM_NOT_FOUND:
200-
Debug(stream, "Stream does not exist");
201-
return true;
202-
case NGTCP2_ERR_WRITE_STREAM_MORE:
203-
if (ndatalen > 0) {
204-
remaining -= ndatalen;
205-
Debug(stream,
206-
"%" PRIu64 " stream bytes serialized into packet. %d remaining",
207-
ndatalen,
208-
remaining);
209-
Consume(&v, &c, ndatalen);
210-
stream->Commit(ndatalen);
211-
packet_offset += ndatalen;
212-
}
213-
continue;
214-
default:
215-
Debug(stream, "Error writing packet. Code %" PRIu64, nwrite);
216-
session()->set_last_error(
217-
QUIC_ERROR_SESSION,
218-
static_cast<int>(nwrite));
219-
return false;
220-
}
221-
}
222-
223-
if (ndatalen > 0) {
224-
remaining -= ndatalen;
225-
Debug(stream,
226-
"%" PRIu64 " stream bytes serialized into packet. %d remaining",
227-
ndatalen,
228-
remaining);
229-
Consume(&v, &c, ndatalen);
230-
stream->Commit(ndatalen);
231-
}
232-
233-
Debug(stream, "Sending %" PRIu64 " bytes in serialized packet", nwrite);
234-
packet->set_length(nwrite);
235-
if (!session()->SendPacket(std::move(packet), path))
236-
return false;
237-
238-
packet.reset();
239-
packet_offset = 0;
240-
241-
if (IsEmpty(v, c)) {
242-
// fin will have been set if all of the data has been
243-
// encoded in the packet and is_writable() returns false.
244-
if (!stream->is_writable()) {
245-
Debug(stream, "Final stream has been sent");
246-
stream->set_fin_sent();
247-
}
248-
break;
249-
}
250-
}
251-
252-
return true;
155+
bool DefaultApplication::ShouldSetFin(const StreamData& stream_data) {
156+
if (!stream_data.stream ||
157+
!IsEmpty(stream_data.buf, stream_data.count))
158+
return false;
159+
return !stream_data.stream->is_writable();
253160
}
254161

255162
} // namespace quic

src/quic/node_quic_default_application.h

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33

44
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
55

6+
#include "node_quic_stream.h"
67
#include "node_quic_session.h"
78
#include "node_quic_util.h"
9+
#include "util.h"
810
#include "v8.h"
911

1012
namespace node {
@@ -28,17 +30,23 @@ class DefaultApplication final : public QuicApplication {
2830
const uint8_t* data,
2931
size_t datalen,
3032
uint64_t offset) override;
31-
void AcknowledgeStreamData(
32-
int64_t stream_id,
33-
uint64_t offset,
34-
size_t datalen) override;
3533

36-
bool SendPendingData() override;
37-
bool SendStreamData(QuicStream* stream) override;
34+
int GetStreamData(StreamData* stream_data) override;
35+
36+
void ResumeStream(int64_t stream_id) override;
37+
void StreamClose(int64_t stream_id, uint64_t app_error_code) override;
38+
bool ShouldSetFin(const StreamData& stream_data) override;
39+
bool StreamCommit(StreamData* stream_data, size_t datalen) override;
3840

3941
SET_SELF_SIZE(DefaultApplication)
4042
SET_MEMORY_INFO_NAME(DefaultApplication)
4143
SET_NO_MEMORY_INFO()
44+
45+
private:
46+
void ScheduleStream(int64_t stream_id);
47+
void UnscheduleStream(int64_t stream_id);
48+
49+
QuicStream::Queue stream_queue_;
4250
};
4351

4452
} // namespace quic

0 commit comments

Comments
 (0)