Merge remote-tracking branch 'origin/jetty-9.4.x' into jetty-9.4.x-2695-hpack-compliance
This commit is contained in:
commit
a890dbfc01
|
@ -18,13 +18,16 @@
|
|||
|
||||
package org.eclipse.jetty.http;
|
||||
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
public class QuotedQualityCSVTest
|
||||
{
|
||||
|
||||
|
@ -310,4 +313,45 @@ public class QuotedQualityCSVTest
|
|||
Assert.assertThat(values.getValues(),Matchers.contains("one","two","three;x=y"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testQuality()
|
||||
{
|
||||
List<String> results = new ArrayList<>();
|
||||
|
||||
QuotedQualityCSV values = new QuotedQualityCSV()
|
||||
{
|
||||
@Override
|
||||
protected void parsedValue(StringBuffer buffer)
|
||||
{
|
||||
results.add("parsedValue: " + buffer.toString());
|
||||
|
||||
super.parsedValue(buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void parsedParam(StringBuffer buffer, int valueLength, int paramName, int paramValue)
|
||||
{
|
||||
String param = buffer.substring(paramName, buffer.length());
|
||||
results.add("parsedParam: " + param);
|
||||
|
||||
super.parsedParam(buffer, valueLength, paramName, paramValue);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
// The provided string is not legal according to some RFCs ( not a token because of = and not a parameter because not preceded by ; )
|
||||
// The string is legal according to RFC7239 which allows for just parameters (called forwarded-pairs)
|
||||
values.addValue("p=0.5,q=0.5");
|
||||
|
||||
|
||||
// The QuotedCSV implementation is lenient and adopts the later interpretation and thus sees q=0.5 and p=0.5 both as parameters
|
||||
assertThat(results,contains("parsedValue: ", "parsedParam: p=0.5",
|
||||
"parsedValue: ", "parsedParam: q=0.5"));
|
||||
|
||||
|
||||
// However the QuotedQualityCSV only handles the q parameter and that is consumed from the parameter string.
|
||||
assertThat(values,contains("p=0.5", ""));
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -66,6 +66,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
|
|||
Generator generator = new Generator(byteBufferPool);
|
||||
FlowControlStrategy flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy();
|
||||
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);
|
||||
|
||||
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
|
||||
|
||||
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,
|
||||
|
|
|
@ -20,7 +20,10 @@ package org.eclipse.jetty.http2.client.http;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicMarkableReference;
|
||||
|
||||
import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
|
||||
import org.eclipse.jetty.client.AbstractHttpClientTransport;
|
||||
|
@ -135,7 +138,12 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
|
|||
if (HttpScheme.HTTPS.is(destination.getScheme()))
|
||||
sslContextFactory = httpClient.getSslContextFactory();
|
||||
|
||||
client.connect(sslContextFactory, address, listenerPromise, listenerPromise, context);
|
||||
connect(sslContextFactory, address, listenerPromise, listenerPromise, context);
|
||||
}
|
||||
|
||||
protected void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
|
||||
{
|
||||
getHTTP2Client().connect(sslContextFactory, address, listener, promise, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -164,8 +172,8 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
|
|||
|
||||
private class SessionListenerPromise extends Session.Listener.Adapter implements Promise<Session>
|
||||
{
|
||||
private final AtomicMarkableReference<HttpConnectionOverHTTP2> connection = new AtomicMarkableReference<>(null, false);
|
||||
private final Map<String, Object> context;
|
||||
private HttpConnectionOverHTTP2 connection;
|
||||
|
||||
private SessionListenerPromise(Map<String, Object> context)
|
||||
{
|
||||
|
@ -175,14 +183,15 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
|
|||
@Override
|
||||
public void succeeded(Session session)
|
||||
{
|
||||
connection = newHttpConnection(destination(), session);
|
||||
promise().succeeded(connection);
|
||||
// This method is invoked when the client preface
|
||||
// is sent, but we want to succeed the nested
|
||||
// promise when the server preface is received.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable failure)
|
||||
{
|
||||
promise().failed(failure);
|
||||
failConnectionPromise(failure);
|
||||
}
|
||||
|
||||
private HttpDestinationOverHTTP2 destination()
|
||||
|
@ -191,7 +200,7 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Promise<Connection> promise()
|
||||
private Promise<Connection> connectionPromise()
|
||||
{
|
||||
return (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
|
||||
}
|
||||
|
@ -202,26 +211,55 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
|
|||
Map<Integer, Integer> settings = frame.getSettings();
|
||||
if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS))
|
||||
destination().setMaxRequestsPerConnection(settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS));
|
||||
if (!connection.isMarked())
|
||||
onServerPreface(session);
|
||||
}
|
||||
|
||||
private void onServerPreface(Session session)
|
||||
{
|
||||
HttpConnectionOverHTTP2 connection = newHttpConnection(destination(), session);
|
||||
if (this.connection.compareAndSet(null, connection, false, true))
|
||||
connectionPromise().succeeded(connection);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose(Session session, GoAwayFrame frame)
|
||||
{
|
||||
if (failConnectionPromise(new ClosedChannelException()))
|
||||
return;
|
||||
HttpConnectionOverHTTP2 connection = this.connection.getReference();
|
||||
if (connection != null)
|
||||
HttpClientTransportOverHTTP2.this.onClose(connection, frame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleTimeout(Session session)
|
||||
{
|
||||
return connection.onIdleTimeout(((HTTP2Session)session).getEndPoint().getIdleTimeout());
|
||||
long idleTimeout = ((HTTP2Session)session).getEndPoint().getIdleTimeout();
|
||||
if (failConnectionPromise(new TimeoutException("Idle timeout expired: " + idleTimeout + " ms")))
|
||||
return true;
|
||||
HttpConnectionOverHTTP2 connection = this.connection.getReference();
|
||||
if (connection != null)
|
||||
return connection.onIdleTimeout(idleTimeout);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Session session, Throwable failure)
|
||||
{
|
||||
HttpConnectionOverHTTP2 c = connection;
|
||||
if (c != null)
|
||||
c.close(failure);
|
||||
if (failConnectionPromise(failure))
|
||||
return;
|
||||
HttpConnectionOverHTTP2 connection = this.connection.getReference();
|
||||
if (connection != null)
|
||||
connection.close(failure);
|
||||
}
|
||||
|
||||
private boolean failConnectionPromise(Throwable failure)
|
||||
{
|
||||
boolean result = connection.compareAndSet(null, null, false, true);
|
||||
if (result)
|
||||
connectionPromise().failed(failure);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -477,22 +477,33 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest
|
|||
ServerParser parser = new ServerParser(byteBufferPool, new ServerParser.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onHeaders(HeadersFrame request)
|
||||
public void onPreface()
|
||||
{
|
||||
// Server's preface.
|
||||
generator.control(lease, new SettingsFrame(new HashMap<>(), false));
|
||||
// Reply to client's SETTINGS.
|
||||
generator.control(lease, new SettingsFrame(new HashMap<>(), true));
|
||||
writeFrames();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onHeaders(HeadersFrame request)
|
||||
{
|
||||
// Response.
|
||||
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||
HeadersFrame response = new HeadersFrame(request.getStreamId(), metaData, null, true);
|
||||
generator.control(lease, response);
|
||||
writeFrames();
|
||||
}
|
||||
|
||||
private void writeFrames()
|
||||
{
|
||||
try
|
||||
{
|
||||
// Write the frames.
|
||||
for (ByteBuffer buffer : lease.getByteBuffers())
|
||||
output.write(BufferUtil.toArray(buffer));
|
||||
lease.recycle();
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
|
|
|
@ -18,33 +18,64 @@
|
|||
|
||||
package org.eclipse.jetty.http2.client.http;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.client.AbstractConnectionPool;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.HttpDestination;
|
||||
import org.eclipse.jetty.client.HttpResponseException;
|
||||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http2.api.Session;
|
||||
import org.eclipse.jetty.http2.api.Stream;
|
||||
import org.eclipse.jetty.http2.client.HTTP2Client;
|
||||
import org.eclipse.jetty.http2.frames.GoAwayFrame;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http2.frames.PingFrame;
|
||||
import org.eclipse.jetty.http2.frames.ResetFrame;
|
||||
import org.eclipse.jetty.http2.frames.SettingsFrame;
|
||||
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class MaxConcurrentStreamsTest extends AbstractTest
|
||||
{
|
||||
private void start(int maxConcurrentStreams, Handler handler) throws Exception
|
||||
{
|
||||
startServer(maxConcurrentStreams, handler);
|
||||
prepareClient();
|
||||
client.start();
|
||||
}
|
||||
|
||||
private void startServer(int maxConcurrentStreams, Handler handler) throws Exception
|
||||
{
|
||||
HTTP2ServerConnectionFactory http2 = new HTTP2ServerConnectionFactory(new HttpConfiguration());
|
||||
http2.setMaxConcurrentStreams(maxConcurrentStreams);
|
||||
prepareServer(http2);
|
||||
server.setHandler(handler);
|
||||
server.start();
|
||||
prepareClient();
|
||||
client.start();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -114,6 +145,85 @@ public class MaxConcurrentStreamsTest extends AbstractTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSmallMaxConcurrentStreamsExceededOnClient() throws Exception
|
||||
{
|
||||
int maxConcurrentStreams = 1;
|
||||
startServer(maxConcurrentStreams, new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
|
||||
{
|
||||
sleep(1000);
|
||||
}
|
||||
});
|
||||
|
||||
String scheme = "http";
|
||||
String host = "localhost";
|
||||
int port = connector.getLocalPort();
|
||||
|
||||
AtomicInteger connections = new AtomicInteger();
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
List<Throwable> failures = new ArrayList<>();
|
||||
client = new HttpClient(new HttpClientTransportOverHTTP2(new HTTP2Client())
|
||||
{
|
||||
@Override
|
||||
protected void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
|
||||
{
|
||||
super.connect(sslContextFactory, address, new Wrapper(listener)
|
||||
{
|
||||
@Override
|
||||
public void onSettings(Session session, SettingsFrame frame)
|
||||
{
|
||||
// Send another request to simulate a request being
|
||||
// sent concurrently with connection establishment.
|
||||
// Sending this request will trigger the creation of
|
||||
// another connection since maxConcurrentStream=1.
|
||||
if (connections.incrementAndGet() == 1)
|
||||
{
|
||||
client.newRequest(host, port)
|
||||
.path("/2")
|
||||
.send(result ->
|
||||
{
|
||||
if (result.isSucceeded())
|
||||
{
|
||||
Response response2 = result.getResponse();
|
||||
if (response2.getStatus() == HttpStatus.OK_200)
|
||||
latch.countDown();
|
||||
else
|
||||
failures.add(new HttpResponseException("", response2));
|
||||
}
|
||||
else
|
||||
{
|
||||
failures.add(result.getFailure());
|
||||
}
|
||||
});
|
||||
}
|
||||
super.onSettings(session, frame);
|
||||
}
|
||||
}, promise, context);
|
||||
}
|
||||
}, null);
|
||||
QueuedThreadPool clientExecutor = new QueuedThreadPool();
|
||||
clientExecutor.setName("client");
|
||||
client.setExecutor(clientExecutor);
|
||||
client.start();
|
||||
|
||||
// This request will be queued and establish the connection,
|
||||
// which will trigger the send of the second request.
|
||||
ContentResponse response1 = client.newRequest(host, port)
|
||||
.path("/1")
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
Assert.assertEquals(HttpStatus.OK_200, response1.getStatus());
|
||||
Assert.assertTrue(failures.toString(), latch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertEquals(2, connections.get());
|
||||
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
|
||||
AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool();
|
||||
Assert.assertEquals(2, connectionPool.getConnectionCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTwoConcurrentStreamsThirdWaits() throws Exception
|
||||
{
|
||||
|
@ -214,6 +324,49 @@ public class MaxConcurrentStreamsTest extends AbstractTest
|
|||
Assert.assertTrue(latch.await(maxConcurrent * sleep / 2, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testManyConcurrentRequestsWithSmallConcurrentStreams() throws Exception
|
||||
{
|
||||
byte[] data = new byte[64 * 1024];
|
||||
start(1, new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
response.getOutputStream().write(data);
|
||||
}
|
||||
});
|
||||
|
||||
client.setMaxConnectionsPerDestination(32768);
|
||||
client.setMaxRequestsQueuedPerDestination(1024 * 1024);
|
||||
|
||||
int parallelism = 16;
|
||||
int runs = 1;
|
||||
int iterations = 256;
|
||||
int total = parallelism * runs * iterations;
|
||||
CountDownLatch latch = new CountDownLatch(total);
|
||||
Queue<Result> failures = new ConcurrentLinkedQueue<>();
|
||||
ForkJoinPool pool = new ForkJoinPool(parallelism);
|
||||
pool.submit(() -> IntStream.range(0, parallelism).parallel().forEach(i ->
|
||||
IntStream.range(0, runs).forEach(j ->
|
||||
{
|
||||
for (int k = 0; k < iterations; ++k)
|
||||
{
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.path("/" + i + "_" + j + "_" + k)
|
||||
.send(result ->
|
||||
{
|
||||
if (result.isFailed())
|
||||
failures.offer(result);
|
||||
latch.countDown();
|
||||
});
|
||||
}
|
||||
})));
|
||||
|
||||
Assert.assertTrue(latch.await(total * 10, TimeUnit.MILLISECONDS));
|
||||
Assert.assertTrue(failures.toString(), failures.isEmpty());
|
||||
}
|
||||
|
||||
private void primeConnection() throws Exception
|
||||
{
|
||||
// Prime the connection so that the maxConcurrentStream setting arrives to the client.
|
||||
|
@ -233,4 +386,62 @@ public class MaxConcurrentStreamsTest extends AbstractTest
|
|||
throw new RuntimeException(x);
|
||||
}
|
||||
}
|
||||
|
||||
private static class Wrapper implements Session.Listener
|
||||
{
|
||||
private final Session.Listener listener;
|
||||
|
||||
private Wrapper(Session.Listener listener)
|
||||
{
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Integer, Integer> onPreface(Session session)
|
||||
{
|
||||
return listener.onPreface(session);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
return listener.onNewStream(stream, frame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSettings(Session session, SettingsFrame frame)
|
||||
{
|
||||
listener.onSettings(session, frame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPing(Session session, PingFrame frame)
|
||||
{
|
||||
listener.onPing(session, frame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReset(Session session, ResetFrame frame)
|
||||
{
|
||||
listener.onReset(session, frame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose(Session session, GoAwayFrame frame)
|
||||
{
|
||||
listener.onClose(session, frame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleTimeout(Session session)
|
||||
{
|
||||
return listener.onIdleTimeout(session);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Session session, Throwable failure)
|
||||
{
|
||||
listener.onFailure(session, failure);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -584,7 +584,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
|
|||
FutureCallback shutdown=new FutureCallback(false);
|
||||
_shutdown.compareAndSet(null,shutdown);
|
||||
shutdown=_shutdown.get();
|
||||
if (_dispatchedStats.getCurrent()==0)
|
||||
if (_requestStats.getCurrent()==0)
|
||||
shutdown.succeeded();
|
||||
return shutdown;
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.eclipse.jetty.server.handler;
|
|||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
|
@ -31,6 +32,7 @@ import javax.servlet.ServletException;
|
|||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.io.ConnectionStatistics;
|
||||
import org.eclipse.jetty.server.LocalConnector;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
|
@ -41,6 +43,7 @@ import org.junit.Test;
|
|||
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
@ -85,7 +88,7 @@ public class StatisticsHandlerTest
|
|||
_statsHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException
|
||||
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException
|
||||
{
|
||||
request.setHandled(true);
|
||||
try
|
||||
|
@ -97,7 +100,7 @@ public class StatisticsHandlerTest
|
|||
catch (Exception x)
|
||||
{
|
||||
Thread.currentThread().interrupt();
|
||||
throw (IOException)new IOException().initCause(x);
|
||||
throw new IOException(x);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -182,7 +185,7 @@ public class StatisticsHandlerTest
|
|||
_statsHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException
|
||||
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException
|
||||
{
|
||||
request.setHandled(true);
|
||||
try
|
||||
|
@ -193,7 +196,7 @@ public class StatisticsHandlerTest
|
|||
catch (Exception x)
|
||||
{
|
||||
Thread.currentThread().interrupt();
|
||||
throw (IOException)new IOException().initCause(x);
|
||||
throw new IOException(x);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -245,7 +248,7 @@ public class StatisticsHandlerTest
|
|||
_statsHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException
|
||||
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException
|
||||
{
|
||||
request.setHandled(true);
|
||||
try
|
||||
|
@ -306,22 +309,22 @@ public class StatisticsHandlerTest
|
|||
asyncHolder.get().addListener(new AsyncListener()
|
||||
{
|
||||
@Override
|
||||
public void onTimeout(AsyncEvent event) throws IOException
|
||||
public void onTimeout(AsyncEvent event)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStartAsync(AsyncEvent event) throws IOException
|
||||
public void onStartAsync(AsyncEvent event)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(AsyncEvent event) throws IOException
|
||||
public void onError(AsyncEvent event)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete(AsyncEvent event) throws IOException
|
||||
public void onComplete(AsyncEvent event)
|
||||
{
|
||||
try
|
||||
{
|
||||
|
@ -375,7 +378,7 @@ public class StatisticsHandlerTest
|
|||
_statsHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException
|
||||
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException
|
||||
{
|
||||
request.setHandled(true);
|
||||
try
|
||||
|
@ -428,23 +431,23 @@ public class StatisticsHandlerTest
|
|||
asyncHolder.get().addListener(new AsyncListener()
|
||||
{
|
||||
@Override
|
||||
public void onTimeout(AsyncEvent event) throws IOException
|
||||
public void onTimeout(AsyncEvent event)
|
||||
{
|
||||
event.getAsyncContext().complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStartAsync(AsyncEvent event) throws IOException
|
||||
public void onStartAsync(AsyncEvent event)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(AsyncEvent event) throws IOException
|
||||
public void onError(AsyncEvent event)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete(AsyncEvent event) throws IOException
|
||||
public void onComplete(AsyncEvent event)
|
||||
{
|
||||
try
|
||||
{
|
||||
|
@ -491,7 +494,7 @@ public class StatisticsHandlerTest
|
|||
_statsHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException
|
||||
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException
|
||||
{
|
||||
request.setHandled(true);
|
||||
try
|
||||
|
@ -550,22 +553,22 @@ public class StatisticsHandlerTest
|
|||
asyncHolder.get().addListener(new AsyncListener()
|
||||
{
|
||||
@Override
|
||||
public void onTimeout(AsyncEvent event) throws IOException
|
||||
public void onTimeout(AsyncEvent event)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStartAsync(AsyncEvent event) throws IOException
|
||||
public void onStartAsync(AsyncEvent event)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(AsyncEvent event) throws IOException
|
||||
public void onError(AsyncEvent event)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete(AsyncEvent event) throws IOException
|
||||
public void onComplete(AsyncEvent event)
|
||||
{
|
||||
try
|
||||
{
|
||||
|
@ -601,6 +604,53 @@ public class StatisticsHandlerTest
|
|||
assertEquals(_statsHandler.getDispatchedTimeTotal(), _statsHandler.getDispatchedTimeMean(), 0.01);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncRequestWithShutdown() throws Exception
|
||||
{
|
||||
long delay = 500;
|
||||
CountDownLatch serverLatch = new CountDownLatch(1);
|
||||
_statsHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
|
||||
{
|
||||
AsyncContext asyncContext = request.startAsync();
|
||||
asyncContext.setTimeout(0);
|
||||
new Thread(() ->
|
||||
{
|
||||
try
|
||||
{
|
||||
Thread.sleep(delay);
|
||||
asyncContext.complete();
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
|
||||
asyncContext.complete();
|
||||
}
|
||||
}).start();
|
||||
serverLatch.countDown();
|
||||
}
|
||||
});
|
||||
_server.start();
|
||||
|
||||
String request = "GET / HTTP/1.1\r\n" +
|
||||
"Host: localhost\r\n" +
|
||||
"\r\n";
|
||||
_connector.executeRequest(request);
|
||||
|
||||
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
Future<Void> shutdown = _statsHandler.shutdown();
|
||||
assertFalse(shutdown.isDone());
|
||||
|
||||
Thread.sleep(delay / 2);
|
||||
assertFalse(shutdown.isDone());
|
||||
|
||||
Thread.sleep(delay);
|
||||
assertTrue(shutdown.isDone());
|
||||
}
|
||||
|
||||
/**
|
||||
* This handler is external to the statistics handler and it is used to ensure that statistics handler's
|
||||
* handle() is fully executed before asserting its values in the tests, to avoid race conditions with the
|
||||
|
|
Loading…
Reference in New Issue