Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.

This commit is contained in:
Simone Bordet 2020-06-09 16:11:53 +02:00
commit dd6cd4b881
15 changed files with 532 additions and 267 deletions

View File

@ -21,7 +21,6 @@ package org.eclipse.jetty.http2.client;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.servlet.http.HttpServlet;
import org.eclipse.jetty.http.HostPortHttpField;
@ -33,7 +32,7 @@ import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
@ -54,7 +53,7 @@ public class AbstractTest
protected void start(HttpServlet servlet) throws Exception
{
HTTP2ServerConnectionFactory connectionFactory = new HTTP2ServerConnectionFactory(new HttpConfiguration());
HTTP2CServerConnectionFactory connectionFactory = new HTTP2CServerConnectionFactory(new HttpConfiguration());
connectionFactory.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
connectionFactory.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
prepareServer(connectionFactory);

View File

@ -1,80 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.util.Promise;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class InvalidServerTest extends AbstractTest
{
@Test
public void testInvalidPreface() throws Exception
{
try (ServerSocket server = new ServerSocket(0))
{
prepareClient();
client.start();
CountDownLatch failureLatch = new CountDownLatch(1);
Promise.Completable<Session> promise = new Promise.Completable<>();
InetSocketAddress address = new InetSocketAddress("localhost", server.getLocalPort());
client.connect(address, new Session.Listener.Adapter()
{
@Override
public void onFailure(Session session, Throwable failure)
{
failureLatch.countDown();
}
}, promise);
try (Socket socket = server.accept())
{
OutputStream output = socket.getOutputStream();
output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8));
Session session = promise.get(5, TimeUnit.SECONDS);
assertNotNull(session);
assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
// Verify that the client closed the socket.
InputStream input = socket.getInputStream();
while (true)
{
int read = input.read();
if (read < 0)
break;
}
}
}
}
}

View File

@ -18,7 +18,11 @@
package org.eclipse.jetty.http2.client;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
@ -41,6 +45,7 @@ import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
@ -63,6 +68,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class PrefaceTest extends AbstractTest
@ -332,4 +338,71 @@ public class PrefaceTest extends AbstractTest
assertTrue(clientSettingsLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testInvalidServerPreface() throws Exception
{
try (ServerSocket server = new ServerSocket(0))
{
prepareClient();
client.start();
CountDownLatch failureLatch = new CountDownLatch(1);
Promise.Completable<Session> promise = new Promise.Completable<>();
InetSocketAddress address = new InetSocketAddress("localhost", server.getLocalPort());
client.connect(address, new Session.Listener.Adapter()
{
@Override
public void onFailure(Session session, Throwable failure)
{
failureLatch.countDown();
}
}, promise);
try (Socket socket = server.accept())
{
OutputStream output = socket.getOutputStream();
output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8));
Session session = promise.get(5, TimeUnit.SECONDS);
assertNotNull(session);
assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
// Verify that the client closed the socket.
InputStream input = socket.getInputStream();
while (true)
{
int read = input.read();
if (read < 0)
break;
}
}
}
}
@Test
public void testInvalidClientPreface() throws Exception
{
start(new ServerSessionListener.Adapter());
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8));
output.flush();
byte[] bytes = new byte[1024];
InputStream input = client.getInputStream();
int read = input.read(bytes);
if (read < 0)
{
// Closing the connection without GOAWAY frame is fine.
return;
}
int type = bytes[3];
assertEquals(FrameType.GO_AWAY.getType(), type);
}
}
}

View File

