Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ public void onFillable()
processFrames(null);
}

@Override
public void onFillInterestedFailed(Throwable cause)
{
long error = HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code();
parser.getListener().onStreamFailure(getEndPoint().getStream().getId(), error, cause);
}

private void processFrames(ParseResult result)
{
if (LOG.isDebugEnabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ void onFailure(Throwable failure)
fillInterest = null;
}
if (callback != null)
callback.succeeded();
callback.failed(failure);
else
protocolSession.onStreamFailure(stream.getId(), failure);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,7 @@ public void offerTask(Runnable task)
producer.offer(task);
// Tasks may be offered when the production is idle, due to no
// network traffic and with the DatagramChannel read interested.
// Call dispatch() to avoid blocking the caller.
strategy.dispatch();
strategy.produce();
}

boolean isFinished(QuicheStream stream)
Expand Down Expand Up @@ -598,42 +597,29 @@ public Runnable produce()
if (task != null)
return task;

while (true)
List<Long> writable = quiche.writableStreamIds();
if (LOG.isDebugEnabled())
LOG.debug("writable stream ids: {} on {}", writable, QuicheSession.this);
// Unfortunately, Quiche does not mark a stream as writable if it
// has a pending write and received a STOP_SENDING, so we try to
// complete the pending write by failing it if the conditions apply.
for (QuicheStream stream : streams.values())
{
List<Long> writable = quiche.writableStreamIds();
if (LOG.isDebugEnabled())
LOG.debug("writable stream ids: {} on {}", writable, QuicheSession.this);
// Unfortunately, Quiche does not mark a stream as writable if it
// has a pending write and received a STOP_SENDING, so we try to
// complete the pending write by failing it if the conditions apply.
for (QuicheStream stream : streams.values())
{
if (writable.contains(stream.getId()))
stream.resumeWrite();
else
stream.tryFailWrite();
}

List<Long> readable = quiche.readableStreamIds();
if (LOG.isDebugEnabled())
LOG.debug("readable stream ids: {} on {}", readable, QuicheSession.this);
// Unfortunately, Quiche does not mark a stream as readable
// if it received a RESET_STREAM, so try to figure out whether
// there is an existing stream that has been reset.
boolean process = false;
for (QuicheStream stream : streams.values())
{
boolean removed = readable.remove(stream.getId());
process |= stream.readable(removed);
}
for (long streamId : readable)
{
QuicheStream stream = createRemoteStream(streamId);
process |= stream.readable(true);
}
if (writable.contains(stream.getId()))
stream.resumeWrite();
else
stream.tryFailWrite();
}

if (!process)
break;
List<Long> readable = quiche.readableStreamIds();
if (LOG.isDebugEnabled())
LOG.debug("readable stream ids: {} on {}", readable, QuicheSession.this);
for (Long streamId : readable)
{
QuicheStream stream = streams.get(streamId);
if (stream == null)
stream = createRemoteStream(streamId);
stream.readable();
}

task = poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,33 +90,25 @@ public QuicheSession getSession()
return session;
}

boolean readable(boolean notify)
void readable()
{
boolean hasDemand;
try (AutoLock ignored = lock.lock())
{
hasDemand = dataDemand;
if (notify)
dataDemand = false;
dataDemand = false;
}

boolean streamFinished = session.isFinished(this);
Throwable resetFailure = streamFinished ? isReset() : null;

if (LOG.isDebugEnabled())
LOG.debug("readable demand={} {} on {}", hasDemand, this, session);
LOG.debug("readable demand={} finished={} reset={} {} on {}", hasDemand, streamFinished, resetFailure != null, this, session);

if (hasDemand && notify)
{
if (hasDemand && resetFailure == null)
notifyDataAvailable();
return true;
}

// Even if there is no demand, we want to know if the peer sent a RESET_STREAM.
if (session.isFinished(this))
{
Throwable failure = isReset();
if (failure != null)
notifyFailure(failure);
}
return false;
else if (resetFailure != null)
notifyFailure(resetFailure);
}

private Throwable isReset()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ public Runnable onContentAvailable()
onContent = _onContentAvailable;
_onContentAvailable = null;
}
if (LOG.isDebugEnabled())
LOG.debug("onContentAvailable {} {}", onContent, this);
return _readInvoker.offer(onContent);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

Expand All @@ -46,6 +45,7 @@
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -55,9 +55,6 @@ public class AsyncRequestContentTest extends AbstractTest
@MethodSource("transports")
public void testEarlyEofWithDemand(TransportType transport) throws Exception
{
// TODO #13131
Assumptions.assumeTrue(transport != TransportType.H3_QUICHE);

CountDownLatch serverHandleLatch = new CountDownLatch(1);
List<Content.Chunk> chunks = new CopyOnWriteArrayList<>();
AtomicReference<Throwable> failureRef = new AtomicReference<>();
Expand All @@ -68,17 +65,17 @@ public boolean handle(Request request, Response response, Callback callback) thr
{
request.addFailureListener(x -> failureRef.compareAndExchange(null, x).addSuppressed(x));

Content.Chunk chunk = request.read();
assertNull(chunk);

request.demand(new Runnable()
{
@Override
public void run()
{
Content.Chunk read = request.read();
if (read == null)
{
request.demand(this);
return;
}
read = Content.Chunk.from(new AssertionError("Unexpected call to demand callback"));

chunks.add(read);

Expand All @@ -92,6 +89,7 @@ public void run()
request.demand(this);
}
});

serverHandleLatch.countDown();

return true;
Expand Down