Skip to content

Commit a6bd8c8

Browse files
afrindmeta-codesync[bot]
authored andcommitted
Use PriorityQueue in WtStreamManager
Summary: 1) Implement generic prioritization interface, removing FATAL in setPriority 2) Hold connFc blocked streams outside the writeable queue until more f/c granted 3) Leave fin-only streams in the write queue regardless of stream/conn f/c It's unfortunate that this costs one more map lookup per egress. Maybe someday PriorityQueue will allow adjunct data to be stored in the queue. The expected data egress patten will be: ``` while (!queue.empty()) { auto id = queue.peekNext(); if (id.isStreamId()) { auto wh = wtSM.getEgressHandle(); if (!wh) { queue.erase(id); continue; } auto x = wtSM.dequeue(wh, max); if (x.data || x.fin) { // write } // else this stream got blocked } else if (id.isDatagramFlowId()) { // handle sending datagrams } } ``` when we run out of conn flow control, dequeue will move blocked streams out of the write queue. onMaxData(conn) will add them back to the write queue. Reviewed By: hanidamlaj Differential Revision: D88819681 fbshipit-source-id: f013dc3d5b176510f95aa06e96a1b3984ce6acf5
1 parent e6e43a6 commit a6bd8c8

5 files changed

Lines changed: 378 additions & 119 deletions

File tree

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
#pragma once
10+
11+
#include <optional>
12+
#include <quic/priority/PriorityQueue.h>
13+
14+
namespace proxygen::coro::detail {
15+
16+
/**
17+
* A thin wrapper around quic::PriorityQueue for managing stream egress.
18+
*
19+
* This class:
20+
* - Takes uint64_t streamId instead of quic::PriorityQueue::Identifier
21+
* - Tracks the count of streams in the queue
22+
* - Provides a simpler interface for stream-only operations
23+
*
24+
* The underlying PriorityQueue may be shared with other users (e.g.,
25+
* datagrams), so this wrapper only tracks streams inserted through its own
26+
* interface.
27+
*/
28+
class StreamPriorityQueue {
29+
public:
30+
using Priority = quic::PriorityQueue::Priority;
31+
32+
explicit StreamPriorityQueue(quic::PriorityQueue& queue) noexcept
33+
: queue_(queue) {
34+
}
35+
36+
/**
37+
* Insert a stream into the queue or update its priority if already present.
38+
* Increments the stream count if the stream was not already in the queue.
39+
*/
40+
void insert(uint64_t streamId, Priority priority) {
41+
auto id = quic::PriorityQueue::Identifier::fromStreamID(streamId);
42+
if (!queue_.contains(id)) {
43+
count_++;
44+
}
45+
queue_.insertOrUpdate(id, priority);
46+
}
47+
48+
/**
49+
* Update the priority of a stream if it exists in the queue.
50+
* Does not change the stream count.
51+
*/
52+
void update(uint64_t streamId, Priority priority) {
53+
auto id = quic::PriorityQueue::Identifier::fromStreamID(streamId);
54+
queue_.updateIfExist(id, priority);
55+
}
56+
57+
/**
58+
* Remove a stream from the queue.
59+
* Decrements the stream count if the stream was in the queue.
60+
*/
61+
void erase(uint64_t streamId) {
62+
auto id = quic::PriorityQueue::Identifier::fromStreamID(streamId);
63+
if (queue_.contains(id)) {
64+
count_--;
65+
}
66+
queue_.erase(id);
67+
}
68+
69+
/**
70+
* Consume bytes for fairness accounting.
71+
* Pass-through to the underlying queue.
72+
*/
73+
void consume(uint64_t bytes) {
74+
queue_.consume(bytes);
75+
}
76+
77+
/**
78+
* Returns the number of streams currently in the queue.
79+
*/
80+
[[nodiscard]] uint64_t count() const noexcept {
81+
return count_;
82+
}
83+
84+
/**
85+
* Returns true if there are streams in the queue.
86+
*/
87+
[[nodiscard]] bool hasStreams() const noexcept {
88+
return count_ > 0;
89+
}
90+
91+
/**
92+
* Peek at the next scheduled stream ID without modifying state.
93+
* Returns std::nullopt if the queue is empty or if the head of the queue
94+
* is not a stream (e.g., a datagram).
95+
*/
96+
[[nodiscard]] std::optional<uint64_t> peek() const noexcept {
97+
if (queue_.empty()) {
98+
return std::nullopt;
99+
}
100+
auto id = queue_.peekNextScheduledID();
101+
if (!id.isStreamID()) {
102+
return std::nullopt;
103+
}
104+
return id.asStreamID();
105+
}
106+
107+
private:
108+
quic::PriorityQueue& queue_;
109+
uint64_t count_{0};
110+
};
111+
112+
} // namespace proxygen::coro::detail

