Reworked HTTP/3 stream and demand implementation so that it now leaves in HTTP3Stream, rather than in HTTP3StreamConnection.

This was necessary to avoid NPE when e.g. calling demand() on a stream that has already been removed from the session.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2022-08-31 17:33:20 +02:00
parent 854d5cdf6a
commit fe34e5390c
14 changed files with 334 additions and 355 deletions

View File

@ -93,6 +93,11 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client
{
if (LOG.isDebugEnabled())
LOG.debug("stalling response processing, no demand after headers on {}", this);
// TODO: If frame.isLast(), calling demand() on it will kinda of make little sense.
// we should go back to notifySuccessRef being set before this block (to avoid the race)
// and in receive() query it to call notifySuccess().
}
}
}

View File

@ -214,9 +214,4 @@ public class ClientHTTP3Session extends ClientProtocolSession
if (messageFlusher.offer(endPoint, frame, callback))
messageFlusher.iterate();
}
public void onDataAvailable(long streamId)
{
session.onDataAvailable(streamId);
}
}

View File

@ -19,17 +19,8 @@ import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
public class ClientHTTP3StreamConnection extends HTTP3StreamConnection
{
private final ClientHTTP3Session session;
public ClientHTTP3StreamConnection(QuicStreamEndPoint endPoint, ClientHTTP3Session session, MessageParser parser)
{
super(endPoint, session.getQuicSession().getExecutor(), session.getQuicSession().getByteBufferPool(), parser);
this.session = session;
}
@Override
protected void onDataAvailable(long streamId)
{
session.onDataAvailable(streamId);
}
}

View File

@ -59,7 +59,7 @@ public class HTTP3StreamClient extends HTTP3Stream implements Stream.Client
valid = validateAndUpdate(EnumSet.of(FrameState.INITIAL, FrameState.INFORMATIONAL), FrameState.HEADER);
if (valid)
{
notIdle();
onHeaders(frame);
notifyResponse(frame);
updateClose(frame.isLast(), false);
}

View File

@ -298,6 +298,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
if (failure == null)
{
HTTP3Stream stream = newHTTP3Stream(endPoint, local);
((HTTP3StreamConnection)endPoint.getConnection()).setStream(stream);
long idleTimeout = getStreamIdleTimeout();
if (idleTimeout > 0)
stream.setIdleTimeout(idleTimeout);
@ -465,14 +466,6 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
onSessionFailure(HTTP3ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_sequence", new IllegalStateException("invalid frame sequence"));
}
public void onDataAvailable(long streamId)
{
HTTP3Stream stream = getStream(streamId);
if (LOG.isDebugEnabled())
LOG.debug("notifying data available on {}", stream);
stream.onDataAvailable();
}
@Override
public void onGoAway(GoAwayFrame frame)
{

View File

@ -17,6 +17,7 @@ import java.util.EnumSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame;
@ -28,6 +29,7 @@ import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,6 +38,8 @@ public abstract class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, A
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP3Stream.class);
private final AutoLock lock = new AutoLock();
private final AtomicReference<Data> dataRef = new AtomicReference<>();
private final HTTP3Session session;
private final QuicStreamEndPoint endPoint;
private final boolean local;
@ -44,6 +48,10 @@ public abstract class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, A
private long idleTimeout;
private long expireNanoTime;
private Object attachment;
private boolean dataDemand;
private boolean dataStalled;
private boolean dataLast;
private boolean dataAvailable;
public HTTP3Stream(HTTP3Session session, QuicStreamEndPoint endPoint, boolean local)
{
@ -131,6 +139,9 @@ public abstract class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, A
protected CompletableFuture<Stream> write(Frame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("writing {} on {}", frame, this);
return writeFrame(frame)
.whenComplete((s, x) ->
{
@ -146,10 +157,27 @@ public abstract class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, A
{
try
{
HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection();
Data data = connection.readData();
if (data != null)
updateClose(data.isLast(), false);
if (LOG.isDebugEnabled())
LOG.debug("reading data on {}", this);
Data data;
if (dataLast)
{
data = Stream.Data.EOF;
}
else
{
data = read();
if (data == null)
{
HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection();
connection.receive();
data = read();
}
}
if (LOG.isDebugEnabled())
LOG.debug("read {} on {}", data, this);
return data;
}
catch (Throwable x)
@ -162,11 +190,81 @@ public abstract class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, A
}
}
private Data read()
{
Data data = dataRef.getAndSet(null);
if (data != null)
{
boolean last = data.isLast();
dataLast = last;
updateClose(last, false);
}
else
{
dataAvailable = false;
}
if (LOG.isDebugEnabled())
LOG.debug("reading available data {} on {}", data, this);
return data;
}
@Override
public void demand()
{
HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection();
connection.demand();
boolean hasData;
boolean process = false;
try (AutoLock ignored = lock.lock())
{
dataDemand = true;
hasData = dataAvailable;
if (dataStalled && hasData || dataLast)
{
dataStalled = false;
process = true;
}
}
if (LOG.isDebugEnabled())
LOG.debug("demand, wasStalled={} dataAvailable={} on {}", process, hasData, this);
if (process)
{
processData();
}
else if (!hasData)
{
HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection();
connection.fillInterested();
}
}
private void processData()
{
while (true)
{
boolean notify = true;
try (AutoLock ignored = lock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("processing demand={}, last={} on {}", dataDemand, dataLast, this);
if (dataDemand)
{
// Do not notify if there is demand but no data.
if (!dataAvailable)
notify = false;
else
dataDemand = false;
}
else
{
dataStalled = true;
notify = false;
}
}
if (!notify)
break;
onDataAvailable();
}
}
@Override
@ -179,8 +277,29 @@ public abstract class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, A
public boolean hasDemand()
{
HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection();
return connection.hasDemand();
try (AutoLock ignored = lock.lock())
{
return dataDemand;
}
}
public void onHeaders(HeadersFrame frame)
{
notIdle();
try (AutoLock ignored = lock.lock())
{
dataLast = frame.isLast();
dataAvailable = true;
}
}
protected boolean hasDemandOrStall()
{
try (AutoLock ignored = lock.lock())
{
dataStalled = !dataDemand;
return dataDemand;
}
}
public void onData(DataFrame frame)
@ -189,11 +308,31 @@ public abstract class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, A
notIdle();
}
public void onDataAvailable()
private void onDataAvailable()
{
if (LOG.isDebugEnabled())
LOG.debug("notifying data available on {}", this);
notifyDataAvailable();
}
public void onData(Data data)
{
if (!dataRef.compareAndSet(null, data))
throw new IllegalStateException();
boolean process;
try (AutoLock ignored = lock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("onData demand={}, last={} {} on {}", dataDemand, dataLast, data, this);
dataAvailable = true;
process = dataDemand;
}
if (process)
processData();
}
protected abstract void notifyDataAvailable();
public void onTrailer(HeadersFrame frame)
@ -256,37 +395,30 @@ public abstract class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, A
{
switch (closeState)
{
case NOT_CLOSED:
case NOT_CLOSED ->
{
closeState = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
break;
}
case LOCALLY_CLOSED:
case LOCALLY_CLOSED ->
{
if (!local)
{
closeState = CloseState.CLOSED;
session.removeStream(this, null);
}
break;
}
case REMOTELY_CLOSED:
case REMOTELY_CLOSED ->
{
if (local)
{
closeState = CloseState.CLOSED;
session.removeStream(this, null);
}
break;
}
case CLOSED:
case CLOSED ->
{
break;
}
default:
{
throw new IllegalStateException();
}
default -> throw new IllegalStateException();
}
}
}

View File

@ -17,6 +17,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
@ -30,7 +31,6 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,18 +40,13 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
// An empty DATA frame is the sequence of bytes [0x0, 0x0].
private static final ByteBuffer EMPTY_DATA_FRAME = ByteBuffer.allocate(2);
private final AutoLock lock = new AutoLock();
private final AtomicReference<Runnable> action = new AtomicReference<>();
private final RetainableByteBufferPool buffers;
private final MessageParser parser;
private boolean useInputDirectByteBuffers = true;
private RetainableByteBuffer buffer;
private HTTP3Stream stream;
private RetainableByteBuffer networkBuffer;
private boolean applicationMode;
private boolean parserDataMode;
private boolean dataDemand;
private boolean dataStalled;
private DataFrame dataFrame;
private boolean dataLast;
private boolean hasNetworkData;
private boolean remotelyClosed;
public HTTP3StreamConnection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser)
@ -78,6 +73,11 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
void setStream(HTTP3Stream stream)
{
this.stream = stream;
}
public void setApplicationMode(boolean mode)
{
this.applicationMode = mode;
@ -101,74 +101,40 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
public void onFillable()
{
if (LOG.isDebugEnabled())
LOG.debug("processing parserDataMode={} on {}", parserDataMode, this);
if (parserDataMode)
processDataFrames();
else
processNonDataFrames();
}
private void processDataFrames()
{
processDataDemand();
if (!parserDataMode)
LOG.debug("onFillable dataMode={} on {}", parser.isDataMode(), this);
if (parser.isDataMode())
{
if (buffer != null && buffer.hasRemaining())
processNonDataFrames();
else
fillInterested();
// If there are not enough bytes to parse a DATA frame and call
// the application (so that it can drive), set fill interest.
processDataFrames(true);
}
else
{
processNonDataFrames();
}
}
private void processNonDataFrames()
private void processDataFrames(boolean setFillInterest)
{
try
{
tryAcquireBuffer();
while (true)
MessageParser.Result result = parseAndFill(setFillInterest);
switch (result)
{
if (parseAndFill(true) == MessageParser.Result.NO_FRAME)
case NO_FRAME -> tryReleaseBuffer(false);
case MODE_SWITCH ->
{
tryReleaseBuffer(false);
return;
parser.setDataMode(false);
processNonDataFrames();
}
// TODO: we should also exit if the connection was closed due to errors.
// There is not yet a isClosed() primitive though.
if (remotelyClosed)
case FRAME ->
{
// We have detected the end of the stream,
// do not loop around to fill & parse again.
// However, the last frame may have
// caused a write that we need to flush.
getEndPoint().getQuicSession().flush();
action.getAndSet(null).run();
// Release the network buffer here (if empty), since the application may
// not be reading more bytes, to avoid to keep around a consumed buffer.
tryReleaseBuffer(false);
return;
}
if (parserDataMode)
{
if (buffer.hasRemaining())
{
processDataFrames();
}
else
{
if (applicationMode)
{
if (LOG.isDebugEnabled())
LOG.debug("skipping fill interest on {}", this);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("setting fill interest on {}", this);
fillInterested();
}
}
tryReleaseBuffer(false);
return;
}
}
}
@ -182,181 +148,115 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
}
}
protected abstract void onDataAvailable(long streamId);
public Stream.Data readData()
private void processNonDataFrames()
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("reading data on {}", this);
if (dataLast)
return Stream.Data.EOF;
tryAcquireBuffer();
return switch (parseAndFill(false))
while (true)
{
case FRAME ->
MessageParser.Result result = parseAndFill(true);
switch (result)
{
if (parserDataMode)
case NO_FRAME ->
{
DataFrame frame = dataFrame;
dataFrame = null;
if (LOG.isDebugEnabled())
LOG.debug("read data {} on {}", frame, this);
dataLast = frame.isLast();
buffer.retain();
StreamData data = new StreamData(frame, buffer);
// Release the network buffer here (if empty), since the application may
// not be reading more bytes, to avoid to keep around a consumed buffer.
tryReleaseBuffer(false);
yield data;
return;
}
else
case MODE_SWITCH ->
{
// Not anymore in data mode, so it's a trailer frame.
tryReleaseBuffer(false);
yield null;
// MODE_SWITCH is only reported when parsing DATA frames.
throw new IllegalStateException();
}
case FRAME ->
{
action.getAndSet(null).run();
// TODO: we should also exit if the connection was closed due to errors.
// There is not yet a isClosed() primitive though.
if (remotelyClosed)
{
// We have detected the end of the stream,
// do not loop around to parse & fill again.
// However, the last frame may have
// caused a write that we need to flush.
getEndPoint().getQuicSession().flush();
tryReleaseBuffer(false);
return;
}
if (parser.isDataMode())
{
// TODO: handle applicationMode here?
if (stream.hasDemandOrStall())
{
if (networkBuffer != null && networkBuffer.hasRemaining())
{
// There are bytes left in the buffer; if there are not
// enough bytes to parse a DATA frame and call the
// application (so that it can drive), set fill interest.
processDataFrames(true);
}
else
{
// No bytes left in the buffer, but there is demand.
// Set fill interest to call the application when bytes arrive.
fillInterested();
}
}
// From now on it's the application that drives
// demand, reads, parse+fill and fill interest.
return;
}
// There might be a trailer, loop around.
}
default -> throw new IllegalStateException("unknown message parser result: " + result);
}
case MODE_SWITCH ->
{
if (LOG.isDebugEnabled())
LOG.debug("switching to parserDataMode=false on {}", this);
dataLast = true;
parserDataMode = false;
parser.setDataMode(false);
tryReleaseBuffer(false);
yield null;
}
case NO_FRAME ->
{
if (LOG.isDebugEnabled())
LOG.debug("read no data on {}", this);
tryReleaseBuffer(false);
yield null;
}
};
}
}
catch (Throwable x)
{
cancelDemand();
tryReleaseBuffer(true);
getEndPoint().close(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
// Rethrow so the application has a chance to handle it.
throw x;
long error = HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code();
getEndPoint().close(error, x);
// Notify the application that a failure happened.
parser.getListener().onStreamFailure(getEndPoint().getStreamId(), error, x);
}
}
public void demand()
public void receive()
{
boolean hasData;
boolean process = false;
try (AutoLock ignored = lock.lock())
{
hasData = hasNetworkData;
dataDemand = true;
if (dataStalled && hasData)
{
dataStalled = false;
process = true;
}
}
if (LOG.isDebugEnabled())
LOG.debug("demand, wasStalled={} hasData={} on {}", process, hasData, this);
if (process)
processDataFrames();
else if (!hasData)
fillInterested();
}
public boolean hasDemand()
{
try (AutoLock ignored = lock.lock())
{
return dataDemand;
}
}
private void cancelDemand()
{
try (AutoLock ignored = lock.lock())
{
dataDemand = false;
}
}
private boolean isStalled()
{
try (AutoLock ignored = lock.lock())
{
return dataStalled;
}
}
private void setHasNetworkData(boolean hasNetworkData)
{
try (AutoLock ignored = lock.lock())
{
this.hasNetworkData = hasNetworkData;
}
}
private void processDataDemand()
{
while (true)
{
boolean process = true;
try (AutoLock ignored = lock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("processing demand={}, last={} fillInterested={} on {}", dataDemand, dataLast, isFillInterested(), this);
if (dataDemand)
{
// Do not process if there is demand but no data.
if (isFillInterested())
process = false;
else
dataDemand = false;
}
else
{
dataStalled = true;
process = false;
}
}
if (!process)
return;
onDataAvailable(getEndPoint().getStreamId());
}
LOG.debug("receiving on {}", this);
processDataFrames(false);
}
private void tryAcquireBuffer()
{
if (buffer == null)
if (networkBuffer == null)
{
buffer = buffers.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
networkBuffer = buffers.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("acquired {}", buffer);
LOG.debug("acquired {}", networkBuffer);
}
}
private void tryReleaseBuffer(boolean force)
{
if (buffer != null)
if (networkBuffer != null)
{
if (buffer.hasRemaining() && force)
buffer.clear();
if (!buffer.hasRemaining())
if (networkBuffer.hasRemaining() && force)
networkBuffer.clear();
if (!networkBuffer.hasRemaining())
{
buffer.release();
networkBuffer.release();
if (LOG.isDebugEnabled())
LOG.debug("released {}", buffer);
buffer = null;
LOG.debug("released {}", networkBuffer);
networkBuffer = null;
}
}
}
@ -366,33 +266,30 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
try
{
if (LOG.isDebugEnabled())
LOG.debug("parse+fill setFillInterest={} on {} with buffer {}", setFillInterest, this, buffer);
// Assume there is network data until proven otherwise.
setHasNetworkData(true);
LOG.debug("parse+fill setFillInterest={} on {} with buffer {}", setFillInterest, this, networkBuffer);
while (true)
{
ByteBuffer byteBuffer = buffer.getBuffer();
ByteBuffer byteBuffer = networkBuffer.getBuffer();
MessageParser.Result result = parser.parse(byteBuffer);
if (LOG.isDebugEnabled())
LOG.debug("parsed {} on {} with buffer {}", result, this, buffer);
LOG.debug("parsed {} on {} with buffer {}", result, this, networkBuffer);
if (result == MessageParser.Result.FRAME || result == MessageParser.Result.MODE_SWITCH)
return result;
if (buffer.isRetained())
if (networkBuffer.isRetained())
{
buffer.release();
networkBuffer.release();
RetainableByteBuffer newBuffer = buffers.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("reacquired {} for retained {}", newBuffer, buffer);
buffer = newBuffer;
byteBuffer = buffer.getBuffer();
LOG.debug("reacquired {} for retained {}", newBuffer, networkBuffer);
networkBuffer = newBuffer;
byteBuffer = networkBuffer.getBuffer();
}
int filled = fill(byteBuffer);
if (LOG.isDebugEnabled())
LOG.debug("filled {} on {} with buffer {}", filled, this, buffer);
LOG.debug("filled {} on {} with buffer {}", filled, this, networkBuffer);
if (filled > 0)
continue;
@ -410,9 +307,6 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
return MessageParser.Result.FRAME;
}
// Remember that there is no network data so that
// we may call fillInterested() from demand().
setHasNetworkData(false);
if (setFillInterest)
fillInterested();
}
@ -443,7 +337,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
@Override
public String toConnectionString()
{
return String.format("%s[demand=%b,stalled=%b,parserDataMode=%b]", super.toConnectionString(), hasDemand(), isStalled(), parserDataMode);
return String.format("%s[dataMode=%b]", super.toConnectionString(), parser.isDataMode());
}
private static class StreamData extends Stream.Data
@ -479,14 +373,16 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
@Override
public void onHeaders(long streamId, HeadersFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("onHeaders #{} {} on {}", streamId, frame, this);
MetaData metaData = frame.getMetaData();
if (metaData.isRequest())
{
// Expect DATA frames now.
parserDataMode = true;
parser.setDataMode(true);
if (LOG.isDebugEnabled())
LOG.debug("switching to parserDataMode=true for request {} on {}", metaData, this);
LOG.debug("switching to dataMode=true for request {} on {}", metaData, this);
}
else if (metaData.isResponse())
{
@ -494,15 +390,14 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
if (HttpStatus.isInformational(response.getStatus()))
{
if (LOG.isDebugEnabled())
LOG.debug("staying in parserDataMode=false for response {} on {}", metaData, this);
LOG.debug("staying in dataMode=false for response {} on {}", metaData, this);
}
else
{
// Expect DATA frames now.
parserDataMode = true;
parser.setDataMode(true);
if (LOG.isDebugEnabled())
LOG.debug("switching to parserDataMode=true for response {} on {}", metaData, this);
LOG.debug("switching to dataMode=true for response {} on {}", metaData, this);
}
}
else
@ -511,20 +406,37 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
if (!frame.isLast())
frame = new HeadersFrame(metaData, true);
}
if (frame.isLast())
HeadersFrame headersFrame = frame;
if (headersFrame.isLast())
shutdownInput();
super.onHeaders(streamId, frame);
Runnable existing = action.getAndSet(() -> super.onHeaders(streamId, headersFrame));
if (existing != null)
throw new IllegalStateException("existing onHeaders action " + existing);
}
@Override
public void onData(long streamId, DataFrame frame)
{
if (dataFrame != null)
throw new IllegalStateException();
dataFrame = frame;
if (LOG.isDebugEnabled())
LOG.debug("onData #{} {} on {}", streamId, frame, this);
if (frame.isLast())
shutdownInput();
super.onData(streamId, frame);
networkBuffer.retain();
StreamData data = new StreamData(frame, networkBuffer);
Runnable existing = action.getAndSet(() ->
{
super.onData(streamId, frame);
if (LOG.isDebugEnabled())
LOG.debug("notifying {} on {}", data, stream);
stream.onData(data);
});
if (existing != null)
throw new IllegalStateException("existing onData action " + existing);
}
private void shutdownInput()