@ -425,12 +425,21 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
super.failed(x);
}
/**
* @return whether the entry is stale and must not be processed
*/
private boolean isStale()
{
return !isProtocol() && stream != null && stream.isReset();
// If it is a protocol frame, process it.
if (isProtocolFrame(frame))
return false;
// It's an application frame; is the stream gone already?
if (stream == null)
return true;
return stream.isReset();
}
private boolean isProtocol()
private boolean isProtocolFrame(Frame frame)
{
switch (frame.getType())
{

View File

@ -447,7 +447,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
// We received a GO_AWAY, so try to write
// what's in the queue and then disconnect.
closeFrame = frame;
notifyClose(this, frame, new DisconnectCallback());
onClose(frame, new DisconnectCallback());
return;
}
@ -514,9 +514,15 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
public void onStreamFailure(int streamId, int error, String reason)
{
Callback callback = new ResetCallback(streamId, error, Callback.NOOP);
Throwable failure = toFailure("Stream failure", error, reason);
onStreamFailure(streamId, error, reason, failure, callback);
}
private void onStreamFailure(int streamId, int error, String reason, Throwable failure, Callback callback)
{
IStream stream = getStream(streamId);
if (stream != null)
stream.process(new FailureFrame(error, reason), callback);
stream.process(new FailureFrame(error, reason, failure), callback);
else
callback.succeeded();
}
@ -529,38 +535,51 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
protected void onConnectionFailure(int error, String reason, Callback callback)
{
notifyFailure(this, new IOException(String.format("%d/%s", error, reason)), new FailureCallback(error, reason, callback));
Throwable failure = toFailure("Session failure", error, reason);
onFailure(error, reason, failure, new FailureCallback(error, reason, callback));
}
protected void abort(Throwable failure)
{
onFailure(ErrorCode.NO_ERROR.code, null, failure, new TerminateCallback(failure));
}
private void onFailure(int error, String reason, Throwable failure, Callback callback)
{
Collection<Stream> streams = getStreams();
int count = streams.size();
Callback countCallback = new CountingCallback(callback, count + 1);
for (Stream stream : streams)
{
onStreamFailure(stream.getId(), error, reason, failure, countCallback);
}
notifyFailure(this, failure, countCallback);
}
private void onClose(GoAwayFrame frame, Callback callback)
{
int error = frame.getError();
String reason = frame.tryConvertPayload();
Throwable failure = toFailure("Session close", error, reason);
Collection<Stream> streams = getStreams();
int count = streams.size();
Callback countCallback = new CountingCallback(callback, count + 1);
for (Stream stream : streams)
{
onStreamFailure(stream.getId(), error, reason, failure, countCallback);
}
notifyClose(this, frame, countCallback);
}
private Throwable toFailure(String message, int error, String reason)
{
return new IOException(String.format("%s %s/%s", message, ErrorCode.toString(error, null), reason));
}
@Override
public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener)
{
streamCreator.newStream(frame, promise, listener);
/*
try
{
// Synchronization is necessary to atomically create
// the stream id and enqueue the frame to be sent.
IStream stream;
boolean queued;
synchronized (this)
{
HeadersFrame[] frameOut = new HeadersFrame[1];
stream = newLocalStream(frame, frameOut);
stream.setListener(listener);
ControlEntry entry = new ControlEntry(frameOut[0], stream, new StreamPromiseCallback(promise, stream));
queued = flusher.append(entry);
}
stream.process(new PrefaceFrame(), Callback.NOOP);
// Iterate outside the synchronized block.
if (queued)
flusher.iterate();
}
catch (Throwable x)
{
promise.failed(x);
}
*/
}
/**
@ -1100,11 +1119,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
}
}
protected void abort(Throwable failure)
{
notifyFailure(this, failure, new TerminateCallback(failure));
}
public boolean isDisconnected()
{
return !endPoint.isOpen();
@ -1629,7 +1643,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("CloseCallback failed", x);
LOG.debug("FailureCallback failed", x);
complete();
}

View File

@ -151,7 +151,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
{
if (writing.compareAndSet(null, callback))
return true;
close();
callback.failed(new WritePendingException());
return false;
}
@ -190,7 +189,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
public boolean isRemotelyClosed()
{
CloseState state = closeState.get();
return state == CloseState.REMOTELY_CLOSED || state == CloseState.CLOSING;
return state == CloseState.REMOTELY_CLOSED || state == CloseState.CLOSING || state == CloseState.CLOSED;
}
public boolean isLocallyClosed()
@ -462,6 +461,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private void onFailure(FailureFrame frame, Callback callback)
{
// Don't close or remove the stream, as the listener may
// want to use it, for example to send a RST_STREAM frame.
notifyFailure(this, frame, callback);
}
@ -749,7 +750,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
{
try
{
listener.onFailure(stream, frame.getError(), frame.getReason(), callback);
listener.onFailure(stream, frame.getError(), frame.getReason(), frame.getFailure(), callback);
}
catch (Throwable x)
{

View File

@ -305,9 +305,10 @@ public interface Stream
* @param stream the stream
* @param error the error code
* @param reason the error reason, or null
* @param failure the failure
* @param callback the callback to complete when the failure has been handled
*/
public default void onFailure(Stream stream, int error, String reason, Callback callback)
public default void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback)
{
callback.succeeded();
}

View File

@ -22,12 +22,14 @@ public class FailureFrame extends Frame
{
private final int error;
private final String reason;
private final Throwable failure;
public FailureFrame(int error, String reason)
public FailureFrame(int error, String reason, Throwable failure)
{
super(FrameType.FAILURE);
this.error = error;
this.reason = reason;
this.failure = failure;
}
public int getError()
@ -39,4 +41,9 @@ public class FailureFrame extends Frame
{
return reason;
}
public Throwable getFailure()
{
return failure;
}
}

View File

@ -61,7 +61,7 @@ public class Generator
this.generators[FrameType.WINDOW_UPDATE.getType()] = new WindowUpdateGenerator(headerGenerator);
this.generators[FrameType.CONTINUATION.getType()] = null; // Never generated explicitly.
this.generators[FrameType.PREFACE.getType()] = new PrefaceGenerator();
this.generators[FrameType.DISCONNECT.getType()] = new DisconnectGenerator();
this.generators[FrameType.DISCONNECT.getType()] = new NoOpGenerator();
this.dataGenerator = new DataGenerator(headerGenerator);
}

View File

@ -21,9 +21,9 @@ package org.eclipse.jetty.http2.generator;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.io.ByteBufferPool;
public class DisconnectGenerator extends FrameGenerator
public class NoOpGenerator extends FrameGenerator
{
public DisconnectGenerator()
public NoOpGenerator()
{
super(null);
}

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
@ -208,10 +206,10 @@ public class HttpChannelOverHTTP2 extends HttpChannel
}
@Override
public void onFailure(Stream stream, int error, String reason, Callback callback)
public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback)
{
HTTP2Channel.Client channel = (HTTP2Channel.Client)((IStream)stream).getAttachment();
channel.onFailure(new IOException(String.format("Failure %s/%s", ErrorCode.toString(error, null), reason)), callback);
channel.onFailure(failure, callback);
}
@Override

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
@ -43,7 +42,6 @@ import org.eclipse.jetty.http2.HTTP2Channel;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
@ -59,7 +57,6 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
import org.eclipse.jetty.util.TypeUtil;
public class HTTP2ServerConnection extends HTTP2Connection
@ -208,13 +205,17 @@ public class HTTP2ServerConnection extends HTTP2Connection
public void onStreamFailure(IStream stream, Throwable failure, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("Processing failure on {}: {}", stream, failure);
LOG.debug("Processing stream failure on {}", stream, failure);
HTTP2Channel.Server channel = (HTTP2Channel.Server)stream.getAttachment();
if (channel != null)
{
Runnable task = channel.onFailure(failure, callback);
if (task != null)
{
// We must dispatch to another thread because the task
// may call application code that performs blocking I/O.
offerTask(task, true);
}
}
else
{
@ -239,22 +240,10 @@ public class HTTP2ServerConnection extends HTTP2Connection
public void onSessionFailure(Throwable failure, Callback callback)
{
ISession session = getSession();
if (LOG.isDebugEnabled())
LOG.debug("Processing failure on {}: {}", session, failure);
Collection<Stream> streams = session.getStreams();
if (streams.isEmpty())
{
callback.succeeded();
}
else
{
CountingCallback counter = new CountingCallback(callback, streams.size());
for (Stream stream : streams)
{
onStreamFailure((IStream)stream, failure, counter);
}
}
LOG.debug("Processing session failure on {}", getSession(), failure);
// All the streams have already been failed, just succeed the callback.
callback.succeeded();
}
public void push(Connector connector, IStream stream, MetaData.Request request)

View File

@ -34,6 +34,7 @@ import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.QuietException;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.NegotiatingServerConnection.CipherDiscriminator;
@ -122,19 +123,14 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
return getConnection().onSessionTimeout(new TimeoutException("Session idle timeout " + idleTimeout + " ms"));
}
@Override
public boolean onIdleTimeout(Stream stream, Throwable x)
{
return getConnection().onStreamTimeout((IStream)stream, x);
}
@Override
public void onClose(Session session, GoAwayFrame frame, Callback callback)
{
String reason = frame.tryConvertPayload();
if (!StringUtil.isEmpty(reason))
reason = " (" + reason + ")";
getConnection().onSessionFailure(new EofException(String.format("Close %s/%s", ErrorCode.toString(frame.getError(), null), reason)), callback);
EofException failure = new EofException(String.format("Close %s/%s", ErrorCode.toString(frame.getError(), null), reason));
onFailure(session, failure, callback);
}
@Override
@ -143,12 +139,6 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
getConnection().onSessionFailure(failure, callback);
}
@Override
public void onFailure(Stream stream, int error, String reason, Callback callback)
{
getConnection().onStreamFailure((IStream)stream, new EofException(String.format("Failure %s/%s", ErrorCode.toString(error, null), reason)), callback);
}
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
@ -175,7 +165,27 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
@Override
public void onReset(Stream stream, ResetFrame frame, Callback callback)
{
getConnection().onStreamFailure((IStream)stream, new EofException("Reset " + ErrorCode.toString(frame.getError(), null)), callback);
EofException failure = new EofException("Reset " + ErrorCode.toString(frame.getError(), null));
onFailure(stream, failure, callback);
}
@Override
public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback)
{
if (!(failure instanceof QuietException))
failure = new EofException(failure);
onFailure(stream, failure, callback);
}
private void onFailure(Stream stream, Throwable failure, Callback callback)
{
getConnection().onStreamFailure((IStream)stream, failure, callback);
}
@Override
public boolean onIdleTimeout(Stream stream, Throwable x)
{
return getConnection().onStreamTimeout((IStream)stream, x);
}
private void close(Stream stream, String reason)

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.http2.server;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.eclipse.jetty.http.BadMessageException;
@ -82,7 +83,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
}
@Override
public void send(MetaData.Request request, MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback)
public void send(MetaData.Request request, final MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback)
{
boolean isHeadRequest = HttpMethod.HEAD.is(request.getMethod());
boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
@ -100,8 +101,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport
}
else
{
if (transportCallback.start(callback, false))
sendHeadersFrame(response, false, transportCallback);
transportCallback.send(callback, false, c ->
sendHeadersFrame(metaData, false, c));
}
}
else
@ -114,7 +115,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
long contentLength = response.getContentLength();
if (contentLength < 0)
{
response = new MetaData.Response(
metaData = new MetaData.Response(
response.getHttpVersion(),
response.getStatus(),
response.getReason(),
@ -142,53 +143,53 @@ public class HttpTransportOverHTTP2 implements HttpTransport
HttpFields trailers = retrieveTrailers();
if (trailers != null)
{
if (transportCallback.start(new SendTrailers(getCallback(), trailers), false))
sendDataFrame(content, true, false, transportCallback);
transportCallback.send(new SendTrailers(getCallback(), trailers), false, c ->
sendDataFrame(content, true, false, c));
}
else
{
if (transportCallback.start(getCallback(), false))
sendDataFrame(content, true, true, transportCallback);
transportCallback.send(getCallback(), false, c ->
sendDataFrame(content, true, true, c));
}
}
else
{
if (transportCallback.start(getCallback(), false))
sendDataFrame(content, false, false, transportCallback);
transportCallback.send(getCallback(), false, c ->
sendDataFrame(content, false, false, c));
}
}
};
if (transportCallback.start(commitCallback, true))
sendHeadersFrame(response, false, transportCallback);
transportCallback.send(commitCallback, true, c ->
sendHeadersFrame(metaData, false, c));
}
else
{
if (lastContent)
{
if (isTunnel(request, response))
if (isTunnel(request, metaData))
{
if (transportCallback.start(callback, true))
sendHeadersFrame(response, false, transportCallback);
transportCallback.send(callback, true, c ->
sendHeadersFrame(metaData, false, c));
}
else
{
HttpFields trailers = retrieveTrailers();
if (trailers != null)
{
if (transportCallback.start(new SendTrailers(callback, trailers), true))
sendHeadersFrame(response, false, transportCallback);
transportCallback.send(new SendTrailers(callback, trailers), true, c ->
sendHeadersFrame(metaData, false, c));
}
else
{
if (transportCallback.start(callback, true))
sendHeadersFrame(response, true, transportCallback);
transportCallback.send(callback, true, c ->
sendHeadersFrame(metaData, true, c));
}
}
}
else
{
if (transportCallback.start(callback, true))
sendHeadersFrame(response, false, transportCallback);
transportCallback.send(callback, true, c ->
sendHeadersFrame(metaData, false, c));
}
}
}
@ -210,8 +211,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport
SendTrailers sendTrailers = new SendTrailers(callback, trailers);
if (hasContent)
{
if (transportCallback.start(sendTrailers, false))
sendDataFrame(content, true, false, transportCallback);
transportCallback.send(sendTrailers, false, c ->
sendDataFrame(content, true, false, c));
}
else
{
@ -220,14 +221,14 @@ public class HttpTransportOverHTTP2 implements HttpTransport
}
else
{
if (transportCallback.start(callback, false))
sendDataFrame(content, true, true, transportCallback);
transportCallback.send(callback, false, c ->
sendDataFrame(content, true, true, c));
}
}
else
{
if (transportCallback.start(callback, false))
sendDataFrame(content, false, false, transportCallback);
transportCallback.send(callback, false, c ->
sendDataFrame(content, false, false, c));
}
}
else
@ -334,7 +335,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
public boolean onStreamTimeout(Throwable failure)
{
return transportCallback.onIdleTimeout(failure);
return transportCallback.idleTimeout(failure);
}
/**
@ -397,119 +398,359 @@ public class HttpTransportOverHTTP2 implements HttpTransport
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
}
/**
* <p>Callback that controls sends initiated by the transport, by eventually
* notifying a nested callback.</p>
* <p>There are 3 sources of concurrency after a send is initiated:</p>
* <ul>
* <li>the completion of the send operation, either success or failure</li>
* <li>an asynchronous failure coming from the read side such as a stream
* being reset, or the connection being closed</li>
* <li>an asynchronous idle timeout</li>
* </ul>
* <p>The last 2 cases may happen <em>during</em> a send, when the frames
* are being generated in the flusher.
* In such cases, this class must avoid that the nested callback is notified
* while the frame generation is in progress, because the nested callback
* may modify other states (such as clearing the {@code HttpOutput._buffer})
* that are accessed during frame generation.</p>
* <p>The solution implemented in this class works by splitting the send
* operation in 3 parts: {@code pre-send}, {@code send} and {@code post-send}.
* Asynchronous state changes happening during {@code send} are stored
* and only executed in {@code post-send}, therefore never interfering
* with frame generation.</p>
*
* @see State
*/
private class TransportCallback implements Callback
{
private State state = State.IDLE;
private Callback callback;
private Throwable failure;
private boolean commit;
private State _state = State.IDLE;
private Callback _callback;
private boolean _commit;
private Throwable _failure;
public boolean start(Callback callback, boolean commit)
private void reset(Throwable failure)
{
State state;
assert Thread.holdsLock(this);
_state = failure != null ? State.FAILED : State.IDLE;
_callback = null;
_commit = false;
_failure = failure;
}
private void send(Callback callback, boolean commit, Consumer<Callback> sendFrame)
{
Throwable failure = sending(callback, commit);
if (failure == null)
{
sendFrame.accept(this);
pending();
}
else
{
callback.failed(failure);
}
}
private Throwable sending(Callback callback, boolean commit)
{
synchronized (this)
{
switch (_state)
{
case IDLE:
{
_state = State.SENDING;
_callback = callback;
_commit = commit;
return null;
}
case FAILED:
{
return _failure;
}
default:
{
return new IllegalStateException("Invalid transport state: " + _state);
}
}
}
}
private void pending()
{
Callback callback;
boolean commit;
Throwable failure;
synchronized (this)
{
state = this.state;
failure = this.failure;
if (state == State.IDLE)
switch (_state)
{
this.state = State.WRITING;
this.callback = callback;
this.commit = commit;
return true;
case SENDING:
{
// The send has not completed the callback yet,
// wait for succeeded() or failed() to be called.
_state = State.PENDING;
return;
}
case SUCCEEDING:
{
// The send already completed successfully, but the
// call to succeeded() was delayed, so call it now.
callback = _callback;
commit = _commit;
failure = null;
reset(null);
break;
}
case FAILING:
{
// The send already completed with a failure, but
// the call to failed() was delayed, so call it now.
callback = _callback;
commit = _commit;
failure = _failure;
reset(failure);
break;
}
default:
{
callback = _callback;
commit = _commit;
failure = new IllegalStateException("Invalid transport state: " + _state);
reset(failure);
break;
}
}
}
if (failure == null)
failure = new IllegalStateException("Invalid transport state: " + state);
callback.failed(failure);
return false;
succeed(callback, commit);
else
fail(callback, commit, failure);
}
@Override
public void succeeded()
{
Callback callback;
boolean commit;
Callback callback = null;
synchronized (this)
{
commit = this.commit;
if (state == State.WRITING)
switch (_state)
{
this.state = State.IDLE;
callback = this.callback;
this.callback = null;
this.commit = false;
case SENDING:
{
_state = State.SUCCEEDING;
// Succeeding the callback will be done in postSend().
return;
}
case PENDING:
{
callback = _callback;
commit = _commit;
reset(null);
break;
}
default:
{
// This thread lost the race to succeed the current
// send, as other threads likely already failed it.
return;
}
}
}
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{}/{} {} {}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
commit ? "commit" : "flush",
callback == null ? "failure" : "success");
if (callback != null)
callback.succeeded();
succeed(callback, commit);
}
@Override
public void failed(Throwable failure)
{
boolean commit;
Callback callback;
boolean commit;
synchronized (this)
{
commit = this.commit;
this.state = State.FAILED;
callback = this.callback;
this.callback = null;
this.failure = failure;
switch (_state)
{
case SENDING:
{
_state = State.FAILING;
_failure = failure;
// Failing the callback will be done in postSend().
return;
}
case IDLE:
case PENDING:
{
callback = _callback;
commit = _commit;
reset(failure);
break;
}
default:
{
// This thread lost the race to fail the current send,
// as other threads already succeeded or failed it.
return;
}
}
}
fail(callback, commit, failure);
}
private boolean idleTimeout(Throwable failure)
{
Callback callback;
boolean timeout;
synchronized (this)
{
switch (_state)
{
case PENDING:
{
// The send was started but idle timed out, fail it.
callback = _callback;
timeout = true;
reset(failure);
break;
}
case IDLE:
// The application may be suspended, ignore the idle timeout.
case SENDING:
// A send has been started at the same time of an idle timeout;
// Ignore the idle timeout and let the write continue normally.
case SUCCEEDING:
case FAILING:
// An idle timeout during these transient states is ignored.
case FAILED:
// Already failed, ignore the idle timeout.
{
callback = null;
timeout = false;
break;
}
default:
{
// Should not happen, but just in case.
callback = _callback;
if (callback == null)
callback = Callback.NOOP;
timeout = true;
failure = new IllegalStateException("Invalid transport state: " + _state, failure);
reset(failure);
break;
}
}
}
idleTimeout(callback, timeout, failure);
return timeout;
}
private void succeed(Callback callback, boolean commit)
{
if (LOG.isDebugEnabled())
LOG.debug(String.format("HTTP2 Response #%d/%h %s %s", stream.getId(), stream.getSession(),
commit ? "commit" : "flush", callback == null ? "ignored" : "failed"), failure);
LOG.debug("HTTP2 Response #{}/{} {} success",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
commit ? "commit" : "flush");
callback.succeeded();
}
private void fail(Callback callback, boolean commit, Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{}/{} {} failure",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
commit ? "commit" : "flush",
failure);
if (callback != null)
callback.failed(failure);
}
private boolean onIdleTimeout(Throwable failure)
private void idleTimeout(Callback callback, boolean timeout, Throwable failure)
{
boolean result;
Callback callback = null;
synchronized (this)
{
// Ignore idle timeouts if not writing,
// as the application may be suspended.
result = state == State.WRITING;
if (result)
{
this.state = State.TIMEOUT;
callback = this.callback;
this.callback = null;
this.failure = failure;
}
}
if (LOG.isDebugEnabled())
LOG.debug(String.format("HTTP2 Response #%d/%h idle timeout %s", stream.getId(), stream.getSession(), result ? "expired" : "ignored"), failure);
if (result)
LOG.debug("HTTP2 Response #{}/{} idle timeout {}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
timeout ? "expired" : "ignored",
failure);
if (timeout)
callback.failed(failure);
return result;
}
@Override
public InvocationType getInvocationType()
{
Callback callback;
synchronized (this)
{
callback = this.callback;
}
return callback != null ? callback.getInvocationType() : Callback.super.getInvocationType();
}
}
/**
* <p>Send states for {@link TransportCallback}.</p>
*
* @see TransportCallback
*/
private enum State
{
IDLE, WRITING, FAILED, TIMEOUT
/**
* <p>No send initiated or in progress.</p>
* <p>Next states could be:</p>
* <ul>
* <li>{@link #SENDING}, when {@link TransportCallback#send(Callback, boolean, Consumer)}
* is called by the transport to initiate a send</li>
* <li>{@link #FAILED}, when {@link TransportCallback#failed(Throwable)}
* is called by an asynchronous failure</li>
* </ul>
*/
IDLE,
/**
* <p>A send is initiated; the nested callback in {@link TransportCallback}
* cannot be notified while in this state.</p>
* <p>Next states could be:</p>
* <ul>
* <li>{@link #SUCCEEDING}, when {@link TransportCallback#succeeded()}
* is called synchronously because the send succeeded</li>
* <li>{@link #FAILING}, when {@link TransportCallback#failed(Throwable)}
* is called synchronously because the send failed</li>
* <li>{@link #PENDING}, when {@link TransportCallback#pending()}
* is called before the send completes</li>
* </ul>
*/
SENDING,
/**
* <p>A send was initiated and is now pending, waiting for the {@link TransportCallback}
* to be notified of success or failure.</p>
* <p>Next states could be:</p>
* <ul>
* <li>{@link #IDLE}, when {@link TransportCallback#succeeded()}
* is called because the send succeeded</li>
* <li>{@link #FAILED}, when {@link TransportCallback#failed(Throwable)}
* is called because either the send failed, or an asynchronous failure happened</li>
* </ul>
*/
PENDING,
/**
* <p>A send was initiated and succeeded, but {@link TransportCallback#pending()}
* has not been called yet.</p>
* <p>This state indicates that the success actions (such as notifying the
* {@link TransportCallback} nested callback) must be performed when
* {@link TransportCallback#pending()} is called.</p>
* <p>Next states could be:</p>
* <ul>
* <li>{@link #IDLE}, when {@link TransportCallback#pending()}
* is called</li>
* </ul>
*/
SUCCEEDING,
/**
* <p>A send was initiated and failed, but {@link TransportCallback#pending()}
* has not been called yet.</p>
* <p>This state indicates that the failure actions (such as notifying the
* {@link TransportCallback} nested callback) must be performed when
* {@link TransportCallback#pending()} is called.</p>
* <p>Next states could be:</p>
* <ul>
* <li>{@link #FAILED}, when {@link TransportCallback#pending()}
* is called</li>
* </ul>
*/
FAILING,
/**
* <p>The terminal state indicating failure of the send.</p>
*/
FAILED
}
private class SendTrailers extends Callback.Nested
@ -525,8 +766,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport
@Override
public void succeeded()
{
if (transportCallback.start(getCallback(), false))
sendTrailersFrame(new MetaData(HttpVersion.HTTP_2, trailers), transportCallback);
transportCallback.send(getCallback(), false, c ->
sendTrailersFrame(new MetaData(HttpVersion.HTTP_2, trailers), c));
}
}
}

View File

@ -822,13 +822,16 @@ public class HttpChannelState
// check the actions of the listeners
synchronized (this)
{
// If we are still async and nobody has called sendError
if (_requestState == RequestState.ASYNC && !_sendError)
// Then the listeners did not invoke API methods
// and the container must provide a default error dispatch.
{
// The listeners did not invoke API methods and the
// container must provide a default error dispatch.
sendError(th);
else
}
else if (_requestState != RequestState.COMPLETE)
{
LOG.warn("unhandled in state " + _requestState, new IllegalStateException(th));
}
}
}