Issue #6728 - QUIC and HTTP/3

- More fixes and improvement to HTTP client transport tests.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-10-29 13:27:41 +02:00
parent 6fbe79af46
commit aadc86d36a
11 changed files with 121 additions and 55 deletions

View File

@ -105,8 +105,8 @@ public class ClientHTTP3Session extends ClientProtocolSession
settings = Map.of();
// TODO: add default settings.
SettingsFrame frame = new SettingsFrame(settings);
controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, session::onOpen, this::fail));
controlFlusher.iterate();
if (controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, session::onOpen, this::fail)))
controlFlusher.iterate();
}
@Override
@ -192,15 +192,15 @@ public class ClientHTTP3Session extends ClientProtocolSession
void writeControlFrame(Frame frame, Callback callback)
{
controlFlusher.offer(frame, callback);
controlFlusher.iterate();
if (controlFlusher.offer(frame, callback))
controlFlusher.iterate();
}
void writeMessageFrame(long streamId, Frame frame, Callback callback)
{
QuicStreamEndPoint endPoint = getOrCreateStreamEndPoint(streamId, this::openProtocolEndPoint);
messageFlusher.offer(endPoint, frame, callback);
messageFlusher.iterate();
if (messageFlusher.offer(endPoint, frame, callback))
messageFlusher.iterate();
}
public void onDataAvailable(long streamId)

View File

@ -86,14 +86,6 @@ public interface Stream
* <p>{@link Stream.Data} objects may be stored away for later, asynchronous,
* processing (for example, to process them only when all of them have been
* received).</p>
* <p>This method <em>must only</em> be called when there is no outstanding
* {@link #demand() demand}.</p>
* <p>Practically, this means that this method should be called either
* synchronously from within {@link Stream.Listener#onDataAvailable(Stream)},
* or applications must arrange, for example using a
* {@link java.util.concurrent.Semaphore}, that a call to
* {@link Stream.Listener#onDataAvailable(Stream)} is made before
* calling this method (possibly from a different thread).</p>
*
* @return a {@link Stream.Data} object containing the request bytes or
* the response bytes, or null if no bytes are available

View File

@ -41,6 +41,7 @@ public class ControlFlusher extends IteratingCallback
private final ControlGenerator generator;
private final QuicStreamEndPoint endPoint;
private boolean initialized;
private Throwable terminated;
private List<Entry> entries;
private InvocationType invocationType = InvocationType.NON_BLOCKING;
@ -51,12 +52,19 @@ public class ControlFlusher extends IteratingCallback
this.generator = new ControlGenerator(useDirectByteBuffers);
}
public void offer(Frame frame, Callback callback)
public boolean offer(Frame frame, Callback callback)
{
Throwable closed;
try (AutoLock l = lock.lock())
{
queue.offer(new Entry(frame, callback));
closed = terminated;
if (closed == null)
queue.offer(new Entry(frame, callback));
}
if (closed == null)
return true;
callback.failed(closed);
return false;
}
@Override
@ -119,10 +127,22 @@ public class ControlFlusher extends IteratingCallback
lease.recycle();
entries.forEach(e -> e.callback.failed(failure));
List<Entry> allEntries = new ArrayList<>(entries);
entries.clear();
try (AutoLock l = lock.lock())
{
terminated = failure;
allEntries.addAll(queue);
queue.clear();
}
// TODO: I guess we should fail the whole connection, as we cannot proceed without the control stream.
allEntries.forEach(e -> e.callback.failed(failure));
long error = HTTP3ErrorCode.INTERNAL_ERROR.code();
endPoint.close(error, failure);
// Cannot continue without the control stream, close the session.
endPoint.getQuicSession().getProtocolSession().outwardClose(error, "control_stream_failure");
}
@Override

View File

@ -183,9 +183,6 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
if (LOG.isDebugEnabled())
LOG.debug("reading data on {}", this);
if (hasDemand())
throw new IllegalStateException("invalid call to readData(): outstanding demand");
switch (parseAndFill(false))
{
case FRAME:
@ -224,6 +221,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
}
catch (Throwable x)
{
cancelDemand();
getEndPoint().close(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
// Rethrow so the application has a chance to handle it.
throw x;
@ -260,6 +258,14 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
}
}
private void cancelDemand()
{
try (AutoLock l = lock.lock())
{
dataDemand = false;
}
}
private boolean isStalled()
{
try (AutoLock l = lock.lock())

View File

@ -42,6 +42,7 @@ public class InstructionFlusher extends IteratingCallback
private final QuicStreamEndPoint endPoint;
private final long streamType;
private boolean initialized;
private Throwable terminated;
public InstructionFlusher(QuicSession session, QuicStreamEndPoint endPoint, long streamType)
{
@ -50,12 +51,16 @@ public class InstructionFlusher extends IteratingCallback
this.streamType = streamType;
}
public void offer(List<Instruction> instructions)
public boolean offer(List<Instruction> instructions)
{
Throwable closed;
try (AutoLock l = lock.lock())
{
queue.addAll(instructions);
closed = terminated;
if (closed == null)
queue.addAll(instructions);
}
return closed == null;
}
@Override
@ -96,7 +101,9 @@ public class InstructionFlusher extends IteratingCallback
{
if (LOG.isDebugEnabled())
LOG.debug("succeeded to write {} on {}", lease.getByteBuffers(), this);
lease.recycle();
super.succeeded();
}
@ -105,7 +112,20 @@ public class InstructionFlusher extends IteratingCallback
{
if (LOG.isDebugEnabled())
LOG.debug("failed to write {} on {}", lease.getByteBuffers(), this, failure);
// TODO
lease.recycle();
try (AutoLock l = lock.lock())
{
terminated = failure;
queue.clear();
}
long error = HTTP3ErrorCode.INTERNAL_ERROR.code();
endPoint.close(error, failure);
// Cannot continue without the instruction stream, close the session.
endPoint.getQuicSession().getProtocolSession().outwardClose(error, "instruction_stream_failure");
}
@Override

View File

@ -33,9 +33,16 @@ public class InstructionHandler implements Instruction.Handler
@Override
public void onInstructions(List<Instruction> instructions)
{
if (LOG.isDebugEnabled())
LOG.debug("processing {}", instructions);
encoderFlusher.offer(instructions);
encoderFlusher.iterate();
if (encoderFlusher.offer(instructions))
{
if (LOG.isDebugEnabled())
LOG.debug("processing {}", instructions);
encoderFlusher.iterate();
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("could not process {}", instructions);
}
}
}

View File

@ -34,7 +34,7 @@ public class MessageFlusher extends IteratingCallback
private static final Logger LOG = LoggerFactory.getLogger(MessageFlusher.class);
private final AutoLock lock = new AutoLock();
private final Queue<Entry> queue = new ArrayDeque<>();
private final Queue<Entry> entries = new ArrayDeque<>();
private final ByteBufferPool.Lease lease;
private final MessageGenerator generator;
private Entry entry;
@ -45,12 +45,13 @@ public class MessageFlusher extends IteratingCallback
this.generator = new MessageGenerator(encoder, maxHeadersLength, useDirectByteBuffers);
}
public void offer(QuicStreamEndPoint endPoint, Frame frame, Callback callback)
public boolean offer(QuicStreamEndPoint endPoint, Frame frame, Callback callback)
{
try (AutoLock l = lock.lock())
{
queue.offer(new Entry(endPoint, frame, callback));
entries.offer(new Entry(endPoint, frame, callback));
}
return true;
}
@Override
@ -58,7 +59,7 @@ public class MessageFlusher extends IteratingCallback
{
try (AutoLock l = lock.lock())
{
entry = queue.poll();
entry = entries.poll();
if (entry == null)
return Action.IDLE;
}
@ -74,7 +75,7 @@ public class MessageFlusher extends IteratingCallback
return Action.SCHEDULED;
}
int generated = generator.generate(lease, entry.endPoint.getStreamId(), frame, this::fail);
int generated = generator.generate(lease, entry.endPoint.getStreamId(), frame, this::failed);
if (generated < 0)
return Action.SCHEDULED;
@ -91,30 +92,29 @@ public class MessageFlusher extends IteratingCallback
public void succeeded()
{
if (LOG.isDebugEnabled())
LOG.debug("succeeded to flush {} on {}", entry, this);
LOG.debug("succeeded to write {} on {}", entry, this);
lease.recycle();
entry.callback.succeeded();
entry = null;
super.succeeded();
}
private void fail(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failed to flush {} on {}", entry, this);
lease.recycle();
entry.callback.failed(x);
entry = null;
// Continue the iteration.
super.succeeded();
}
@Override
protected void onCompleteFailure(Throwable failure)
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failed to flush {} on {}", entry, this, failure);
// TODO
LOG.debug("failed to write {} on {}", entry, this, x);
lease.recycle();
entry.callback.failed(x);
entry = null;
// Continue the iteration.
super.succeeded();
}
@Override