View File

@ -72,6 +72,11 @@ public class MessageParser
return listener;
}
public boolean isDataMode()
{
return dataMode;
}
public void setDataMode(boolean enable)
{
this.dataMode = enable;
@ -99,7 +104,7 @@ public class MessageParser
{
state = State.BODY;
// If we are in data mode, but we did not parse a DATA frame, bail out.
if (dataMode && headerParser.getFrameType() != FrameType.DATA.type())
if (isDataMode() && headerParser.getFrameType() != FrameType.DATA.type())
return Result.MODE_SWITCH;
}
else

View File

@ -45,7 +45,7 @@ public class HTTP3StreamServer extends HTTP3Stream implements Stream.Server
{
if (validateAndUpdate(EnumSet.of(FrameState.INITIAL), FrameState.HEADER))
{
notIdle();
onHeaders(frame);
Listener listener = this.listener = notifyRequest(frame);
if (listener == null)
{

View File

@ -206,9 +206,4 @@ public class ServerHTTP3Session extends ServerProtocolSession
if (messageFlusher.offer(endPoint, frame, callback))
messageFlusher.iterate();
}
public void onDataAvailable(long streamId)
{
session.onDataAvailable(streamId);
}
}

View File

@ -48,12 +48,6 @@ public class ServerHTTP3StreamConnection extends HTTP3StreamConnection implement
this.session = session;
}
@Override
protected void onDataAvailable(long streamId)
{
session.onDataAvailable(streamId);
}
public Runnable onRequest(HTTP3StreamServer stream, HeadersFrame frame)
{
HttpChannel httpChannel = httpChannelFactory.newHttpChannel(this);

View File

@ -317,34 +317,33 @@ public class ClientServerTest extends AbstractClientServerTest
ByteBuffer byteBuffer = ByteBuffer.wrap(bytesReceived);
CountDownLatch clientDataLatch = new CountDownLatch(1);
Stream stream = clientSession.newRequest(frame, new Stream.Client.Listener()
{
@Override
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
@Override
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
clientResponseLatch.countDown();
stream.demand();
}
clientResponseLatch.countDown();
stream.demand();
}
@Override
public void onDataAvailable(Stream.Client stream)
@Override
public void onDataAvailable(Stream.Client stream)
{
// Read data.
Stream.Data data = stream.readData();
if (data == null)
{
// Read data.
Stream.Data data = stream.readData();
if (data == null)
{
stream.demand();
return;
}
// Consume data.
byteBuffer.put(data.getByteBuffer());
data.release();
if (data.isLast())
clientDataLatch.countDown();
else
stream.demand();
stream.demand();
return;
}
})
.get(5, TimeUnit.SECONDS);
// Consume data.
byteBuffer.put(data.getByteBuffer());
data.release();
if (data.isLast())
clientDataLatch.countDown();
else
stream.demand();
}
}).get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(ByteBuffer.wrap(bytesSent), true));
assertTrue(clientResponseLatch.await(5, TimeUnit.SECONDS));

View File

@ -238,9 +238,7 @@ public class DataDemandTest extends AbstractClientServerTest
@Test
public void testHeadersNoDataThenTrailers() throws Exception
{
CountDownLatch serverDataLatch = new CountDownLatch(1);
CountDownLatch serverTrailerLatch = new CountDownLatch(1);
AtomicLong onDataAvailableCalls = new AtomicLong();
start(new Session.Server.Listener()
{
@Override
@ -249,18 +247,6 @@ public class DataDemandTest extends AbstractClientServerTest
stream.demand();
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream.Server stream)
{
onDataAvailableCalls.incrementAndGet();
// Must read to EOF to trigger fill+parse of the trailer.
Stream.Data data = stream.readData();
assertNull(data);
// It's typical to demand after null data.
stream.demand();
serverDataLatch.countDown();
}
@Override
public void onTrailer(Stream.Server stream, HeadersFrame frame)
{
@ -276,13 +262,7 @@ public class DataDemandTest extends AbstractClientServerTest
Stream stream = session.newRequest(request, new Stream.Client.Listener() {}).get(5, TimeUnit.SECONDS);
stream.trailer(new HeadersFrame(new MetaData(HttpVersion.HTTP_3, HttpFields.EMPTY), true)).get(5, TimeUnit.SECONDS);
assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS));
// Wait a little to be sure we do not spin.
Thread.sleep(500);
assertEquals(1, onDataAvailableCalls.get());
assertTrue(serverTrailerLatch.await(5, TimeUnit.SECONDS));
assertEquals(1, onDataAvailableCalls.get());
}
@Test
@ -313,7 +293,7 @@ public class DataDemandTest extends AbstractClientServerTest
}
if (dataRead.addAndGet(data.getByteBuffer().remaining()) == dataLength)
serverDataLatch.countDown();
else
if (!data.isLast())
stream.demand();
}

View File

@ -29,7 +29,6 @@ import org.eclipse.jetty.http3.server.AbstractHTTP3ServerConnectionFactory;
import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -60,30 +59,9 @@ public class StreamIdleTimeoutTest extends AbstractClientServerTest
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream.Server stream)
public void onFailure(Stream.Server stream, long error, Throwable failure)
{
// When the client closes the stream, the server
// may either receive an empty, last, DATA frame, or
// an exception because the stream has been reset.
try
{
Stream.Data data = stream.readData();
if (data != null)
{
assertTrue(data.isLast());
assertEquals(0, data.getByteBuffer().remaining());
serverLatch.countDown();
}
else
{
stream.demand();
}
}
catch (Exception x)
{
serverLatch.countDown();
throw x;
}
serverLatch.countDown();
}
};
}