Merged branch 'jetty-9.3.x' into 'jetty-9.4.x'.
This commit is contained in:
commit
d8961139f3
|
@ -0,0 +1,237 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.http2.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.Locale;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import javax.servlet.ServletException;
|
||||||
|
import javax.servlet.http.HttpServlet;
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.http.HttpFields;
|
||||||
|
import org.eclipse.jetty.http.HttpMethod;
|
||||||
|
import org.eclipse.jetty.http.MetaData;
|
||||||
|
import org.eclipse.jetty.http2.api.Session;
|
||||||
|
import org.eclipse.jetty.http2.api.Stream;
|
||||||
|
import org.eclipse.jetty.http2.frames.DataFrame;
|
||||||
|
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||||
|
import org.eclipse.jetty.http2.frames.ResetFrame;
|
||||||
|
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
|
||||||
|
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||||
|
import org.eclipse.jetty.util.Callback;
|
||||||
|
import org.eclipse.jetty.util.FuturePromise;
|
||||||
|
import org.eclipse.jetty.util.IO;
|
||||||
|
import org.eclipse.jetty.util.log.Log;
|
||||||
|
import org.eclipse.jetty.util.log.Logger;
|
||||||
|
import org.eclipse.jetty.util.thread.ExecutionStrategy;
|
||||||
|
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||||
|
import org.eclipse.jetty.util.thread.Scheduler;
|
||||||
|
import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume;
|
||||||
|
import org.hamcrest.Matchers;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class SmallThreadPoolLoadTest extends AbstractTest
|
||||||
|
{
|
||||||
|
private final Logger logger = Log.getLogger(SmallThreadPoolLoadTest.class);
|
||||||
|
private final AtomicLong requestIds = new AtomicLong();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void customizeContext(ServletContextHandler context)
|
||||||
|
{
|
||||||
|
QueuedThreadPool serverThreads = (QueuedThreadPool)context.getServer().getThreadPool();
|
||||||
|
serverThreads.setDetailedDump(true);
|
||||||
|
serverThreads.setMaxThreads(5);
|
||||||
|
serverThreads.setLowThreadsThreshold(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConcurrentWithSmallServerThreadPool() throws Exception
|
||||||
|
{
|
||||||
|
start(new LoadServlet());
|
||||||
|
AbstractHTTP2ServerConnectionFactory factory = connector.getBean(AbstractHTTP2ServerConnectionFactory.class);
|
||||||
|
factory.setExecutionStrategyFactory(new ExecuteProduceConsume.Factory()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public ExecutionStrategy newExecutionStrategy(ExecutionStrategy.Producer producer, Executor executor)
|
||||||
|
{
|
||||||
|
return new ExecuteProduceConsume(producer, executor)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected void executeTask(Runnable task)
|
||||||
|
{
|
||||||
|
if (task instanceof Rejectable)
|
||||||
|
((Rejectable)task).reject();
|
||||||
|
else
|
||||||
|
super.executeTask(task);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Only one connection to the server.
|
||||||
|
Session session = newClient(new Session.Listener.Adapter());
|
||||||
|
|
||||||
|
int runs = 10;
|
||||||
|
int iterations = 512;
|
||||||
|
boolean result = IntStream.range(0, 16).parallel()
|
||||||
|
.mapToObj(i -> IntStream.range(0, runs)
|
||||||
|
.mapToObj(j -> run(session, iterations))
|
||||||
|
.reduce(true, (acc, res) -> acc && res))
|
||||||
|
.reduce(true, (acc, res) -> acc && res);
|
||||||
|
|
||||||
|
Assert.assertTrue(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean run(Session session, int iterations)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
CountDownLatch latch = new CountDownLatch(iterations);
|
||||||
|
int factor = (logger.isDebugEnabled() ? 25 : 1) * 100;
|
||||||
|
|
||||||
|
// Dumps the state of the client if the test takes too long.
|
||||||
|
final Thread testThread = Thread.currentThread();
|
||||||
|
Scheduler.Task task = client.getScheduler().schedule(() ->
|
||||||
|
{
|
||||||
|
logger.warn("Interrupting test, it is taking too long{}{}{}{}",
|
||||||
|
System.lineSeparator(), server.dump(),
|
||||||
|
System.lineSeparator(), client.dump());
|
||||||
|
testThread.interrupt();
|
||||||
|
}, iterations * factor, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
long successes = 0;
|
||||||
|
long begin = System.nanoTime();
|
||||||
|
for (int i = 0; i < iterations; ++i)
|
||||||
|
{
|
||||||
|
boolean success = test(session, latch);
|
||||||
|
if (success)
|
||||||
|
++successes;
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertTrue(latch.await(iterations, TimeUnit.SECONDS));
|
||||||
|
long end = System.nanoTime();
|
||||||
|
Assert.assertThat(successes, Matchers.greaterThan(0L));
|
||||||
|
task.cancel();
|
||||||
|
long elapsed = TimeUnit.NANOSECONDS.toMillis(end - begin);
|
||||||
|
logger.info("{} requests in {} ms, {}/{} success/failure, {} req/s",
|
||||||
|
iterations, elapsed,
|
||||||
|
successes, iterations - successes,
|
||||||
|
elapsed > 0 ? iterations * 1000 / elapsed : -1);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
catch (Exception x)
|
||||||
|
{
|
||||||
|
x.printStackTrace();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean test(Session session, CountDownLatch latch) throws Exception
|
||||||
|
{
|
||||||
|
ThreadLocalRandom random = ThreadLocalRandom.current();
|
||||||
|
// Choose a random method
|
||||||
|
boolean download = random.nextBoolean();
|
||||||
|
HttpMethod method = download ? HttpMethod.GET : HttpMethod.POST;
|
||||||
|
|
||||||
|
int maxContentLength = 128 * 1024;
|
||||||
|
int contentLength = random.nextInt(maxContentLength) + 1;
|
||||||
|
|
||||||
|
long requestId = requestIds.incrementAndGet();
|
||||||
|
MetaData.Request request = newRequest(method.asString(), "/" + requestId, new HttpFields());
|
||||||
|
if (download)
|
||||||
|
request.getFields().put("X-Download", String.valueOf(contentLength));
|
||||||
|
HeadersFrame requestFrame = new HeadersFrame(request, null, download);
|
||||||
|
FuturePromise<Stream> promise = new FuturePromise<>();
|
||||||
|
CountDownLatch requestLatch = new CountDownLatch(1);
|
||||||
|
AtomicBoolean reset = new AtomicBoolean();
|
||||||
|
session.newStream(requestFrame, promise, new Stream.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onHeaders(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
if (frame.isEndStream())
|
||||||
|
requestLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||||
|
{
|
||||||
|
callback.succeeded();
|
||||||
|
if (frame.isEndStream())
|
||||||
|
requestLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onReset(Stream stream, ResetFrame frame)
|
||||||
|
{
|
||||||
|
reset.set(true);
|
||||||
|
requestLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if (!download)
|
||||||
|
{
|
||||||
|
Stream stream = promise.get(5, TimeUnit.SECONDS);
|
||||||
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(contentLength), true), Callback.NOOP);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean success = requestLatch.await(5, TimeUnit.SECONDS);
|
||||||
|
if (success)
|
||||||
|
latch.countDown();
|
||||||
|
else
|
||||||
|
logger.warn("Request {} took too long{}{}{}{}", requestId,
|
||||||
|
System.lineSeparator(), server.dump(),
|
||||||
|
System.lineSeparator(), client.dump());
|
||||||
|
return !reset.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class LoadServlet extends HttpServlet
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||||
|
{
|
||||||
|
String method = request.getMethod().toUpperCase(Locale.ENGLISH);
|
||||||
|
switch (method)
|
||||||
|
{
|
||||||
|
case "GET":
|
||||||
|
{
|
||||||
|
int contentLength = request.getIntHeader("X-Download");
|
||||||
|
if (contentLength > 0)
|
||||||
|
response.getOutputStream().write(new byte[contentLength]);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "POST":
|
||||||
|
{
|
||||||
|
IO.copy(request.getInputStream(), response.getOutputStream());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -37,6 +37,8 @@ import org.eclipse.jetty.http.HttpFields;
|
||||||
import org.eclipse.jetty.http.HttpVersion;
|
import org.eclipse.jetty.http.HttpVersion;
|
||||||
import org.eclipse.jetty.http.MetaData;
|
import org.eclipse.jetty.http.MetaData;
|
||||||
import org.eclipse.jetty.http2.ErrorCode;
|
import org.eclipse.jetty.http2.ErrorCode;
|
||||||
|
import org.eclipse.jetty.http2.FlowControlStrategy;
|
||||||
|
import org.eclipse.jetty.http2.ISession;
|
||||||
import org.eclipse.jetty.http2.IStream;
|
import org.eclipse.jetty.http2.IStream;
|
||||||
import org.eclipse.jetty.http2.api.Session;
|
import org.eclipse.jetty.http2.api.Session;
|
||||||
import org.eclipse.jetty.http2.api.Stream;
|
import org.eclipse.jetty.http2.api.Stream;
|
||||||
|
@ -48,6 +50,7 @@ import org.eclipse.jetty.server.HttpOutput;
|
||||||
import org.eclipse.jetty.util.Callback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
import org.eclipse.jetty.util.FutureCallback;
|
import org.eclipse.jetty.util.FutureCallback;
|
||||||
import org.eclipse.jetty.util.FuturePromise;
|
import org.eclipse.jetty.util.FuturePromise;
|
||||||
|
import org.hamcrest.Matchers;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -360,4 +363,87 @@ public class StreamResetTest extends AbstractTest
|
||||||
|
|
||||||
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClientResetConsumesQueuedData() throws Exception
|
||||||
|
{
|
||||||
|
start(new EmptyHttpServlet());
|
||||||
|
|
||||||
|
Session client = newClient(new Session.Listener.Adapter());
|
||||||
|
MetaData.Request request = newRequest("GET", new HttpFields());
|
||||||
|
HeadersFrame frame = new HeadersFrame(request, null, false);
|
||||||
|
FuturePromise<Stream> promise = new FuturePromise<>();
|
||||||
|
client.newStream(frame, promise, new Stream.Listener.Adapter());
|
||||||
|
Stream stream = promise.get(5, TimeUnit.SECONDS);
|
||||||
|
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
||||||
|
CountDownLatch dataLatch = new CountDownLatch(1);
|
||||||
|
stream.data(new DataFrame(stream.getId(), data, false), new Callback()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void succeeded()
|
||||||
|
{
|
||||||
|
dataLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// The server does not read the data, so the flow control window should be zero.
|
||||||
|
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assert.assertEquals(0, ((ISession)client).updateSendWindow(0));
|
||||||
|
|
||||||
|
// Now reset the stream.
|
||||||
|
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
|
||||||
|
|
||||||
|
// Wait for the server to receive the reset and process
|
||||||
|
// it, and for the client to process the window updates.
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServerExceptionConsumesQueuedData() throws Exception
|
||||||
|
{
|
||||||
|
start(new HttpServlet()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// Wait to let the data sent by the client to be queued.
|
||||||
|
Thread.sleep(1000);
|
||||||
|
throw new IllegalStateException();
|
||||||
|
}
|
||||||
|
catch (InterruptedException e)
|
||||||
|
{
|
||||||
|
throw new InterruptedIOException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Session client = newClient(new Session.Listener.Adapter());
|
||||||
|
MetaData.Request request = newRequest("GET", new HttpFields());
|
||||||
|
HeadersFrame frame = new HeadersFrame(request, null, false);
|
||||||
|
FuturePromise<Stream> promise = new FuturePromise<>();
|
||||||
|
client.newStream(frame, promise, new Stream.Listener.Adapter());
|
||||||
|
Stream stream = promise.get(5, TimeUnit.SECONDS);
|
||||||
|
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
||||||
|
CountDownLatch dataLatch = new CountDownLatch(1);
|
||||||
|
stream.data(new DataFrame(stream.getId(), data, false), new Callback()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void succeeded()
|
||||||
|
{
|
||||||
|
dataLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// The server does not read the data, so the flow control window should be zero.
|
||||||
|
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assert.assertEquals(0, ((ISession)client).updateSendWindow(0));
|
||||||
|
|
||||||
|
// Wait for the server process the exception, and
|
||||||
|
// for the client to process the window updates.
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
|
Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
|
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
|
||||||
org.eclipse.jetty.http2.hpack.LEVEL=INFO
|
#org.eclipse.jetty.LEVEL=DEBUG
|
||||||
#org.eclipse.jetty.http2.LEVEL=DEBUG
|
#org.eclipse.jetty.http2.LEVEL=DEBUG
|
||||||
|
org.eclipse.jetty.http2.hpack.LEVEL=INFO
|
||||||
#org.eclipse.jetty.servlets.LEVEL=DEBUG
|
#org.eclipse.jetty.servlets.LEVEL=DEBUG
|
||||||
|
|
|
@ -167,7 +167,7 @@ public class HTTP2Connection extends AbstractConnection
|
||||||
|
|
||||||
task = tasks.poll();
|
task = tasks.poll();
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Dequeued task {}", task);
|
LOG.debug("Dequeued new task {}", task);
|
||||||
if (task != null)
|
if (task != null)
|
||||||
{
|
{
|
||||||
release();
|
release();
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.eclipse.jetty.http.HttpHeader;
|
||||||
import org.eclipse.jetty.http.HttpMethod;
|
import org.eclipse.jetty.http.HttpMethod;
|
||||||
import org.eclipse.jetty.http.MetaData;
|
import org.eclipse.jetty.http.MetaData;
|
||||||
import org.eclipse.jetty.http.MetaData.Request;
|
import org.eclipse.jetty.http.MetaData.Request;
|
||||||
|
import org.eclipse.jetty.http2.ErrorCode;
|
||||||
import org.eclipse.jetty.http2.HTTP2Connection;
|
import org.eclipse.jetty.http2.HTTP2Connection;
|
||||||
import org.eclipse.jetty.http2.ISession;
|
import org.eclipse.jetty.http2.ISession;
|
||||||
import org.eclipse.jetty.http2.IStream;
|
import org.eclipse.jetty.http2.IStream;
|
||||||
|
@ -38,6 +39,7 @@ import org.eclipse.jetty.http2.frames.DataFrame;
|
||||||
import org.eclipse.jetty.http2.frames.Frame;
|
import org.eclipse.jetty.http2.frames.Frame;
|
||||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||||
import org.eclipse.jetty.http2.frames.PrefaceFrame;
|
import org.eclipse.jetty.http2.frames.PrefaceFrame;
|
||||||
|
import org.eclipse.jetty.http2.frames.ResetFrame;
|
||||||
import org.eclipse.jetty.http2.frames.SettingsFrame;
|
import org.eclipse.jetty.http2.frames.SettingsFrame;
|
||||||
import org.eclipse.jetty.http2.parser.ServerParser;
|
import org.eclipse.jetty.http2.parser.ServerParser;
|
||||||
import org.eclipse.jetty.http2.parser.SettingsBodyParser;
|
import org.eclipse.jetty.http2.parser.SettingsBodyParser;
|
||||||
|
@ -187,19 +189,36 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private class ServerHttpChannelOverHTTP2 extends HttpChannelOverHTTP2
|
private class ServerHttpChannelOverHTTP2 extends HttpChannelOverHTTP2 implements ExecutionStrategy.Rejectable
|
||||||
{
|
{
|
||||||
public ServerHttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport)
|
public ServerHttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport)
|
||||||
{
|
{
|
||||||
super(connector, configuration, endPoint, transport);
|
super(connector, configuration, endPoint, transport);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void recycle()
|
||||||
|
{
|
||||||
|
super.recycle();
|
||||||
|
channels.offer(this);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onCompleted()
|
public void onCompleted()
|
||||||
{
|
{
|
||||||
super.onCompleted();
|
super.onCompleted();
|
||||||
recycle();
|
recycle();
|
||||||
channels.offer(this);
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reject()
|
||||||
|
{
|
||||||
|
IStream stream = getStream();
|
||||||
|
stream.reset(new ResetFrame(stream.getId(), ErrorCode.ENHANCE_YOUR_CALM_ERROR.code), Callback.NOOP);
|
||||||
|
// Consume the existing queued data frames to
|
||||||
|
// avoid stalling the session flow control.
|
||||||
|
getHttpTransport().consumeInput();
|
||||||
|
recycle();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,7 +59,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel
|
||||||
super(connector, configuration, endPoint, transport);
|
super(connector, configuration, endPoint, transport);
|
||||||
}
|
}
|
||||||
|
|
||||||
private IStream getStream()
|
protected IStream getStream()
|
||||||
{
|
{
|
||||||
return getHttpTransport().getStream();
|
return getHttpTransport().getStream();
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,7 +100,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
||||||
{
|
{
|
||||||
if (hasContent)
|
if (hasContent)
|
||||||
{
|
{
|
||||||
commit(info, false, new Callback()
|
commit(info, false, new Callback.NonBlocking()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
|
@ -206,18 +206,22 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onCompleted()
|
public void onCompleted()
|
||||||
{
|
|
||||||
if (!stream.isClosed())
|
|
||||||
{
|
{
|
||||||
// If the stream is not closed, it is still reading the request content.
|
// If the stream is not closed, it is still reading the request content.
|
||||||
// Send a reset to the other end so that it stops sending data.
|
// Send a reset to the other end so that it stops sending data.
|
||||||
|
if (!stream.isClosed())
|
||||||
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
|
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
|
||||||
// Now that this stream is reset, in-flight data frames will be consumed and discarded.
|
|
||||||
// Consume the existing queued data frames to avoid stalling the flow control.
|
// Consume the existing queued data frames to
|
||||||
|
// avoid stalling the session flow control.
|
||||||
|
consumeInput();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void consumeInput()
|
||||||
|
{
|
||||||
HttpChannel channel = (HttpChannel)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
|
HttpChannel channel = (HttpChannel)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
|
||||||
channel.getRequest().getHttpInput().consumeAll();
|
channel.getRequest().getHttpInput().consumeAll();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void abort(Throwable failure)
|
public void abort(Throwable failure)
|
||||||
|
|
|
@ -147,9 +147,9 @@ public class HttpInput extends ServletInputStream implements Runnable
|
||||||
Content item = nextContent();
|
Content item = nextContent();
|
||||||
if (item!=null)
|
if (item!=null)
|
||||||
{
|
{
|
||||||
if (LOG.isDebugEnabled())
|
|
||||||
LOG.debug("{} read {} from {}",this,len,item);
|
|
||||||
int l = get(item, b, off, len);
|
int l = get(item, b, off, len);
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} read {} from {}",this,l,item);
|
||||||
|
|
||||||
consumeNonContent();
|
consumeNonContent();
|
||||||
|
|
||||||
|
@ -357,7 +357,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("{} blocking for content timeout={} ...", this,timeout);
|
LOG.debug("{} blocking for content timeout={}", this,timeout);
|
||||||
if (timeout>0)
|
if (timeout>0)
|
||||||
_inputQ.wait(timeout);
|
_inputQ.wait(timeout);
|
||||||
else
|
else
|
||||||
|
@ -845,5 +845,4 @@ public class HttpInput extends ServletInputStream implements Runnable
|
||||||
return "AEOF";
|
return "AEOF";
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.eclipse.jetty.util.thread;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
|
@ -43,7 +44,7 @@ import org.eclipse.jetty.util.log.Log;
|
||||||
import org.eclipse.jetty.util.log.Logger;
|
import org.eclipse.jetty.util.log.Logger;
|
||||||
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
|
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
|
||||||
|
|
||||||
@ManagedObject("A thread pool with no max bound by default")
|
@ManagedObject("A thread pool")
|
||||||
public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
|
public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
|
||||||
{
|
{
|
||||||
private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
|
private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
|
||||||
|
@ -51,7 +52,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
||||||
private final AtomicInteger _threadsStarted = new AtomicInteger();
|
private final AtomicInteger _threadsStarted = new AtomicInteger();
|
||||||
private final AtomicInteger _threadsIdle = new AtomicInteger();
|
private final AtomicInteger _threadsIdle = new AtomicInteger();
|
||||||
private final AtomicLong _lastShrink = new AtomicLong();
|
private final AtomicLong _lastShrink = new AtomicLong();
|
||||||
private final ConcurrentHashSet<Thread> _threads=new ConcurrentHashSet<Thread>();
|
private final ConcurrentHashSet<Thread> _threads=new ConcurrentHashSet<>();
|
||||||
private final Object _joinLock = new Object();
|
private final Object _joinLock = new Object();
|
||||||
private final BlockingQueue<Runnable> _jobs;
|
private final BlockingQueue<Runnable> _jobs;
|
||||||
private final ThreadGroup _threadGroup;
|
private final ThreadGroup _threadGroup;
|
||||||
|
@ -62,6 +63,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
||||||
private int _priority = Thread.NORM_PRIORITY;
|
private int _priority = Thread.NORM_PRIORITY;
|
||||||
private boolean _daemon = false;
|
private boolean _daemon = false;
|
||||||
private boolean _detailedDump = false;
|
private boolean _detailedDump = false;
|
||||||
|
private int _lowThreadsThreshold = 1;
|
||||||
|
|
||||||
public QueuedThreadPool()
|
public QueuedThreadPool()
|
||||||
{
|
{
|
||||||
|
@ -126,13 +128,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
||||||
jobs.clear();
|
jobs.clear();
|
||||||
|
|
||||||
// Fill job Q with noop jobs to wakeup idle
|
// Fill job Q with noop jobs to wakeup idle
|
||||||
Runnable noop = new Runnable()
|
Runnable noop = () -> {};
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void run()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
};
|
|
||||||
for (int i = _threadsStarted.get(); i-- > 0; )
|
for (int i = _threadsStarted.get(); i-- > 0; )
|
||||||
jobs.offer(noop);
|
jobs.offer(noop);
|
||||||
|
|
||||||
|
@ -338,24 +334,23 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
||||||
*
|
*
|
||||||
* @return Number of jobs queued waiting for a thread
|
* @return Number of jobs queued waiting for a thread
|
||||||
*/
|
*/
|
||||||
@ManagedAttribute("Size of the job queue")
|
@ManagedAttribute("size of the job queue")
|
||||||
public int getQueueSize()
|
public int getQueueSize()
|
||||||
{
|
{
|
||||||
return _jobs.size();
|
return _jobs.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Is thread pool using daemon threading
|
* @return whether this thread pool is using daemon threads
|
||||||
*
|
|
||||||
* @return true if delegating to named or anonymous pool
|
|
||||||
* @see Thread#setDaemon(boolean)
|
* @see Thread#setDaemon(boolean)
|
||||||
*/
|
*/
|
||||||
@ManagedAttribute("thread pool using a daemon thread")
|
@ManagedAttribute("thread pool uses daemon threads")
|
||||||
public boolean isDaemon()
|
public boolean isDaemon()
|
||||||
{
|
{
|
||||||
return _daemon;
|
return _daemon;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ManagedAttribute("reports additional details in the dump")
|
||||||
public boolean isDetailedDump()
|
public boolean isDetailedDump()
|
||||||
{
|
{
|
||||||
return _detailedDump;
|
return _detailedDump;
|
||||||
|
@ -366,6 +361,17 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
||||||
_detailedDump = detailedDump;
|
_detailedDump = detailedDump;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ManagedAttribute("threshold at which the pool is low on threads")
|
||||||
|
public int getLowThreadsThreshold()
|
||||||
|
{
|
||||||
|
return _lowThreadsThreshold;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLowThreadsThreshold(int lowThreadsThreshold)
|
||||||
|
{
|
||||||
|
_lowThreadsThreshold = lowThreadsThreshold;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(Runnable job)
|
public void execute(Runnable job)
|
||||||
{
|
{
|
||||||
|
@ -401,42 +407,49 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return The total number of threads currently in the pool
|
* @return the total number of threads currently in the pool
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
@ManagedAttribute("total number of threads currently in the pool")
|
@ManagedAttribute("number of threads in the pool")
|
||||||
public int getThreads()
|
public int getThreads()
|
||||||
{
|
{
|
||||||
return _threadsStarted.get();
|
return _threadsStarted.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return The number of idle threads in the pool
|
* @return the number of idle threads in the pool
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
@ManagedAttribute("total number of idle threads in the pool")
|
@ManagedAttribute("number of idle threads in the pool")
|
||||||
public int getIdleThreads()
|
public int getIdleThreads()
|
||||||
{
|
{
|
||||||
return _threadsIdle.get();
|
return _threadsIdle.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return The number of busy threads in the pool
|
* @return the number of busy threads in the pool
|
||||||
*/
|
*/
|
||||||
@ManagedAttribute("total number of busy threads in the pool")
|
@ManagedAttribute("number of busy threads in the pool")
|
||||||
public int getBusyThreads()
|
public int getBusyThreads()
|
||||||
{
|
{
|
||||||
return getThreads() - getIdleThreads();
|
return getThreads() - getIdleThreads();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
|
* <p>Returns whether this thread pool is low on threads.</p>
|
||||||
|
* <p>The current formula is:</p>
|
||||||
|
* <pre>
|
||||||
|
* maxThreads - threads + idleThreads - queueSize <= lowThreadsThreshold
|
||||||
|
* </pre>
|
||||||
|
*
|
||||||
|
* @return whether the pool is low on threads
|
||||||
|
* @see #getLowThreadsThreshold()
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
@ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs")
|
@ManagedAttribute(value = "thread pool is low on threads", readonly = true)
|
||||||
public boolean isLowOnThreads()
|
public boolean isLowOnThreads()
|
||||||
{
|
{
|
||||||
return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get();
|
return getMaxThreads() - getThreads() + getIdleThreads() - getQueueSize() <= getLowThreadsThreshold();
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean startThreads(int threadsToStart)
|
private boolean startThreads(int threadsToStart)
|
||||||
|
@ -478,7 +491,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ManagedOperation("dump thread state")
|
@ManagedOperation("dumps thread pool state")
|
||||||
public String dump()
|
public String dump()
|
||||||
{
|
{
|
||||||
return ContainerLifeCycle.dump(this);
|
return ContainerLifeCycle.dump(this);
|
||||||
|
@ -487,7 +500,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
||||||
@Override
|
@Override
|
||||||
public void dump(Appendable out, String indent) throws IOException
|
public void dump(Appendable out, String indent) throws IOException
|
||||||
{
|
{
|
||||||
List<Object> dump = new ArrayList<>(getMaxThreads());
|
List<Object> threads = new ArrayList<>(getMaxThreads());
|
||||||
for (final Thread thread : _threads)
|
for (final Thread thread : _threads)
|
||||||
{
|
{
|
||||||
final StackTraceElement[] trace = thread.getStackTrace();
|
final StackTraceElement[] trace = thread.getStackTrace();
|
||||||
|
@ -504,7 +517,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
||||||
|
|
||||||
if (isDetailedDump())
|
if (isDetailedDump())
|
||||||
{
|
{
|
||||||
dump.add(new Dumpable()
|
threads.add(new Dumpable()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void dump(Appendable out, String indent) throws IOException
|
public void dump(Appendable out, String indent) throws IOException
|
||||||
|
@ -527,12 +540,16 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
int p=thread.getPriority();
|
int p=thread.getPriority();
|
||||||
dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : "")+ (p==Thread.NORM_PRIORITY?"":(" prio="+p)));
|
threads.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : "")+ (p==Thread.NORM_PRIORITY?"":(" prio="+p)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
List<Runnable> jobs = Collections.emptyList();
|
||||||
|
if (isDetailedDump())
|
||||||
|
jobs = new ArrayList<>(getQueue());
|
||||||
|
|
||||||
ContainerLifeCycle.dumpObject(out, this);
|
ContainerLifeCycle.dumpObject(out, this);
|
||||||
ContainerLifeCycle.dump(out, indent, dump);
|
ContainerLifeCycle.dump(out, indent, threads, jobs);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -664,17 +681,19 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param queue the job queue
|
* @param queue the job queue
|
||||||
|
* @deprecated pass the queue to the constructor instead
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public void setQueue(BlockingQueue<Runnable> queue)
|
public void setQueue(BlockingQueue<Runnable> queue)
|
||||||
{
|
{
|
||||||
throw new UnsupportedOperationException("Use constructor injection");
|
throw new UnsupportedOperationException("Use constructor injection");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param id The thread ID to interrupt.
|
* @param id the thread ID to interrupt.
|
||||||
* @return true if the thread was found and interrupted.
|
* @return true if the thread was found and interrupted.
|
||||||
*/
|
*/
|
||||||
@ManagedOperation("interrupt a pool thread")
|
@ManagedOperation("interrupts a pool thread")
|
||||||
public boolean interruptThread(@Name("id") long id)
|
public boolean interruptThread(@Name("id") long id)
|
||||||
{
|
{
|
||||||
for (Thread thread : _threads)
|
for (Thread thread : _threads)
|
||||||
|
@ -689,10 +708,10 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param id The thread ID to interrupt.
|
* @param id the thread ID to interrupt.
|
||||||
* @return true if the thread was found and interrupted.
|
* @return the stack frames dump
|
||||||
*/
|
*/
|
||||||
@ManagedOperation("dump a pool thread stack")
|
@ManagedOperation("dumps a pool thread stack")
|
||||||
public String dumpThread(@Name("id") long id)
|
public String dumpThread(@Name("id") long id)
|
||||||
{
|
{
|
||||||
for (Thread thread : _threads)
|
for (Thread thread : _threads)
|
||||||
|
|
|
@ -191,7 +191,7 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execute the task.
|
// Execute the task.
|
||||||
execute(task);
|
executeTask(task);
|
||||||
}
|
}
|
||||||
return !idle;
|
return !idle;
|
||||||
}
|
}
|
||||||
|
@ -203,6 +203,11 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void executeTask(Runnable task)
|
||||||
|
{
|
||||||
|
execute(task);
|
||||||
|
}
|
||||||
|
|
||||||
private void executeProduceConsume()
|
private void executeProduceConsume()
|
||||||
{
|
{
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.eclipse.jetty.util.thread.strategy;
|
||||||
|
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
|
|
||||||
import org.eclipse.jetty.util.log.Log;
|
import org.eclipse.jetty.util.log.Log;
|
||||||
import org.eclipse.jetty.util.log.Logger;
|
import org.eclipse.jetty.util.log.Logger;
|
||||||
import org.eclipse.jetty.util.thread.ExecutionStrategy;
|
import org.eclipse.jetty.util.thread.ExecutionStrategy;
|
||||||
|
@ -35,7 +36,6 @@ public abstract class ExecutingExecutionStrategy implements ExecutionStrategy
|
||||||
_executor=executor;
|
_executor=executor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
protected boolean execute(Runnable task)
|
protected boolean execute(Runnable task)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
|
@ -61,5 +61,4 @@ public abstract class ExecutingExecutionStrategy implements ExecutionStrategy
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue