Merged branch 'jetty-11.0.x' into 'jetty-12.0.x'.
Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
commit
e7ab35e749
|
@ -108,7 +108,8 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
|
|||
case OSHUTTING:
|
||||
if (!writeState.compareAndSet(current, WriteState.OSHUT))
|
||||
break;
|
||||
stream.data(new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.from(this::oshutSuccess, this::oshutFailure));
|
||||
Callback oshutCallback = Callback.from(Invocable.InvocationType.NON_BLOCKING, this::oshutSuccess, this::oshutFailure);
|
||||
stream.data(new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), oshutCallback);
|
||||
return;
|
||||
case PENDING:
|
||||
if (!writeState.compareAndSet(current, WriteState.OSHUTTING))
|
||||
|
@ -176,7 +177,7 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
|
|||
if (closed.compareAndSet(false, true))
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("closing {}, cause: {}", this, cause);
|
||||
LOG.debug("closing {}", this, cause);
|
||||
Stream.Data data = this.data.getAndSet(null);
|
||||
if (data != null)
|
||||
data.release();
|
||||
|
@ -249,92 +250,34 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("flushing {} on {}", BufferUtil.toDetailString(buffers), this);
|
||||
if (buffers == null || buffers.length == 0)
|
||||
{
|
||||
if (buffers == null || buffers.length == 0 || remaining(buffers) == 0)
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
WriteState current = writeState.get();
|
||||
switch (current.state)
|
||||
{
|
||||
case IDLE:
|
||||
if (!writeState.compareAndSet(current, WriteState.PENDING))
|
||||
break;
|
||||
// We must copy the buffers because, differently from
|
||||
// write(), the semantic of flush() is that it does not
|
||||
// own them, but stream.data() needs to own them.
|
||||
ByteBuffer buffer = coalesce(buffers, true);
|
||||
Callback.Completable callback = new Callback.Completable(Invocable.InvocationType.NON_BLOCKING);
|
||||
stream.data(new DataFrame(stream.getId(), buffer, false), callback);
|
||||
callback.whenComplete((nothing, failure) ->
|
||||
{
|
||||
if (failure == null)
|
||||
flushSuccess();
|
||||
else
|
||||
flushFailure(failure);
|
||||
});
|
||||
return callback.isDone();
|
||||
case PENDING:
|
||||
return false;
|
||||
case OSHUTTING:
|
||||
case OSHUT:
|
||||
throw new EofException("Output shutdown");
|
||||
case FAILED:
|
||||
Throwable failure = current.failure;
|
||||
if (failure instanceof IOException)
|
||||
throw (IOException)failure;
|
||||
throw new IOException(failure);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void flushSuccess()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
WriteState current = writeState.get();
|
||||
switch (current.state)
|
||||
{
|
||||
case IDLE:
|
||||
case OSHUT:
|
||||
throw new IllegalStateException();
|
||||
case PENDING:
|
||||
if (!writeState.compareAndSet(current, WriteState.IDLE))
|
||||
break;
|
||||
return;
|
||||
case OSHUTTING:
|
||||
shutdownOutput();
|
||||
return;
|
||||
case FAILED:
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Differently from other EndPoint implementations, where write() calls flush(),
|
||||
// in this implementation all the work is done in write(), and flush() is mostly
|
||||
// a no-operation.
|
||||
// This is because the flush() semantic is that it must not leave pending
|
||||
// operations if it cannot write the buffers; therefore we cannot call
|
||||
// stream.data() from flush() because if the stream is congested, the buffers
|
||||
// would not be fully written, we would return false from flush(), but
|
||||
// stream.data() would remain as a pending operation.
|
||||
|
||||
private void flushFailure(Throwable failure)
|
||||
{
|
||||
while (true)
|
||||
WriteState current = writeState.get();
|
||||
switch (current.state)
|
||||
{
|
||||
WriteState current = writeState.get();
|
||||
switch (current.state)
|
||||
case IDLE, PENDING ->
|
||||
{
|
||||
case IDLE:
|
||||
case OSHUT:
|
||||
throw new IllegalStateException();
|
||||
case PENDING:
|
||||
if (!writeState.compareAndSet(current, new WriteState(WriteState.State.FAILED, failure)))
|
||||
break;
|
||||
return;
|
||||
case OSHUTTING:
|
||||
shutdownOutput();
|
||||
return;
|
||||
case FAILED:
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
case OSHUTTING, OSHUT -> throw new EofException("Output shutdown");
|
||||
case FAILED ->
|
||||
{
|
||||
Throwable failure = current.failure;
|
||||
if (failure instanceof IOException)
|
||||
throw (IOException)failure;
|
||||
throw new IOException(failure);
|
||||
}
|
||||
default -> throw new IllegalStateException("Unexpected state: " + current.state);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -404,12 +347,14 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
|
|||
if (!writeState.compareAndSet(current, WriteState.PENDING))
|
||||
continue;
|
||||
// TODO: we really need a Stream primitive to write multiple frames.
|
||||
ByteBuffer result = coalesce(buffers, false);
|
||||
stream.data(new DataFrame(stream.getId(), result, false), Callback.from(() -> writeSuccess(callback), x -> writeFailure(x, callback)));
|
||||
ByteBuffer result = coalesce(buffers);
|
||||
Callback dataCallback = Callback.from(Invocable.getInvocationType(callback), () -> writeSuccess(callback), x -> writeFailure(x, callback));
|
||||
stream.data(new DataFrame(stream.getId(), result, false), dataCallback);
|
||||
}
|
||||
case PENDING -> callback.failed(new WritePendingException());
|
||||
case OSHUTTING, OSHUT -> callback.failed(new EofException("Output shutdown"));
|
||||
case FAILED -> callback.failed(current.failure);
|
||||
default -> callback.failed(new IllegalStateException("Unexpected state: " + current.state));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -436,6 +381,7 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
|
|||
shutdownOutput();
|
||||
}
|
||||
case FAILED -> callback.failed(current.failure);
|
||||
default -> callback.failed(new IllegalStateException("Unexpected state: " + current.state));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -448,35 +394,31 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
|
|||
WriteState current = writeState.get();
|
||||
switch (current.state)
|
||||
{
|
||||
case IDLE:
|
||||
case OSHUT:
|
||||
callback.failed(new IllegalStateException(failure));
|
||||
return;
|
||||
case PENDING:
|
||||
case OSHUTTING:
|
||||
case IDLE, OSHUT -> callback.failed(new IllegalStateException(failure));
|
||||
case PENDING, OSHUTTING ->
|
||||
{
|
||||
if (!writeState.compareAndSet(current, new WriteState(WriteState.State.FAILED, failure)))
|
||||
break;
|
||||
continue;
|
||||
callback.failed(failure);
|
||||
return;
|
||||
case FAILED:
|
||||
return;
|
||||
}
|
||||
case FAILED ->
|
||||
{
|
||||
// Already failed.
|
||||
}
|
||||
default -> callback.failed(new IllegalStateException("Unexpected state: " + current.state));
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
private long remaining(ByteBuffer... buffers)
|
||||
{
|
||||
long total = 0;
|
||||
for (ByteBuffer buffer : buffers)
|
||||
{
|
||||
total += buffer.remaining();
|
||||
}
|
||||
return total;
|
||||
return BufferUtil.remaining(buffers);
|
||||
}
|
||||
|
||||
private ByteBuffer coalesce(ByteBuffer[] buffers, boolean forceCopy)
|
||||
private ByteBuffer coalesce(ByteBuffer[] buffers)
|
||||
{
|
||||
if (buffers.length == 1 && !forceCopy)
|
||||
if (buffers.length == 1)
|
||||
return buffers[0];
|
||||
long capacity = remaining(buffers);
|
||||
if (capacity > Integer.MAX_VALUE)
|
||||
|
|
|
@ -277,7 +277,7 @@ public abstract class WriteFlusher
|
|||
if (buffers != null)
|
||||
{
|
||||
if (DEBUG)
|
||||
LOG.debug("flushed incomplete");
|
||||
LOG.debug("flush incomplete {}", this);
|
||||
PendingState pending = new PendingState(callback, address, buffers);
|
||||
if (updateState(__WRITING, pending))
|
||||
onIncompleteFlush();
|
||||
|
|
|
@ -22,9 +22,11 @@ import java.util.List;
|
|||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory;
|
||||
import org.eclipse.jetty.client.AbstractConnectionPool;
|
||||
import org.eclipse.jetty.client.ByteBufferRequestContent;
|
||||
import org.eclipse.jetty.client.ContentResponse;
|
||||
import org.eclipse.jetty.client.Destination;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
|
@ -34,6 +36,7 @@ import org.eclipse.jetty.client.transport.HttpClientConnectionFactory;
|
|||
import org.eclipse.jetty.client.transport.HttpClientTransportDynamic;
|
||||
import org.eclipse.jetty.http.HostPortHttpField;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpScheme;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
|
@ -52,6 +55,7 @@ import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
|
|||
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
|
||||
import org.eclipse.jetty.io.ClientConnectionFactory;
|
||||
import org.eclipse.jetty.io.ClientConnector;
|
||||
import org.eclipse.jetty.io.Content;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.server.ConnectionFactory;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
|
@ -285,6 +289,59 @@ public class ForwardProxyWithDynamicTransportTest
|
|||
assertEquals(1, connectionPool.getConnectionCount());
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = "proxyProtocol={0}, proxySecure={1}, serverProtocol={2}, serverSecure={3}")
|
||||
@MethodSource("testParams")
|
||||
public void testProxyConcurrentLoad(Origin.Protocol proxyProtocol, boolean proxySecure, HttpVersion serverProtocol, boolean serverSecure) throws Exception
|
||||
{
|
||||
start(new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback)
|
||||
{
|
||||
Content.copy(request, response, callback);
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
int parallelism = 8;
|
||||
boolean proxyMultiplexed = proxyProtocol.getProtocols().stream().allMatch(p -> p.startsWith("h2"));
|
||||
client.setMaxConnectionsPerDestination(proxyMultiplexed ? 1 : parallelism);
|
||||
|
||||
int proxyPort = proxySecure ? proxyTLSConnector.getLocalPort() : proxyConnector.getLocalPort();
|
||||
Origin.Address proxyAddress = new Origin.Address("localhost", proxyPort);
|
||||
HttpProxy proxy = new HttpProxy(proxyAddress, proxySecure, proxyProtocol);
|
||||
client.getProxyConfiguration().addProxy(proxy);
|
||||
|
||||
String scheme = serverSecure ? "https" : "http";
|
||||
int serverPort = serverSecure ? serverTLSConnector.getLocalPort() : serverConnector.getLocalPort();
|
||||
int contentLength = 128 * 1024;
|
||||
|
||||
int iterations = 16;
|
||||
IntStream.range(0, parallelism).parallel().forEach(p ->
|
||||
IntStream.range(0, iterations).forEach(i ->
|
||||
{
|
||||
try
|
||||
{
|
||||
String id = p + "-" + i;
|
||||
ContentResponse response = client.newRequest("localhost", serverPort)
|
||||
.scheme(scheme)
|
||||
.method(HttpMethod.POST)
|
||||
.path("/path/" + id)
|
||||
.version(serverProtocol)
|
||||
.body(new ByteBufferRequestContent(ByteBuffer.allocate(contentLength)))
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
assertEquals(contentLength, response.getContent().length);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
throw new RuntimeException(x);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHTTP2TunnelClosedByClient() throws Exception
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue