Merge remote-tracking branch 'origin/jetty-12.0.x' into jetty-12.0.x-VirtualThreadPoolSemaphore
This commit is contained in:
commit
4fd306fcc2
|
@ -13,12 +13,34 @@
|
|||
|
||||
= Eclipse Jetty {page-version}
|
||||
|
||||
This section of the site contains the documentation for {page-component-title} {page-version}.
|
||||
This is the main documentation page for the Eclipse Jetty Project.
|
||||
|
||||
Jetty provides a highly scalable and memory-efficient web server and Servlet container, supporting web protocols such as HTTP/1.1, HTTP/2, HTTP/3 and WebSocket.
|
||||
Furthermore, Jetty offers integrations with many other technologies, such as OSGi, JMX, JNDI, JAAS, CDI, etc. and with the relevant Jakarta EE technologies.
|
||||
|
||||
Jetty is open source and are freely available for commercial use and distribution under either the link:https://www.eclipse.org/legal/epl-2.0/[Eclipse Public License v2] or the link:https://www.apache.org/licenses/LICENSE-2.0[Apache License v2].
|
||||
|
||||
Jetty can either be used as a standalone server to deploy web applications, or as a library that can be used in your code as a dependency.
|
||||
|
||||
.Jetty Versions and Compatibilities
|
||||
[cols="1a,1a,1a,1a", options="header"]
|
||||
|===
|
||||
| Jetty Version | Required Java Version | Jakarta EE Version | Status
|
||||
| Jetty 12.1.x | Java 17 | Jakarta EE11, EE10, EE9, EE8 | Development
|
||||
|
||||
| Jetty 12.0.x | Java 17 | Jakarta EE10, EE9, EE8 | Stable
|
||||
|
||||
| Jetty 11.0.x | Java 11 | Jakarta EE9 | EOL (see link:https://github.com/jetty/jetty.project/issues/10485[#10485])
|
||||
|
||||
| Jetty 10.0.x | Java 11 | Jakarta EE8 | EOL (see link:https://github.com/jetty/jetty.project/issues/10485[#10485])
|
||||
|
||||
| Jetty 9.4.x | Java 8 | Jakarta EE7 | EOL (see link:https://github.com/jetty/jetty.project/issues/7958[#7958])
|
||||
|===
|
||||
|
||||
== xref:operations-guide:index.adoc[]
|
||||
|
||||
The Eclipse Jetty Operations Guide targets sysops, devops, and developers who want to install Eclipse Jetty as a standalone server to deploy web applications.
|
||||
The Operations Guide targets sysops, devops, and developers who want to install Jetty as a standalone server to deploy web applications.
|
||||
|
||||
== xref:programming-guide:index.adoc[]
|
||||
|
||||
The Eclipse Jetty Programming Guide targets developers who want to use the Eclipse Jetty libraries in their applications, and advanced sysops/devops that want to customize the deployment of web applications.
|
||||
The Programming Guide targets developers who want to use the Jetty libraries in their applications, and advanced sysops/devops that want to customize the deployment of web applications.
|
||||
|
|
|
@ -17,6 +17,7 @@ import java.io.FileInputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.URI;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.file.Path;
|
||||
|
@ -34,6 +35,7 @@ import org.eclipse.jetty.client.BasicAuthentication;
|
|||
import org.eclipse.jetty.client.BufferingResponseListener;
|
||||
import org.eclipse.jetty.client.BytesRequestContent;
|
||||
import org.eclipse.jetty.client.CompletableResponseListener;
|
||||
import org.eclipse.jetty.client.Connection;
|
||||
import org.eclipse.jetty.client.ConnectionPool;
|
||||
import org.eclipse.jetty.client.ContentResponse;
|
||||
import org.eclipse.jetty.client.Destination;
|
||||
|
@ -72,6 +74,7 @@ import org.eclipse.jetty.http3.client.transport.HttpClientTransportOverHTTP3;
|
|||
import org.eclipse.jetty.io.ClientConnectionFactory;
|
||||
import org.eclipse.jetty.io.ClientConnector;
|
||||
import org.eclipse.jetty.io.Content;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.io.Transport;
|
||||
import org.eclipse.jetty.io.ssl.SslHandshakeListener;
|
||||
import org.eclipse.jetty.quic.client.ClientQuicConfiguration;
|
||||
|
@ -1180,4 +1183,29 @@ public class HTTPClientDocs
|
|||
.send();
|
||||
// end::mixedTransports[]
|
||||
}
|
||||
|
||||
public void connectionInformation() throws Exception
|
||||
{
|
||||
// tag::connectionInformation[]
|
||||
HttpClient httpClient = new HttpClient();
|
||||
httpClient.start();
|
||||
|
||||
ContentResponse response = httpClient.newRequest("http://domain.com/path")
|
||||
// The connection information is only available starting from the request begin event.
|
||||
.onRequestBegin(request ->
|
||||
{
|
||||
Connection connection = request.getConnection();
|
||||
|
||||
// Obtain the address of the server.
|
||||
SocketAddress remoteAddress = connection.getRemoteSocketAddress();
|
||||
System.getLogger("connection").log(INFO, "Server address: %s", remoteAddress);
|
||||
|
||||
// Obtain the SslSessionData.
|
||||
EndPoint.SslSessionData sslSessionData = connection.getSslSessionData();
|
||||
if (sslSessionData != null)
|
||||
System.getLogger("connection").log(INFO, "SslSessionData: %s", sslSessionData);
|
||||
})
|
||||
.send();
|
||||
// end::connectionInformation[]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -235,7 +235,12 @@ A second request with the same origin sent _after_ the first request/response cy
|
|||
A second request with the same origin sent _concurrently_ with the first request will likely cause the opening of a second connection, depending on the connection pool implementation.
|
||||
The configuration parameter `HttpClient.maxConnectionsPerDestination` (see also the <<configuration,configuration section>>) controls the max number of connections that can be opened for a destination.
|
||||
|
||||
NOTE: If opening connections to a given origin takes a long time, then requests for that origin will queue up in the corresponding destination until the connections are established.
|
||||
[NOTE]
|
||||
====
|
||||
If opening connections to a given origin takes a long time, then requests for that origin will queue up in the corresponding destination until the connections are established.
|
||||
|
||||
To save the time spent opening connections, you can xref:connection-pool-precreate-connections[pre-create connections].
|
||||
====
|
||||
|
||||
Each connection can handle a limited number of concurrent requests.
|
||||
For HTTP/1.1, this number is always `1`: there can only be one outstanding request for each connection.
|
||||
|
@ -528,6 +533,28 @@ This is a fancy example of how to mix HTTP versions and low-level transports:
|
|||
include::code:example$src/main/java/org/eclipse/jetty/docs/programming/client/http/HTTPClientDocs.java[tag=mixedTransports]
|
||||
----
|
||||
|
||||
[[connection-information]]
|
||||
=== Request Connection Information
|
||||
|
||||
In order to send a request, it is necessary to obtain a connection, as explained in the xref:request-processing[request processing section].
|
||||
|
||||
The HTTP/1.1 protocol may send only one request at a time on a single connection, while multiplexed protocols such as HTTP/2 may send many requests at a time on a single connection.
|
||||
|
||||
You can access the connection information, for example the local and remote `SocketAddress`, or the `SslSessionData` if the connection is secured, in the following way:
|
||||
|
||||
[,java,indent=0]
|
||||
----
|
||||
include::code:example$src/main/java/org/eclipse/jetty/docs/programming/client/http/HTTPClientDocs.java[tag=connectionInformation]
|
||||
----
|
||||
|
||||
[NOTE]
|
||||
====
|
||||
The connection information is only available when the request is associated with a connection.
|
||||
|
||||
This means that the connection is not available in the _request queued_ event, but only starting from the _request begin_ event.
|
||||
For more information about request events, see xref:non-blocking[this section].
|
||||
====
|
||||
|
||||
[[configuration]]
|
||||
== HttpClient Configuration
|
||||
|
||||
|
|
|
@ -23,3 +23,24 @@ You may use the xref:client/index.adoc[Jetty client-side library] in your applic
|
|||
Likewise, you may use the xref:server/index.adoc[Jetty server-side library] to quickly create an HTTP or REST service without having to create a web application archive file (a `+*.war+` file) and without having to deploy it to a Jetty standalone server that you would have to download and install.
|
||||
|
||||
This guide will walk you through the design of the Jetty libraries and how to use its classes to write your applications.
|
||||
|
||||
== Code Deprecation Policy
|
||||
|
||||
As the Jetty code evolves, classes and/or methods are deprecated using the `@Deprecated` annotation and will be removed in a future Jetty release.
|
||||
|
||||
The Jetty release numbering follows this scheme: `<major>.<minor>.<micro>`. For example, 12.0.5 has `major=12`, `minor=0` and `micro=5`.
|
||||
|
||||
As much as possible, deprecated code is not removed in micro releases.
|
||||
Deprecated code may be removed in major releases.
|
||||
Deprecated code may be removed in minor releases, but only if it has been deprecated for at least 6 micro releases.
|
||||
|
||||
For example, let's assume that Jetty 12.1.0 (a new minor release) is released after the release of Jetty 12.0.11.
|
||||
|
||||
Then, code that was deprecated in Jetty 12.0.5 or earlier may be removed from Jetty 12.1.0 (because it has been deprecated for more than 6 micro releases).
|
||||
|
||||
On the other hand, code that was deprecated in Jetty 12.0.8 may be removed in Jetty 12.1.3 (because it has been deprecated for 3 micro releases in Jetty 12.0.x, and for 3 micro releases in Jetty 12.1.x -- 12.1.0, 12.1.1 and 12.1.2).
|
||||
|
||||
[NOTE]
|
||||
====
|
||||
There could be rare cases where code (possibly not even deprecated) must be removed earlier than specified above to address security vulnerabilities.
|
||||
====
|
||||
|
|
|
@ -16,6 +16,7 @@ package org.eclipse.jetty.client;
|
|||
import java.io.Closeable;
|
||||
import java.net.SocketAddress;
|
||||
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
|
||||
/**
|
||||
|
@ -63,4 +64,13 @@ public interface Connection extends Closeable
|
|||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the {@link EndPoint.SslSessionData} associated with
|
||||
* the connection, or {@code null} if the connection is not secure.
|
||||
*/
|
||||
default EndPoint.SslSessionData getSslSessionData()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -346,6 +346,12 @@ public class HttpProxy extends ProxyConfiguration.Proxy
|
|||
return connection.getRemoteSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EndPoint.SslSessionData getSslSessionData()
|
||||
{
|
||||
return connection.getSslSessionData();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Request request, Response.CompleteListener listener)
|
||||
{
|
||||
|
|
|
@ -116,6 +116,12 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
|
|||
return delegate.getRemoteSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EndPoint.SslSessionData getSslSessionData()
|
||||
{
|
||||
return delegate.getSslSessionData();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getBytesIn()
|
||||
{
|
||||
|
@ -350,6 +356,12 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
|
|||
return getEndPoint().getRemoteSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EndPoint.SslSessionData getSslSessionData()
|
||||
{
|
||||
return getEndPoint().getSslSessionData();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SendFailure send(HttpExchange exchange)
|
||||
{
|
||||
|
|
|
@ -101,6 +101,12 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
|
|||
return delegate.getRemoteSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EndPoint.SslSessionData getSslSessionData()
|
||||
{
|
||||
return delegate.getSslSessionData();
|
||||
}
|
||||
|
||||
protected Flusher getFlusher()
|
||||
{
|
||||
return flusher;
|
||||
|
@ -359,6 +365,12 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
|
|||
return getEndPoint().getRemoteSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EndPoint.SslSessionData getSslSessionData()
|
||||
{
|
||||
return getEndPoint().getSslSessionData();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SendFailure send(HttpExchange exchange)
|
||||
{
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.eclipse.jetty.http2.HTTP2Session;
|
|||
import org.eclipse.jetty.http2.api.Session;
|
||||
import org.eclipse.jetty.http2.api.Stream;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.thread.Sweeper;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -85,6 +86,12 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
|
|||
return session.getRemoteSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EndPoint.SslSessionData getSslSessionData()
|
||||
{
|
||||
return connection.getEndPoint().getSslSessionData();
|
||||
}
|
||||
|
||||
public boolean isRecycleHttpChannels()
|
||||
{
|
||||
return recycleHttpChannels;
|
||||
|
|
|
@ -95,8 +95,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
|
|||
public void onOpen()
|
||||
{
|
||||
Map<Integer, Integer> settings = listener.onPreface(getSession());
|
||||
if (settings == null)
|
||||
settings = new HashMap<>();
|
||||
settings = settings == null ? new HashMap<>() : new HashMap<>(settings);
|
||||
|
||||
// Below we want to populate any settings to send to the server
|
||||
// that have a different default than what prescribed by the RFC.
|
||||
|
|
|
@ -1248,6 +1248,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
|
|||
this.stream = stream;
|
||||
}
|
||||
|
||||
public Frame frame()
|
||||
{
|
||||
return frame;
|
||||
}
|
||||
|
||||
public abstract int getFrameBytesGenerated();
|
||||
|
||||
public int getDataBytesRemaining()
|
||||
|
|
|
@ -15,6 +15,7 @@ package org.eclipse.jetty.http2.internal;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -25,9 +26,11 @@ import java.util.List;
|
|||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
|
||||
import org.eclipse.jetty.http2.ErrorCode;
|
||||
import org.eclipse.jetty.http2.FlowControlStrategy;
|
||||
import org.eclipse.jetty.http2.HTTP2Session;
|
||||
import org.eclipse.jetty.http2.HTTP2Stream;
|
||||
import org.eclipse.jetty.http2.frames.FrameType;
|
||||
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
|
||||
import org.eclipse.jetty.http2.hpack.HpackException;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
|
@ -92,10 +95,9 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
entries.offerFirst(entry);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Prepended {}, entries={}", entry, entries.size());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
if (closed == null)
|
||||
return true;
|
||||
closed(entry, closed);
|
||||
return false;
|
||||
}
|
||||
|
@ -106,15 +108,17 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
try (AutoLock ignored = lock.lock())
|
||||
{
|
||||
closed = terminated;
|
||||
// If it was not possible to HPACK encode, then allow to send a GOAWAY.
|
||||
if (closed instanceof HpackException.SessionException && entry.frame().getType() == FrameType.GO_AWAY)
|
||||
closed = null;
|
||||
if (closed == null)
|
||||
{
|
||||
entries.offer(entry);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Appended {}, entries={}, {}", entry, entries.size(), this);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
if (closed == null)
|
||||
return true;
|
||||
closed(entry, closed);
|
||||
return false;
|
||||
}
|
||||
|
@ -130,10 +134,9 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
list.forEach(entries::offer);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Appended {}, entries={} {}", list, entries.size(), this);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
if (closed == null)
|
||||
return true;
|
||||
list.forEach(entry -> closed(entry, closed));
|
||||
return false;
|
||||
}
|
||||
|
@ -163,7 +166,21 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
try (AutoLock ignored = lock.lock())
|
||||
{
|
||||
if (terminated != null)
|
||||
throw terminated;
|
||||
{
|
||||
boolean rethrow = true;
|
||||
if (terminated instanceof HpackException.SessionException)
|
||||
{
|
||||
HTTP2Session.Entry entry = entries.peek();
|
||||
if (entry != null && entry.frame().getType() == FrameType.GO_AWAY)
|
||||
{
|
||||
// Allow a SessionException to be processed once to send a GOAWAY.
|
||||
terminated = new ClosedChannelException().initCause(terminated);
|
||||
rethrow = false;
|
||||
}
|
||||
}
|
||||
if (rethrow)
|
||||
throw terminated;
|
||||
}
|
||||
|
||||
WindowEntry windowEntry;
|
||||
while ((windowEntry = windows.poll()) != null)
|
||||
|
@ -248,6 +265,15 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
entry.failed(failure);
|
||||
pending.remove();
|
||||
}
|
||||
catch (HpackException.SessionException failure)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Failure generating {}", entry, failure);
|
||||
onSessionFailure(failure);
|
||||
// The method above will try to send
|
||||
// a GOAWAY, so we will iterate again.
|
||||
return Action.IDLE;
|
||||
}
|
||||
catch (Throwable failure)
|
||||
{
|
||||
// Failure to generate the entry is catastrophic.
|
||||
|
@ -339,7 +365,23 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
protected void onCompleteFailure(Throwable x)
|
||||
{
|
||||
accumulator.release();
|
||||
Throwable closed = fail(x);
|
||||
// If the failure came from within the
|
||||
// flusher, we need to close the connection.
|
||||
if (closed == null)
|
||||
session.onWriteFailure(x);
|
||||
}
|
||||
|
||||
private void onSessionFailure(Throwable x)
|
||||
{
|
||||
accumulator.release();
|
||||
Throwable closed = fail(x);
|
||||
if (closed == null)
|
||||
session.close(ErrorCode.COMPRESSION_ERROR.code, null, NOOP);
|
||||
}
|
||||
|
||||
private Throwable fail(Throwable x)
|
||||
{
|
||||
Throwable closed;
|
||||
Set<HTTP2Session.Entry> allEntries;
|
||||
try (AutoLock ignored = lock.lock())
|
||||
|
@ -361,11 +403,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
allEntries.addAll(pendingEntries);
|
||||
pendingEntries.clear();
|
||||
allEntries.forEach(entry -> entry.failed(x));
|
||||
|
||||
// If the failure came from within the
|
||||
// flusher, we need to close the connection.
|
||||
if (closed == null)
|
||||
session.onWriteFailure(x);
|
||||
return closed;
|
||||
}
|
||||
|
||||
public void terminate(Throwable cause)
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.eclipse.jetty.http2.frames.HeadersFrame;
|
|||
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
|
||||
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
|
||||
import org.eclipse.jetty.io.AbstractEndPoint;
|
||||
import org.eclipse.jetty.io.Content;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
|
@ -45,7 +46,6 @@ import org.eclipse.jetty.util.Promise;
|
|||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Tag;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
|
@ -100,7 +100,6 @@ public class BlockedWritesWithSmallThreadPoolTest
|
|||
}
|
||||
|
||||
@Test
|
||||
@Tag("flaky")
|
||||
public void testServerThreadsBlockedInWrites() throws Exception
|
||||
{
|
||||
int contentLength = 16 * 1024 * 1024;
|
||||
|
@ -108,11 +107,12 @@ public class BlockedWritesWithSmallThreadPoolTest
|
|||
start(new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback)
|
||||
public boolean handle(Request request, Response response, Callback callback) throws Exception
|
||||
{
|
||||
serverEndPointRef.compareAndSet(null, (AbstractEndPoint)request.getConnectionMetaData().getConnection().getEndPoint());
|
||||
// Write a large content to cause TCP congestion.
|
||||
response.write(true, ByteBuffer.wrap(new byte[contentLength]), callback);
|
||||
// Blocking write a large content to cause TCP congestion.
|
||||
Content.Sink.write(response, true, ByteBuffer.wrap(new byte[contentLength]));
|
||||
callback.succeeded();
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
@ -138,21 +138,20 @@ public class BlockedWritesWithSmallThreadPoolTest
|
|||
@Override
|
||||
public void onDataAvailable(Stream stream)
|
||||
{
|
||||
Stream.Data data = stream.readData();
|
||||
try
|
||||
{
|
||||
// Block here to stop reading from the network
|
||||
// to cause the server to TCP congest.
|
||||
clientBlockLatch.await(5, SECONDS);
|
||||
Stream.Data data = stream.readData();
|
||||
data.release();
|
||||
if (data.frame().isEndStream())
|
||||
clientDataLatch.countDown();
|
||||
else
|
||||
stream.demand();
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
catch (InterruptedException ignored)
|
||||
{
|
||||
data.release();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -172,18 +171,139 @@ public class BlockedWritesWithSmallThreadPoolTest
|
|||
await().atMost(5, SECONDS).until(() -> serverThreads.getAvailableReservedThreads() == 1);
|
||||
}
|
||||
// Use the reserved thread for a blocking operation, simulating another blocking write.
|
||||
long delaySeconds = 10;
|
||||
CountDownLatch serverBlockLatch = new CountDownLatch(1);
|
||||
assertTrue(serverThreads.tryExecute(() -> await().atMost(20, SECONDS).until(() -> serverBlockLatch.await(15, SECONDS), b -> true)));
|
||||
assertTrue(serverThreads.tryExecute(() ->
|
||||
{
|
||||
try
|
||||
{
|
||||
serverBlockLatch.await(2 * delaySeconds, SECONDS);
|
||||
}
|
||||
catch (InterruptedException ignored)
|
||||
{
|
||||
}
|
||||
}));
|
||||
|
||||
// No more threads are available on the server.
|
||||
assertEquals(0, serverThreads.getReadyThreads());
|
||||
|
||||
// Unblock the client to read from the network, which should unblock the server write().
|
||||
clientBlockLatch.countDown();
|
||||
|
||||
assertTrue(clientDataLatch.await(10, SECONDS), server.dump());
|
||||
assertTrue(clientDataLatch.await(delaySeconds, SECONDS), server.dump());
|
||||
|
||||
// Unblock blocked threads.
|
||||
serverBlockLatch.countDown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerThreadsInPendingWrites() throws Exception
|
||||
{
|
||||
int contentLength = 16 * 1024 * 1024;
|
||||
AtomicReference<AbstractEndPoint> serverEndPointRef = new AtomicReference<>();
|
||||
start(new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback)
|
||||
{
|
||||
serverEndPointRef.set((AbstractEndPoint)request.getConnectionMetaData().getConnection().getEndPoint());
|
||||
// Large write that will TCP congest, but it is non-blocking.
|
||||
response.write(true, ByteBuffer.allocate(contentLength), callback);
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
client = new HTTP2Client();
|
||||
// Set large flow control windows so the server hits TCP congestion.
|
||||
int window = 2 * contentLength;
|
||||
client.setInitialSessionRecvWindow(window);
|
||||
client.setInitialStreamRecvWindow(window);
|
||||
client.start();
|
||||
|
||||
CountDownLatch clientBlockLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientDataLatch = new CountDownLatch(1);
|
||||
Session session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {})
|
||||
.get(5, SECONDS);
|
||||
HttpURI uri = HttpURI.build("http://localhost:" + connector.getLocalPort() + "/congest");
|
||||
MetaData.Request request = new MetaData.Request("GET", uri, HttpVersion.HTTP_2, HttpFields.EMPTY);
|
||||
session.newStream(new HeadersFrame(request, null, true), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onDataAvailable(Stream stream)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Block here to stop reading from the network
|
||||
// to cause the server to TCP congest.
|
||||
clientBlockLatch.await(5, SECONDS);
|
||||
Stream.Data data = stream.readData();
|
||||
data.release();
|
||||
if (data.frame().isEndStream())
|
||||
clientDataLatch.countDown();
|
||||
else
|
||||
stream.demand();
|
||||
}
|
||||
catch (InterruptedException ignored)
|
||||
{
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
await().atMost(5, SECONDS).until(() ->
|
||||
{
|
||||
AbstractEndPoint serverEndPoint = serverEndPointRef.get();
|
||||
return serverEndPoint != null && serverEndPoint.getWriteFlusher().isPending();
|
||||
});
|
||||
// Wait for NIO on the server to be OP_WRITE interested.
|
||||
Thread.sleep(1000);
|
||||
|
||||
// Handler.handle() should have returned, make sure we block that thread.
|
||||
long delaySeconds = 10;
|
||||
await().atMost(5, SECONDS).until(() -> serverThreads.getIdleThreads() == 1);
|
||||
CountDownLatch serverBlockLatch = new CountDownLatch(1);
|
||||
serverThreads.execute(() ->
|
||||
{
|
||||
try
|
||||
{
|
||||
serverBlockLatch.await(2 * delaySeconds, SECONDS);
|
||||
}
|
||||
catch (InterruptedException ignored)
|
||||
{
|
||||
}
|
||||
});
|
||||
|
||||
// Make sure there is a reserved thread.
|
||||
if (serverThreads.getAvailableReservedThreads() != 1)
|
||||
{
|
||||
assertFalse(serverThreads.tryExecute(() -> {}));
|
||||
await().atMost(5, SECONDS).until(() -> serverThreads.getAvailableReservedThreads() == 1);
|
||||
}
|
||||
// Use the reserved thread for a blocking operation, simulating another blocking write.
|
||||
CountDownLatch reservedBlockLatch = new CountDownLatch(1);
|
||||
assertTrue(serverThreads.tryExecute(() ->
|
||||
{
|
||||
try
|
||||
{
|
||||
reservedBlockLatch.await(2 * delaySeconds, SECONDS);
|
||||
}
|
||||
catch (InterruptedException ignored)
|
||||
{
|
||||
}
|
||||
}));
|
||||
|
||||
// No more threads are available on the server.
|
||||
assertEquals(0, serverThreads.getReadyThreads());
|
||||
|
||||
// Unblock the client to read from the network, which must unblock the server write() and send a response.
|
||||
clientBlockLatch.countDown();
|
||||
|
||||
assertTrue(clientDataLatch.await(delaySeconds, SECONDS), server.dump());
|
||||
|
||||
// Unblock blocked threads.
|
||||
serverBlockLatch.countDown();
|
||||
reservedBlockLatch.countDown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientThreadsBlockedInWrite() throws Exception
|
||||
{
|
||||
|
@ -200,12 +320,12 @@ public class BlockedWritesWithSmallThreadPoolTest
|
|||
@Override
|
||||
public void onDataAvailable(Stream stream)
|
||||
{
|
||||
Stream.Data data = stream.readData();
|
||||
try
|
||||
{
|
||||
// Block here to stop reading from the network
|
||||
// to cause the client to TCP congest.
|
||||
serverBlockLatch.await(5, SECONDS);
|
||||
Stream.Data data = stream.readData();
|
||||
data.release();
|
||||
if (data.frame().isEndStream())
|
||||
{
|
||||
|
@ -217,9 +337,8 @@ public class BlockedWritesWithSmallThreadPoolTest
|
|||
stream.demand();
|
||||
}
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
catch (InterruptedException ignored)
|
||||
{
|
||||
data.release();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -277,14 +396,27 @@ public class BlockedWritesWithSmallThreadPoolTest
|
|||
await().atMost(5, SECONDS).until(() -> clientThreads.getAvailableReservedThreads() == 1);
|
||||
}
|
||||
// Use the reserved thread for a blocking operation, simulating another blocking write.
|
||||
assertTrue(clientThreads.tryExecute(() -> await().until(() -> clientBlockLatch.await(15, SECONDS), b -> true)));
|
||||
long delaySeconds = 10;
|
||||
assertTrue(clientThreads.tryExecute(() ->
|
||||
{
|
||||
try
|
||||
{
|
||||
clientBlockLatch.await(2 * delaySeconds, SECONDS);
|
||||
}
|
||||
catch (InterruptedException ignored)
|
||||
{
|
||||
}
|
||||
}));
|
||||
|
||||
// No more threads are available on the client.
|
||||
await().atMost(5, SECONDS).until(() -> clientThreads.getReadyThreads() == 0);
|
||||
|
||||
// Unblock the server to read from the network, which should unblock the client.
|
||||
serverBlockLatch.countDown();
|
||||
|
||||
assertTrue(latch.await(10, SECONDS), client.dump());
|
||||
assertTrue(latch.await(delaySeconds, SECONDS), client.dump());
|
||||
|
||||
// Unblock blocked threads.
|
||||
clientBlockLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -428,11 +428,20 @@ public class HTTP2ServerTest extends AbstractServerTest
|
|||
}
|
||||
output.flush();
|
||||
|
||||
AtomicBoolean goAway = new AtomicBoolean();
|
||||
Parser parser = new Parser(bufferPool, 8192);
|
||||
parser.init(new Parser.Listener() {});
|
||||
parser.init(new Parser.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onGoAway(GoAwayFrame frame)
|
||||
{
|
||||
goAway.set(true);
|
||||
}
|
||||
});
|
||||
boolean closed = parseResponse(client, parser);
|
||||
|
||||
assertTrue(closed);
|
||||
assertFalse(closed);
|
||||
assertTrue(goAway.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,7 +16,9 @@ package org.eclipse.jetty.http2.tests;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
|
@ -37,9 +39,17 @@ import org.eclipse.jetty.http2.frames.SettingsFrame;
|
|||
import org.eclipse.jetty.http2.hpack.HpackException;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class SettingsTest extends AbstractTest
|
||||
{
|
||||
@Test
|
||||
|
@ -107,11 +117,11 @@ public class SettingsTest extends AbstractTest
|
|||
.flip();
|
||||
((HTTP2Session)clientSession).getEndPoint().write(Callback.NOOP, byteBuffer);
|
||||
|
||||
Assertions.assertFalse(serverSettingsLatch.get().await(1, TimeUnit.SECONDS));
|
||||
Assertions.assertTrue(serverFailureLatch.await(5, TimeUnit.SECONDS));
|
||||
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
|
||||
assertFalse(serverSettingsLatch.get().await(1, TimeUnit.SECONDS));
|
||||
assertTrue(serverFailureLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -181,11 +191,11 @@ public class SettingsTest extends AbstractTest
|
|||
.flip();
|
||||
((HTTP2Session)clientSession).getEndPoint().write(Callback.NOOP, byteBuffer);
|
||||
|
||||
Assertions.assertFalse(serverSettingsLatch.get().await(1, TimeUnit.SECONDS));
|
||||
Assertions.assertTrue(serverFailureLatch.await(5, TimeUnit.SECONDS));
|
||||
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
|
||||
assertFalse(serverSettingsLatch.get().await(1, TimeUnit.SECONDS));
|
||||
assertTrue(serverFailureLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -213,7 +223,7 @@ public class SettingsTest extends AbstractTest
|
|||
}
|
||||
});
|
||||
|
||||
Assertions.assertTrue(serverFailureLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverFailureLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -242,7 +252,7 @@ public class SettingsTest extends AbstractTest
|
|||
}
|
||||
});
|
||||
|
||||
Assertions.assertTrue(clientFailureLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientFailureLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -304,9 +314,9 @@ public class SettingsTest extends AbstractTest
|
|||
}
|
||||
});
|
||||
|
||||
Assertions.assertTrue(serverPushFailureLatch.await(5, TimeUnit.SECONDS));
|
||||
Assertions.assertTrue(clientResponseLatch.await(5, TimeUnit.SECONDS));
|
||||
Assertions.assertFalse(clientPushLatch.await(1, TimeUnit.SECONDS));
|
||||
assertTrue(serverPushFailureLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientResponseLatch.await(5, TimeUnit.SECONDS));
|
||||
assertFalse(clientPushLatch.await(1, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -361,6 +371,122 @@ public class SettingsTest extends AbstractTest
|
|||
HeadersFrame frame = new HeadersFrame(request, null, true);
|
||||
clientSession.newStream(frame, Stream.Listener.AUTO_DISCARD);
|
||||
|
||||
Assertions.assertTrue(clientFailureLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientFailureLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxHeaderListSizeExceededServerSendsGoAway() throws Exception
|
||||
{
|
||||
int maxHeadersSize = 512;
|
||||
start(new ServerSessionListener()
|
||||
{
|
||||
@Override
|
||||
public void onSettings(Session session, SettingsFrame frame)
|
||||
{
|
||||
((HTTP2Session)session).getParser().getHpackDecoder().setMaxHeaderListSize(maxHeadersSize);
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch goAwayLatch = new CountDownLatch(1);
|
||||
Session clientSession = newClientSession(new Session.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
goAwayLatch.countDown();
|
||||
}
|
||||
});
|
||||
HttpFields requestHeaders = HttpFields.build()
|
||||
.put("X-Large", "x".repeat(maxHeadersSize * 2));
|
||||
MetaData.Request request = newRequest("GET", requestHeaders);
|
||||
HeadersFrame frame = new HeadersFrame(request, null, true);
|
||||
Stream stream = clientSession.newStream(frame, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
|
||||
|
||||
// The request can be sent by the client, the server will reject it.
|
||||
// The spec suggests to send 431, but we do not want to "taint" the
|
||||
// HPACK context with large headers.
|
||||
assertNotNull(stream);
|
||||
|
||||
// The server should send a GOAWAY.
|
||||
assertTrue(goAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxHeaderListSizeExceededByClient() throws Exception
|
||||
{
|
||||
int maxHeadersSize = 512;
|
||||
CountDownLatch goAwayLatch = new CountDownLatch(1);
|
||||
start(new ServerSessionListener()
|
||||
{
|
||||
@Override
|
||||
public Map<Integer, Integer> onPreface(Session session)
|
||||
{
|
||||
return Map.of(SettingsFrame.MAX_HEADER_LIST_SIZE, maxHeadersSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
goAwayLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Session clientSession = newClientSession(new Session.Listener() {});
|
||||
HttpFields requestHeaders = HttpFields.build()
|
||||
.put("X-Large", "x".repeat(maxHeadersSize * 2));
|
||||
MetaData.Request request = newRequest("GET", requestHeaders);
|
||||
HeadersFrame frame = new HeadersFrame(request, null, true);
|
||||
|
||||
Throwable failure = assertThrows(ExecutionException.class,
|
||||
() -> clientSession.newStream(frame, new Stream.Listener() {}).get(5, TimeUnit.SECONDS))
|
||||
.getCause();
|
||||
// The HPACK context is compromised trying to encode the large header.
|
||||
assertThat(failure, Matchers.instanceOf(HpackException.SessionException.class));
|
||||
|
||||
assertTrue(goAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxHeaderListSizeExceededByServer() throws Exception
|
||||
{
|
||||
int maxHeadersSize = 512;
|
||||
AtomicReference<CompletableFuture<Stream>> responseRef = new AtomicReference<>();
|
||||
start(new ServerSessionListener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
HttpFields responseHeaders = HttpFields.build()
|
||||
.put("X-Large", "x".repeat(maxHeadersSize * 2));
|
||||
MetaData.Response response = new MetaData.Response(HttpStatus.OK_200, null, HttpVersion.HTTP_2, responseHeaders);
|
||||
responseRef.set(stream.headers(new HeadersFrame(stream.getId(), response, null, true)));
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch goAwayLatch = new CountDownLatch(1);
|
||||
Session clientSession = newClientSession(new Session.Listener()
|
||||
{
|
||||
@Override
|
||||
public Map<Integer, Integer> onPreface(Session session)
|
||||
{
|
||||
return Map.of(SettingsFrame.MAX_HEADER_LIST_SIZE, maxHeadersSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
goAwayLatch.countDown();
|
||||
}
|
||||
});
|
||||
MetaData.Request request = newRequest("GET", HttpFields.EMPTY);
|
||||
HeadersFrame frame = new HeadersFrame(request, null, true);
|
||||
clientSession.newStream(frame, new Stream.Listener() {});
|
||||
|
||||
CompletableFuture<Stream> completable = await().atMost(5, TimeUnit.SECONDS).until(responseRef::get, notNullValue());
|
||||
Throwable failure = assertThrows(ExecutionException.class, () -> completable.get(5, TimeUnit.SECONDS)).getCause();
|
||||
assertThat(failure, Matchers.instanceOf(HpackException.SessionException.class));
|
||||
|
||||
assertTrue(goAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,8 @@ import org.eclipse.jetty.client.transport.HttpRequest;
|
|||
import org.eclipse.jetty.client.transport.SendFailure;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.http3.client.HTTP3SessionClient;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.quic.common.QuicSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -64,6 +66,13 @@ public class HttpConnectionOverHTTP3 extends HttpConnection implements Connectio
|
|||
return session.getRemoteSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EndPoint.SslSessionData getSslSessionData()
|
||||
{
|
||||
QuicSession quicSession = getSession().getProtocolSession().getQuicSession();
|
||||
return EndPoint.SslSessionData.from(null, null, null, quicSession.getPeerCertificates());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxMultiplex()
|
||||
{
|
||||
|
|
|
@ -135,6 +135,12 @@ public interface HttpStream extends Callback
|
|||
return CONTENT_NOT_CONSUMED;
|
||||
}
|
||||
|
||||
@Override
|
||||
default InvocationType getInvocationType()
|
||||
{
|
||||
return InvocationType.NON_BLOCKING;
|
||||
}
|
||||
|
||||
class Wrapper implements HttpStream
|
||||
{
|
||||
private final HttpStream _wrapped;
|
||||
|
|
|
@ -1636,7 +1636,6 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
@Override
|
||||
public InvocationType getInvocationType()
|
||||
{
|
||||
// TODO review this as it is probably not correct
|
||||
return _request.getHttpStream().getInvocationType();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1610,12 +1610,6 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
|
|||
{
|
||||
getEndPoint().close(failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InvocationType getInvocationType()
|
||||
{
|
||||
return HttpStream.super.getInvocationType();
|
||||
}
|
||||
}
|
||||
|
||||
private class TunnelSupportOverHTTP1 implements TunnelSupport
|
||||
|
|
|
@ -115,7 +115,7 @@ public class QoSHandlerTest
|
|||
endPoints.add(endPoint);
|
||||
|
||||
assertEquals(maxRequests, callbacks.size());
|
||||
await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getSuspendedRequestCount, is(1L));
|
||||
await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getSuspendedRequestCount, is(1));
|
||||
|
||||
// Finish and verify the waiting requests.
|
||||
List<Callback> copy = List.copyOf(callbacks);
|
||||
|
@ -130,7 +130,7 @@ public class QoSHandlerTest
|
|||
}
|
||||
|
||||
// The suspended request should have been resumed.
|
||||
await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getSuspendedRequestCount, is(0L));
|
||||
await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getSuspendedRequestCount, is(0));
|
||||
await().atMost(5, TimeUnit.SECONDS).until(callbacks::size, is(1));
|
||||
|
||||
// Finish the resumed request that is now waiting.
|
||||
|
@ -175,11 +175,11 @@ public class QoSHandlerTest
|
|||
Host: localhost
|
||||
|
||||
""");
|
||||
await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getSuspendedRequestCount, is(1L));
|
||||
await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getSuspendedRequestCount, is(1));
|
||||
|
||||
// Do not succeed the callback of the first request.
|
||||
// Wait for the second request to time out.
|
||||
await().atMost(2 * timeout, TimeUnit.MILLISECONDS).until(qosHandler::getSuspendedRequestCount, is(0L));
|
||||
await().atMost(2 * timeout, TimeUnit.MILLISECONDS).until(qosHandler::getSuspendedRequestCount, is(0));
|
||||
|
||||
String text = endPoint1.getResponse(false, 5, TimeUnit.SECONDS);
|
||||
HttpTester.Response response = HttpTester.parseResponse(text);
|
||||
|
|
|
@ -33,6 +33,7 @@ import java.util.stream.IntStream;
|
|||
|
||||
import org.eclipse.jetty.client.BytesRequestContent;
|
||||
import org.eclipse.jetty.client.CompletableResponseListener;
|
||||
import org.eclipse.jetty.client.Connection;
|
||||
import org.eclipse.jetty.client.ContentResponse;
|
||||
import org.eclipse.jetty.client.Destination;
|
||||
import org.eclipse.jetty.client.InputStreamResponseListener;
|
||||
|
@ -45,6 +46,7 @@ import org.eclipse.jetty.http.HttpStatus;
|
|||
import org.eclipse.jetty.http.HttpURI;
|
||||
import org.eclipse.jetty.http2.FlowControlStrategy;
|
||||
import org.eclipse.jetty.io.Content;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.NetworkConnector;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
|
@ -1024,9 +1026,19 @@ public class HttpClientTest extends AbstractTest
|
|||
ContentResponse response = client.newRequest(newURI(transport))
|
||||
.onRequestBegin(r ->
|
||||
{
|
||||
if (r.getConnection() == null)
|
||||
Connection connection = r.getConnection();
|
||||
if (connection == null)
|
||||
r.abort(new IllegalStateException());
|
||||
})
|
||||
.onRequestHeaders(r ->
|
||||
{
|
||||
if (transport.isSecure())
|
||||
{
|
||||
EndPoint.SslSessionData sslSessionData = r.getConnection().getSslSessionData();
|
||||
if (sslSessionData == null)
|
||||
r.abort(new IllegalStateException());
|
||||
}
|
||||
})
|
||||
.send();
|
||||
|
||||
assertEquals(200, response.getStatus());
|
||||
|
|
Loading…
Reference in New Issue