Issue #6728 - QUIC and HTTP/3

- Fixed end of stream detection.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-11-02 09:10:22 +01:00
parent 521a0adf0e
commit fd118a4766
4 changed files with 20 additions and 17 deletions

View File

@ -54,7 +54,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
private static final Logger LOG = LoggerFactory.getLogger(HTTP3Session.class); private static final Logger LOG = LoggerFactory.getLogger(HTTP3Session.class);
private final AutoLock lock = new AutoLock(); private final AutoLock lock = new AutoLock();
private final AtomicLong lastId = new AtomicLong(); private final AtomicLong lastStreamId = new AtomicLong(0);
private final Map<Long, HTTP3Stream> streams = new ConcurrentHashMap<>(); private final Map<Long, HTTP3Stream> streams = new ConcurrentHashMap<>();
private final ProtocolSession session; private final ProtocolSession session;
private final Session.Listener listener; private final Session.Listener listener;
@ -225,7 +225,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
protected GoAwayFrame newGoAwayFrame(boolean graceful) protected GoAwayFrame newGoAwayFrame(boolean graceful)
{ {
return new GoAwayFrame(lastId.get()); return new GoAwayFrame(lastStreamId.get());
} }
public CompletableFuture<Void> shutdown() public CompletableFuture<Void> shutdown()
@ -241,9 +241,9 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
return result; return result;
} }
protected void updateLastId(long id) private void updateLastStreamId(long id)
{ {
Atomics.updateMax(lastId, id); Atomics.updateMax(lastStreamId, id);
} }
public long getIdleTimeout() public long getIdleTimeout()
@ -340,6 +340,8 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
long idleTimeout = getStreamIdleTimeout(); long idleTimeout = getStreamIdleTimeout();
if (idleTimeout > 0) if (idleTimeout > 0)
stream.setIdleTimeout(idleTimeout); stream.setIdleTimeout(idleTimeout);
if (!local)
updateLastStreamId(stream.getId());
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("created {}", stream); LOG.debug("created {}", stream);
return stream; return stream;

View File

@ -396,20 +396,24 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
if (filled > 0) if (filled > 0)
continue; continue;
if (!remotelyClosed && getEndPoint().isStreamFinished())
{
if (LOG.isDebugEnabled())
LOG.debug("detected end of stream on {}", this);
parser.parse(EMPTY_DATA_FRAME.slice());
return MessageParser.Result.FRAME;
}
if (filled == 0) if (filled == 0)
{ {
// Workaround for a Quiche glitch, that sometimes reports
// an HTTP/3 frame with last=false, but a subsequent read
// of zero bytes reports that the stream is finished.
if (!remotelyClosed && getEndPoint().isStreamFinished())
{
if (LOG.isDebugEnabled())
LOG.debug("detected end of stream on {}", this);
parser.parse(EMPTY_DATA_FRAME.slice());
return MessageParser.Result.FRAME;
}
setNoData(true); setNoData(true);
if (setFillInterest) if (setFillInterest)
fillInterested(); fillInterested();
} }
return MessageParser.Result.NO_FRAME; return MessageParser.Result.NO_FRAME;
} }
} }

View File

@ -62,10 +62,7 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("received request {} on {}", frame, stream); LOG.debug("received request {} on {}", frame, stream);
if (stream != null) if (stream != null)
{
updateLastId(streamId);
stream.onRequest(frame); stream.onRequest(frame);
}
} }
else else
{ {

View File

@ -26,7 +26,7 @@ public class ServerProtocolSession extends ProtocolSession
private static final Logger LOG = LoggerFactory.getLogger(ServerProtocolSession.class); private static final Logger LOG = LoggerFactory.getLogger(ServerProtocolSession.class);
private final Runnable producer = Invocable.from(Invocable.InvocationType.BLOCKING, this::produce); private final Runnable producer = Invocable.from(Invocable.InvocationType.BLOCKING, this::produce);
private final Consumer<QuicStreamEndPoint> configureProtocolEndPoint = this::openProtocolEndPoint; private final Consumer<QuicStreamEndPoint> openProtocolEndPoint = this::openProtocolEndPoint;
public ServerProtocolSession(ServerQuicSession session) public ServerProtocolSession(ServerQuicSession session)
{ {
@ -71,7 +71,7 @@ public class ServerProtocolSession extends ProtocolSession
protected boolean onReadable(long readableStreamId) protected boolean onReadable(long readableStreamId)
{ {
// On the server, we need a get-or-create semantic in case of reads. // On the server, we need a get-or-create semantic in case of reads.
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, configureProtocolEndPoint); QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, openProtocolEndPoint);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint); LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint);
return streamEndPoint.onReadable(); return streamEndPoint.onReadable();