proxygen/lib/http/webtransport/WtEgressContainer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ WtBufferedStreamData::DequeueResult WtBufferedStreamData::dequeue(
2727
window_.getAvailable(),
2828
static_cast<uint64_t>(data_.chainLength())});
2929
DequeueResult res;
30-
res.data = data_.splitAtMost(atMost);
30+
res.data = atMost == 0 ? nullptr : data_.splitAtMost(atMost);
3131
res.fin = data_.empty() && std::exchange(fin_, false);
3232
window_.commit(atMost);
3333
return res;

proxygen/lib/http/webtransport/WtStreamManager.cpp

Lines changed: 66 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@ struct WtStreamManager::Accessor {
110110
auto& writableStreams() {
111111
return sm_.writableStreams_;
112112
}
113+
auto& connFcBlockedStreams() {
114+
return sm_.connFcBlockedStreams_;
115+
}
113116
WtStreamManager& sm_;
114117
};
115118

@@ -191,6 +194,7 @@ struct WriteHandle : public WebTransport::StreamWriteHandle {
191194

192195
Accessor smAccessor_;
193196
WtBufferedStreamData bufferedSendData_;
197+
quic::PriorityQueue::Priority priority_; // Stream priority
194198
uint64_t err{kInvalidVarint};
195199
WritePromise promise_{emptyWritePromise()};
196200
HandleState state_;
@@ -329,10 +333,12 @@ uint64_t& WtStreamManager::MaxStreamsContainer::getMaxStreams(
329333
WtStreamManager::WtStreamManager(WtDir dir,
330334
const WtConfig& config,
331335
EgressCallback& egressCb,
332-
IngressCallback& ingressCb) noexcept
336+
IngressCallback& ingressCb,
337+
quic::PriorityQueue& priorityQueue) noexcept
333338
: dir_(dir),
334339
nextStreamIds_(nextStreamIds(dir)),
335340
maxStreams_(maxStreams(dir, config)),
341+
writableStreams_(priorityQueue),
336342
wtConfig_(config),
337343
connRecvFc_(config.selfMaxConnData),
338344
connSendFc_(config.peerMaxConnData),
@@ -507,8 +513,21 @@ WtStreamManager::Result WtStreamManager::onMaxData(MaxConnData data) noexcept {
507513
if (!connSendFc_.grant(data.maxData)) {
508514
return Fail;
509515
}
516+
510517
bool wasEmpty = !hasEvent();
511-
if (!wasEmpty && hasEvent()) {
518+
// Re-add all connection-FC-blocked streams to the priority queue
519+
auto blockedStreams = std::move(connFcBlockedStreams_);
520+
for (auto* wh : blockedStreams) {
521+
auto& writeHandle = writehandle_ref_cast(*wh);
522+
if (writeHandle.bufferedSendData_.canSendData()) {
523+
writableStreams_.insert(writeHandle.getID(), writeHandle.priority_);
524+
} else {
525+
XLOG(ERR) << "Stream " << writeHandle.getID()
526+
<< " in connFcBlockedStreams_ but !canSendData";
527+
}
528+
}
529+
530+
if (wasEmpty && hasEvent()) {
512531
egressCb_.eventsAvailable();
513532
}
514533
return Ok;
@@ -566,31 +585,35 @@ StreamData WtStreamManager::dequeue(WtWriteHandle& wh,
566585
uint64_t atMost) noexcept {
567586
// we're limited by conn egress fc
568587
atMost = std::min(atMost, connSendFc_.getAvailable());
569-
auto res = writehandle_ref_cast(wh).dequeue(atMost);
588+
auto& writeHandle = writehandle_ref_cast(wh);
589+
auto res = writeHandle.dequeue(atMost);
570590
// TODO(@damlaj): return len to elide unnecessarily computing chain len
571591
auto len = computeChainLength(res.data);
572592
// commit len bytes to conn window
573593
connSendFc_.commit(len);
594+
595+
// Connection FC blocked if we have data but conn window is exhausted
596+
if (connSendFc_.getAvailable() == 0 &&
597+
writeHandle.bufferedSendData_.hasData()) {
598+
connFcBlockedStreams_.insert(&wh);
599+
}
600+
574601
XLOG(DBG8) << __func__ << "; atMost=" << atMost << "; len=" << len
575602
<< "; fin=" << res.fin;
576603
return res;
577604
}
578605

579-
WtStreamManager::WtWriteHandle* WtStreamManager::nextWritable() const noexcept {
580-
WriteHandle* wh = !writableStreams_.empty()
581-
? writehandle_ptr_cast(*writableStreams_.begin())
582-
: nullptr;
583-
// streams with only a pending fin should be yielded even if connection-level
584-
// flow control window is blocked
585-
return (wh && (connSendFc_.getAvailable() > 0 ||
586-
wh->bufferedSendData_.onlyFinPending()))
587-
? wh
588-
: nullptr;
589-
}
590-
591606
void WtStreamManager::onStreamWritable(WtWriteHandle& wh) noexcept {
607+
// Don't re-insert if already tracked as conn FC blocked
608+
if (connFcBlockedStreams_.count(&wh) > 0) {
609+
return;
610+
}
611+
592612
bool wasEmpty = !hasEvent();
593-
writableStreams_.insert(&wh);
613+
614+
auto& writeHandle = writehandle_ref_cast(wh);
615+
writableStreams_.insert(writeHandle.getID(), writeHandle.priority_);
616+
594617
if (wasEmpty && hasEvent()) {
595618
egressCb_.eventsAvailable();
596619
}
@@ -623,29 +646,8 @@ bool WtStreamManager::canCreateBidi() const noexcept {
623646
return !streamLimitExceeded(nextStreamIds_.bidi);
624647
}
625648

626-
/**
627-
* Even if wt connection is flow-control blocked, a stream with only a fin
628-
* pending should be yielded from ::nextWritable. We insert streams with only a
629-
* pending fin first in the set for ::nextWritable to check such cases in O(1)
630-
* time.
631-
*/
632-
bool WtStreamManager::Compare::operator()(const WtWriteHandle* l,
633-
const WtWriteHandle* r) const {
634-
const auto* lWh = static_cast<const WriteHandle*>(l);
635-
const auto* rWh = static_cast<const WriteHandle*>(r);
636-
// safe to cast to int64_t as getID() is limited by kMaxVarint
637-
int64_t lId = lWh->getID();
638-
int64_t rId = rWh->getID();
639-
// set highest order bit to ensure id is negative for onlyFinPending streams
640-
// (i.e. always less than non-onlyFinPending streams) while maintaining stream
641-
// id priority
642-
lId |= int64_t(lWh->bufferedSendData_.onlyFinPending()) << 63;
643-
rId |= int64_t(rWh->bufferedSendData_.onlyFinPending()) << 63;
644-
return lId < rId;
645-
}
646-
647649
bool WtStreamManager::hasEvent() const noexcept {
648-
return nextWritable() != nullptr || !ctrlEvents_.empty();
650+
return writableStreams_.hasStreams() || !ctrlEvents_.empty();
649651
}
650652

651653
uint64_t WtStreamManager::initStreamRecvFc(uint64_t streamId) const noexcept {
@@ -697,6 +699,11 @@ WtStreamManager::WtReadHandle* WtStreamManager::getIngressHandle(
697699
return getBidiHandle(streamId).readHandle;
698700
}
699701

702+
WtStreamManager::WtWriteHandle* WtStreamManager::nextWritable() const noexcept {
703+
auto streamId = writableStreams_.peek();
704+
return streamId ? getEgressHandle(*streamId) : nullptr;
705+
}
706+
700707
} // namespace proxygen::coro::detail
701708

702709
/**
@@ -839,7 +846,12 @@ folly::Expected<folly::Unit, WriteHandle::ErrCode> WriteHandle::resetStream(
839846

840847
folly::Expected<folly::Unit, WriteHandle::ErrCode> WriteHandle::setPriority(
841848
quic::PriorityQueue::Priority priority) {
842-
XLOG(FATAL) << "not implemented";
849+
XCHECK_NE(state_, Closed) << "setPriority after close";
850+
851+
priority_ = priority;
852+
smAccessor_.writableStreams().update(getID(), priority_);
853+
854+
return folly::unit;
843855
}
844856

845857
Result WriteHandle::onMaxData(uint64_t offset) {
@@ -871,16 +883,26 @@ WritePromise WriteHandle::resetPromise() noexcept {
871883
// TODO(@damlaj): StreamData and DequeueResult should be the same struct
872884
StreamData WriteHandle::dequeue(uint64_t atMost) noexcept {
873885
XCHECK_NE(state_, Closed) << "dequeue after close";
886+
874887
auto res = bufferedSendData_.dequeue(atMost);
875888
const auto bufferAvailable = bufferedSendData_.window().getBufferAvailable();
889+
876890
if (bufferAvailable > 0) {
877891
if (auto p = resetPromise(); p.valid()) {
878892
p.setValue(bufferAvailable);
879893
}
880894
}
881-
if (!bufferedSendData_.canSendData()) {
882-
smAccessor_.writableStreams().erase(this);
895+
896+
auto bytesDequeued = computeChainLength(res.data);
897+
898+
// Erase if blocked (wrote nothing) or done (!canSendData)
899+
// Consume if wrote data and still have more
900+
if (bytesDequeued > 0 && bufferedSendData_.canSendData()) {
901+
smAccessor_.writableStreams().consume(bytesDequeued);
902+
} else {
903+
smAccessor_.writableStreams().erase(getID());
883904
}
905+
884906
finish(res.fin);
885907
return StreamData{std::move(res.data), res.fin};
886908
}
@@ -892,7 +914,9 @@ void WriteHandle::cancel(folly::exception_wrapper ex) noexcept {
892914
if (auto p = resetPromise(); p.valid()) {
893915
p.setException(ex_);
894916
}
895-
smAccessor_.writableStreams().erase(this);
917+
smAccessor_.writableStreams().erase(getID());
918+
// Also remove from conn FC blocked set if present
919+
smAccessor_.connFcBlockedStreams().erase(this);
896920
cs_.requestCancellation();
897921
// **beware finish must be last** (`this` can be deleted immediately after)
898922
finish(/*done=*/true);

0 commit comments

Comments
 (0)