View File

@ -270,6 +270,7 @@ public class HttpChannelOverHTTP3 extends HttpChannel
public void onFailure(Throwable failure)
{
//TODO
throw new UnsupportedOperationException(failure);
// getHttpTransport().onStreamFailure(failure);
// boolean handle = failed(failure);
// consumeInput();
@ -369,13 +370,28 @@ public class HttpChannelOverHTTP3 extends HttpChannel
@Override
public boolean failAllContent(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("failing all content with {} {}", failure, this);
// TODO: must read as much as possible to seek EOF.
HttpInput.Content result;
try (AutoLock l = lock.lock())
{
result = content;
if (result == null)
return false;
if (result.isSpecial())
return result.isEof();
content = null;
}
result.failed(failure);
return false;
}
@Override
public boolean failed(Throwable failure)
{
return false;
// TODO
throw new UnsupportedOperationException(failure);
}
@Override

View File

@ -104,8 +104,8 @@ public class ServerHTTP3Session extends ServerProtocolSession
settings = Map.of();
// TODO: add default settings.
SettingsFrame frame = new SettingsFrame(settings);
controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, session::onOpen, this::fail));
controlFlusher.iterate();
if (controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, session::onOpen, this::fail)))
controlFlusher.iterate();
}
private void fail(Throwable failure)
@ -184,15 +184,15 @@ public class ServerHTTP3Session extends ServerProtocolSession
void writeControlFrame(Frame frame, Callback callback)
{
controlFlusher.offer(frame, callback);
controlFlusher.iterate();
if (controlFlusher.offer(frame, callback))
controlFlusher.iterate();
}
void writeMessageFrame(long streamId, Frame frame, Callback callback)
{
QuicStreamEndPoint endPoint = getOrCreateStreamEndPoint(streamId, this::openProtocolEndPoint);
messageFlusher.offer(endPoint, frame, callback);
messageFlusher.iterate();
if (messageFlusher.offer(endPoint, frame, callback))
messageFlusher.iterate();
}
public void onDataAvailable(long streamId)

View File

@ -473,6 +473,10 @@ public class HttpClientStreamTest extends AbstractTest<TransportScenario>
@ArgumentsSource(TransportProvider.class)
public void testInputStreamResponseListenerFailedBeforeResponse(Transport transport) throws Exception
{
// Failure to connect is based on TCP connection refused
// (as the server is stopped), which does not work for UDP.
Assumptions.assumeTrue(transport != Transport.H3);
init(transport);
scenario.start(new EmptyServerHandler());
String uri = scenario.newURI();

View File

@ -71,7 +71,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class HttpClientTest extends AbstractTest<TransportScenario>
public class HttpClientTest extends AbstractTest<TransportScenario>
{
@Override
public void init(Transport transport) throws IOException
@ -700,6 +700,7 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
public void testIPv6Host(Transport transport) throws Exception
{
Assumptions.assumeTrue(Net.isIpv6InterfaceAvailable());
Assumptions.assumeTrue(transport != Transport.UNIX_DOMAIN);
init(transport);
scenario.start(new EmptyServerHandler()