Jetty 9.4.x 572 (#614)

* Issue #572 Scheduling Strategy Deadlocks

Implemented dual strategy idea from #572 discussion

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* fixed http2 strategy choice

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* code cleanups

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* clean up seelctor actions/products

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* cleanups
This commit is contained in:
Greg Wilkins 2016-06-03 09:07:00 +10:00
parent 0578d15813
commit 98c328fb23
38 changed files with 784 additions and 815 deletions

View File

@ -31,6 +31,7 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
/**
* {@link HttpSender} abstracts the algorithm to send HTTP requests, so that subclasses only implement
@ -675,9 +676,9 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{
@Override
public boolean isNonBlocking()
public InvocationType getInvocationType()
{
return content.isNonBlocking();
return content.getInvocationType();
}
@Override
@ -891,9 +892,9 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
private class LastContentCallback implements Callback
{
@Override
public boolean isNonBlocking()
public InvocationType getInvocationType()
{
return content.isNonBlocking();
return content.getInvocationType();
}
@Override

View File

@ -29,6 +29,7 @@ import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
/**
* A {@link ContentProvider} that provides content asynchronously through an {@link OutputStream}
@ -79,9 +80,9 @@ public class OutputStreamContentProvider implements AsyncContentProvider, Callba
private final OutputStream output = new DeferredOutputStream();
@Override
public boolean isNonBlocking()
public InvocationType getInvocationType()
{
return deferred.isNonBlocking();
return deferred.getInvocationType();
}
@Override

View File

@ -228,16 +228,6 @@ public class HTTP2Client extends ContainerLifeCycle
this.flowControlStrategyFactory = flowControlStrategyFactory;
}
public ExecutionStrategy.Factory getExecutionStrategyFactory()
{
return executionStrategyFactory;
}
public void setExecutionStrategyFactory(ExecutionStrategy.Factory executionStrategyFactory)
{
this.executionStrategyFactory = executionStrategyFactory;
}
@ManagedAttribute("The number of selectors")
public int getSelectors()
{

View File

@ -69,7 +69,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,
parser, session, client.getInputBufferSize(), client.getExecutionStrategyFactory(), promise, listener);
parser, session, client.getInputBufferSize(), promise, listener);
connection.addListener(connectionListener);
return customize(connection, context);
}
@ -80,9 +80,9 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
private final Promise<Session> promise;
private final Session.Listener listener;
private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, ExecutionStrategy.Factory executionFactory, Promise<Session> promise, Session.Listener listener)
private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
{
super(byteBufferPool, executor, endpoint, parser, session, bufferSize, executionFactory);
super(byteBufferPool, executor, endpoint, parser, session, bufferSize);
this.client = client;
this.promise = promise;
this.listener = listener;

View File

@ -63,6 +63,7 @@ import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Ignore;
@ -834,8 +835,14 @@ public abstract class FlowControlStrategyTest
Stream stream = completable.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
final CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, false), new Callback.NonBlocking()
stream.data(new DataFrame(stream.getId(), data, false), new Callback()
{
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
@Override
public void succeeded()
{
@ -899,8 +906,14 @@ public abstract class FlowControlStrategyTest
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
final CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, false), new Callback.NonBlocking()
stream.data(new DataFrame(stream.getId(), data, false), new Callback()
{
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
@Override
public void succeeded()
{
@ -974,8 +987,14 @@ public abstract class FlowControlStrategyTest
// Perform a big upload that will stall the flow control windows.
ByteBuffer data = ByteBuffer.allocate(5 * FlowControlStrategy.DEFAULT_WINDOW_SIZE);
final CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, true), new Callback.NonBlocking()
stream.data(new DataFrame(stream.getId(), data, true), new Callback()
{
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
@Override
public void failed(Throwable x)
{

View File

@ -47,6 +47,7 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
@ -463,8 +464,13 @@ public class IdleTimeoutTest extends AbstractTest
{
sleep(idleTimeout / 2);
final boolean last = ++sends == 2;
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), last), !last ? this : new Callback.NonBlocking()
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), last), !last ? this : new Callback()
{
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
@Override
public void succeeded()
{

View File

@ -43,6 +43,7 @@ import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.ByteArrayOutputStream2;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO;
@ -101,9 +102,9 @@ public class SmallThreadPoolLoadTest extends AbstractTest
final Thread testThread = Thread.currentThread();
Scheduler.Task task = client.getScheduler().schedule(() ->
{
logger.warn("Interrupting test, it is taking too long{}{}{}{}",
System.lineSeparator(), server.dump(),
System.lineSeparator(), client.dump());
logger.warn("Interrupting test, it is taking too long{}Server:{}{}{}Client:{}{}",
System.lineSeparator(), System.lineSeparator(), server.dump(),
System.lineSeparator(), System.lineSeparator(), client.dump());
testThread.interrupt();
}, iterations * factor, TimeUnit.MILLISECONDS);
@ -186,9 +187,9 @@ public class SmallThreadPoolLoadTest extends AbstractTest
if (success)
latch.countDown();
else
logger.warn("Request {} took too long{}{}{}{}", requestId,
System.lineSeparator(), server.dump(),
System.lineSeparator(), client.dump());
logger.warn("Request {} took too long{}Server:{}{}{}Client:{}{}", requestId,
System.lineSeparator(), System.lineSeparator(), server.dump(),
System.lineSeparator(), System.lineSeparator(), client.dump());
return !reset.get();
}
@ -209,7 +210,10 @@ public class SmallThreadPoolLoadTest extends AbstractTest
}
case "POST":
{
IO.copy(request.getInputStream(), response.getOutputStream());
int content_length=request.getContentLength();
ByteArrayOutputStream2 bout = new ByteArrayOutputStream2(content_length>0?content_length:16*1024);
IO.copy(request.getInputStream(), bout);
response.getOutputStream().write(bout.getBuf(),0,bout.getCount());
break;
}
}

View File

@ -33,6 +33,10 @@ import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume;
import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume;
public class HTTP2Connection extends AbstractConnection
{
@ -44,16 +48,18 @@ public class HTTP2Connection extends AbstractConnection
private final ISession session;
private final int bufferSize;
private final HTTP2Producer producer = new HTTP2Producer();
private final ExecutionStrategy executionStrategy;
private final ExecutionStrategy blockingStrategy;
private final ExecutionStrategy nonBlockingStrategy;
public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize, ExecutionStrategy.Factory executionFactory)
public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.parser = parser;
this.session = session;
this.bufferSize = bufferSize;
this.executionStrategy = executionFactory.newExecutionStrategy(producer, executor);
this.blockingStrategy = new ExecuteProduceConsume(producer, executor);
this.nonBlockingStrategy = new ProduceExecuteConsume(producer, executor);
}
public ISession getSession()
@ -78,7 +84,7 @@ public class HTTP2Connection extends AbstractConnection
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Open {} ", this);
super.onOpen();
executionStrategy.execute();
blockingStrategy.produce();
}
@Override
@ -89,12 +95,25 @@ public class HTTP2Connection extends AbstractConnection
super.onClose();
}
@Override
public void onFillable()
{
throw new UnsupportedOperationException();
}
private void onFillableBlocking()
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 onFillable {} ", this);
executionStrategy.execute();
LOG.debug("HTTP2 onFillableBlocking {} ", this);
blockingStrategy.produce();
}
private void onFillableNonBlocking()
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 onFillableNonBlocking {} ", this);
nonBlockingStrategy.produce();
}
private int fill(EndPoint endPoint, ByteBuffer buffer)
@ -125,10 +144,16 @@ public class HTTP2Connection extends AbstractConnection
protected void offerTask(Runnable task, boolean dispatch)
{
tasks.offer(task);
// Because producing calls parse and parse can call offerTask, we have to make sure
// we use the same strategy otherwise produce can be reentrant and that messes with
// the release mechanism. TODO is this test sufficient to protect from this?
ExecutionStrategy s = Invocable.isNonBlockingInvocation()?nonBlockingStrategy:blockingStrategy;
if (dispatch)
executionStrategy.dispatch();
// TODO Why again is this necessary?
s.dispatch();
else
executionStrategy.execute();
s.produce();
}
@Override
@ -141,12 +166,12 @@ public class HTTP2Connection extends AbstractConnection
protected class HTTP2Producer implements ExecutionStrategy.Producer
{
private final Callback fillCallback = new FillCallback();
private final Callback fillableCallback = new FillableCallback();
private ByteBuffer buffer;
@Override
public Runnable produce()
{
public synchronized Runnable produce()
{
Runnable task = tasks.poll();
if (LOG.isDebugEnabled())
LOG.debug("Dequeued task {}", task);
@ -183,7 +208,7 @@ public class HTTP2Connection extends AbstractConnection
if (filled == 0)
{
release();
getEndPoint().fillInterested(fillCallback);
getEndPoint().fillInterested(fillableCallback);
return null;
}
else if (filled < 0)
@ -207,12 +232,21 @@ public class HTTP2Connection extends AbstractConnection
}
}
private class FillCallback implements Callback.NonBlocking
private class FillableCallback implements Callback
{
@Override
public InvocationType getInvocationType()
{
return InvocationType.EITHER;
}
@Override
public void succeeded()
{
onFillable();
if (Invocable.isNonBlockingInvocation())
onFillableNonBlocking();
else
onFillableBlocking();
}
@Override

View File

@ -147,16 +147,6 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this.flowControlStrategyFactory = flowControlStrategyFactory;
}
public ExecutionStrategy.Factory getExecutionStrategyFactory()
{
return executionStrategyFactory;
}
public void setExecutionStrategyFactory(ExecutionStrategy.Factory executionStrategyFactory)
{
this.executionStrategyFactory = executionStrategyFactory;
}
public HttpConfiguration getHttpConfiguration()
{
return httpConfiguration;
@ -181,7 +171,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
ServerParser parser = newServerParser(connector, session);
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
endPoint, httpConfiguration, parser, session, getInputBufferSize(), getExecutionStrategyFactory(), listener);
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);
connection.addListener(connectionListener);
return configure(connection, connector, endPoint);
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.http2.server;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@ -62,9 +63,9 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
private final HttpConfiguration httpConfig;
private final List<Frame> upgradeFrames = new ArrayList<>();
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ExecutionStrategy.Factory executionFactory, ServerSessionListener listener)
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize, executionFactory);
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize);
this.listener = listener;
this.httpConfig = httpConfig;
}
@ -189,7 +190,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
return true;
}
private class ServerHttpChannelOverHTTP2 extends HttpChannelOverHTTP2 implements ExecutionStrategy.Rejectable
private class ServerHttpChannelOverHTTP2 extends HttpChannelOverHTTP2 implements Closeable
{
public ServerHttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport)
{
@ -211,7 +212,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
}
@Override
public void reject()
public void close()
{
IStream stream = getStream();
stream.reset(new ResetFrame(stream.getId(), ErrorCode.ENHANCE_YOUR_CALM_ERROR.code), Callback.NOOP);

View File

@ -44,6 +44,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
public class HttpChannelOverHTTP2 extends HttpChannel
{
@ -207,9 +208,9 @@ public class HttpChannelOverHTTP2 extends HttpChannel
boolean handle = onContent(new HttpInput.Content(copy)
{
@Override
public boolean isNonBlocking()
public InvocationType getInvocationType()
{
return callback.isNonBlocking();
return callback.getInvocationType();
}
@Override

View File

@ -38,6 +38,7 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
public class HttpTransportOverHTTP2 implements HttpTransport
{
@ -100,8 +101,15 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{
if (hasContent)
{
commit(info, false, new Callback.NonBlocking()
commit(info, false, new Callback()
{
@Override
public InvocationType getInvocationType()
{
// TODO is this dependent on the callback itself?
return InvocationType.NON_BLOCKING;
}
@Override
public void succeeded()
{

View File

@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable;
/**
* <p>A convenience base implementation of {@link Connection}.</p>
@ -82,50 +83,45 @@ public abstract class AbstractConnection implements Connection
return _executor;
}
@Deprecated
public boolean isDispatchIO()
{
return false;
}
protected void failedCallback(final Callback callback, final Throwable x)
{
if (callback.isNonBlocking())
Runnable failCallback = new Runnable()
{
try
@Override
public void run()
{
callback.failed(x);
}
catch (Exception e)
{
LOG.warn(e);
}
}
else
{
try
{
getExecutor().execute(new Runnable()
try
{
@Override
public void run()
{
try
{
callback.failed(x);
}
catch (Exception e)
{
LOG.warn(e);
}
}
});
}
catch(RejectedExecutionException e)
{
LOG.debug(e);
callback.failed(x);
callback.failed(x);
}
catch (Exception e)
{
LOG.warn(e);
}
}
};
switch(Invocable.getInvocationType(callback))
{
case BLOCKING:
try
{
getExecutor().execute(failCallback);
}
catch(RejectedExecutionException e)
{
LOG.debug(e);
callback.failed(x);
}
break;
case NON_BLOCKING:
failCallback.run();
break;
case EITHER:
Invocable.invokeNonBlocking(failCallback);
}
}

View File

@ -29,6 +29,7 @@ import java.nio.channels.SelectionKey;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
@ -57,7 +58,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
*/
protected int _desiredInterestOps;
private abstract class RunnableTask implements Runnable
private abstract class RunnableTask implements Runnable, Invocable
{
private final String _operation;
@ -96,6 +97,12 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
private final Runnable _runUpdateKey = new RunnableTask("runUpdateKey")
{
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
@Override
public void run()
{
@ -105,6 +112,12 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
private final Runnable _runFillable = new RunnableCloseable("runFillable")
{
@Override
public InvocationType getInvocationType()
{
return getFillInterest().getCallbackInvocationType();
}
@Override
public void run()
{
@ -114,6 +127,12 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
private final Runnable _runCompleteWrite = new RunnableCloseable("runCompleteWrite")
{
@Override
public InvocationType getInvocationType()
{
return getWriteFlusher().getCallbackInvocationType();
}
@Override
public void run()
{
@ -123,6 +142,23 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
private final Runnable _runCompleteWriteFillable = new RunnableCloseable("runCompleteWriteFillable")
{
@Override
public InvocationType getInvocationType()
{
InvocationType fillT = getFillInterest().getCallbackInvocationType();
InvocationType flushT = getWriteFlusher().getCallbackInvocationType();
if (fillT==flushT)
return fillT;
if (fillT==InvocationType.EITHER && flushT==InvocationType.NON_BLOCKING)
return InvocationType.EITHER;
if (fillT==InvocationType.NON_BLOCKING && flushT==InvocationType.EITHER)
return InvocationType.EITHER;
return InvocationType.BLOCKING;
}
@Override
public void run()
{
@ -302,37 +338,20 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
_desiredInterestOps = newInterestOps;
}
boolean readable = (readyOps & SelectionKey.OP_READ) != 0;
boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0;
boolean fillable = (readyOps & SelectionKey.OP_READ) != 0;
boolean flushable = (readyOps & SelectionKey.OP_WRITE) != 0;
if (LOG.isDebugEnabled())
LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, readable, writable, this);
// Run non-blocking code immediately.
// This producer knows that this non-blocking code is special
// and that it must be run in this thread and not fed to the
// ExecutionStrategy, which could not have any thread to run these
// tasks (or it may starve forever just after having run them).
if (readable && getFillInterest().isCallbackNonBlocking())
{
if (LOG.isDebugEnabled())
LOG.debug("Direct readable run {}",this);
_runFillable.run();
readable = false;
}
if (writable && getWriteFlusher().isCallbackNonBlocking())
{
if (LOG.isDebugEnabled())
LOG.debug("Direct writable run {}",this);
_runCompleteWrite.run();
writable = false;
}
LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, fillable, flushable, this);
// return task to complete the job
Runnable task= readable ? (writable ? _runCompleteWriteFillable : _runFillable)
: (writable ? _runCompleteWrite : null);
Runnable task= fillable
? (flushable
? _runCompleteWriteFillable
: _runFillable)
: (flushable
? _runCompleteWrite
: null);
if (LOG.isDebugEnabled())
LOG.debug("task {}",task);

View File

@ -28,6 +28,8 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
/**
* A Utility class to help implement {@link EndPoint#fillInterested(Callback)}
@ -105,10 +107,10 @@ public abstract class FillInterest
return _interested.get() != null;
}
public boolean isCallbackNonBlocking()
public InvocationType getCallbackInvocationType()
{
Callback callback = _interested.get();
return callback!=null && callback.isNonBlocking();
return Invocable.getInvocationType(callback);
}
/**

View File

@ -43,8 +43,12 @@ import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume;
import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume;
/**
* <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
@ -52,7 +56,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
* happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
* with the channel.</p>
*/
public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
public class ManagedSelector extends AbstractLifeCycle implements Dumpable
{
private static final Logger LOG = Log.getLogger(ManagedSelector.class);
@ -62,24 +66,90 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
private final SelectorManager _selectorManager;
private final int _id;
private final ExecutionStrategy _strategy;
private final ExecutionStrategy _lowPriorityStrategy;
private Selector _selector;
private final Runnable _runStrategy = new Runnable()
{
@Override
public void run()
{
_strategy.produce();
}
};
private final Runnable _runLowPriorityStrategy = new Runnable()
{
@Override
public void run()
{
Thread current = Thread.currentThread();
String name = current.getName();
int priority = current.getPriority();
try
{
while (isRunning())
{
try
{
current.setPriority(Thread.MIN_PRIORITY);
current.setName(name+"-lowPrioSelector");
_lowPriorityStrategy.produce();
}
catch (Throwable th)
{
LOG.warn(th);
}
}
}
finally
{
current.setPriority(priority);
current.setName(name);
}
}
};
public ManagedSelector(SelectorManager selectorManager, int id)
{
this(selectorManager, id, ExecutionStrategy.Factory.getDefault());
}
public ManagedSelector(SelectorManager selectorManager, int id, ExecutionStrategy.Factory executionFactory)
{
_selectorManager = selectorManager;
_id = id;
_strategy = executionFactory.newExecutionStrategy(new SelectorProducer(), selectorManager.getExecutor());
setStopTimeout(5000);
}
SelectorProducer producer = new SelectorProducer();
_strategy = new ExecuteProduceConsume(producer, selectorManager.getExecutor(), Invocable.InvocationType.BLOCKING);
_lowPriorityStrategy = new ProduceExecuteConsume(producer, selectorManager.getExecutor(), Invocable.InvocationType.BLOCKING)
{
@Override
protected boolean execute(Runnable task)
{
try
{
Invocable.InvocationType invocation=Invocable.getInvocationType(task);
if (LOG.isDebugEnabled())
LOG.debug("Low Prio Selector execute {} {}",invocation,task);
switch (Invocable.getInvocationType(task))
{
case NON_BLOCKING:
task.run();
return true;
public ExecutionStrategy getExecutionStrategy()
{
return _strategy;
case EITHER:
Invocable.invokeNonBlocking(task);
return true;
default:
}
return super.execute(task);
}
finally
{
// Allow opportunity for main strategy to take over
Thread.yield();
}
}
};
setStopTimeout(5000);
}
@Override
@ -87,7 +157,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
{
super.doStart();
_selector = _selectorManager.newSelector();
_selectorManager.execute(this);
_selectorManager.execute(_runStrategy);
_selectorManager.execute(_runLowPriorityStrategy);
}
public int size()
@ -135,12 +206,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
selector.wakeup();
}
@Override
public void run()
{
_strategy.execute();
}
/**
* A {@link Selectable} is an {@link EndPoint} that wish to be
* notified of non-blocking events by the {@link ManagedSelector}.
@ -168,7 +233,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
private Iterator<SelectionKey> _cursor = Collections.emptyIterator();
@Override
public Runnable produce()
public synchronized Runnable produce()
{
while (true)
{
@ -176,7 +241,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
if (task != null)
return task;
Runnable action = runActions();
Runnable action = nextAction();
if (action != null)
return action;
@ -187,7 +252,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
private Runnable runActions()
private Runnable nextAction()
{
while (true)
{
@ -203,25 +268,20 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
if (action instanceof Product)
if (Invocable.getInvocationType(action)==InvocationType.BLOCKING)
return action;
// Running the change may queue another action.
runChange(action);
}
}
private void runChange(Runnable change)
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("Running change {}", change);
change.run();
}
catch (Throwable x)
{
LOG.debug("Could not run change " + change, x);
try
{
if (LOG.isDebugEnabled())
LOG.debug("Running action {}", action);
// Running the change may queue another action.
action.run();
}
catch (Throwable x)
{
LOG.debug("Could not run action " + action, x);
}
}
}
@ -334,9 +394,14 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
((Selectable)attachment).updateKey();
}
}
private interface Product extends Runnable
private abstract static class NonBlockingAction implements Runnable, Invocable
{
@Override
public final InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
}
private Runnable processConnect(SelectionKey key, final Connect connect)
@ -427,7 +492,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
public void destroyEndPoint(final EndPoint endPoint)
{
final Connection connection = endPoint.getConnection();
submit((Product)() ->
submit((Runnable)() ->
{
if (LOG.isDebugEnabled())
LOG.debug("Destroyed {}", endPoint);
@ -518,7 +583,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
class Acceptor implements Runnable
class Acceptor extends NonBlockingAction
{
private final SelectableChannel _channel;
@ -544,7 +609,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
class Accept implements Runnable, Closeable
class Accept extends NonBlockingAction implements Closeable
{
private final SelectableChannel channel;
private final Object attachment;
@ -578,7 +643,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
private class CreateEndPoint implements Product, Closeable
private class CreateEndPoint implements Runnable, Closeable
{
private final SelectableChannel channel;
private final SelectionKey key;
@ -617,7 +682,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
class Connect implements Runnable
class Connect extends NonBlockingAction
{
private final AtomicBoolean failed = new AtomicBoolean();
private final SelectableChannel channel;
@ -655,7 +720,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
private class ConnectTimeout implements Runnable
private class ConnectTimeout extends NonBlockingAction
{
private final Connect connect;
@ -677,7 +742,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
private class CloseEndPoints implements Runnable
private class CloseEndPoints extends NonBlockingAction
{
private final CountDownLatch _latch = new CountDownLatch(1);
private CountDownLatch _allClosed;
@ -724,7 +789,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
private class EndPointCloser implements Product
private class EndPointCloser implements Runnable
{
private final EndPoint _endPoint;
private final CountDownLatch _latch;
@ -743,7 +808,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
private class CloseSelector implements Runnable
private class CloseSelector extends NonBlockingAction
{
private CountDownLatch _latch = new CountDownLatch(1);

View File

@ -51,7 +51,6 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
private final Scheduler scheduler;
private final ManagedSelector[] _selectors;
private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
private ExecutionStrategy.Factory _executionFactory = ExecutionStrategy.Factory.getDefault();
private long _selectorIndex;
protected SelectorManager(Executor executor, Scheduler scheduler)
@ -98,43 +97,6 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
_connectTimeout = milliseconds;
}
/**
* @return the {@link ExecutionStrategy.Factory} used by {@link ManagedSelector}
*/
public ExecutionStrategy.Factory getExecutionStrategyFactory()
{
return _executionFactory;
}
/**
* @param _executionFactory the {@link ExecutionStrategy.Factory} used by {@link ManagedSelector}
*/
public void setExecutionStrategyFactory(ExecutionStrategy.Factory _executionFactory)
{
if (isRunning())
throw new IllegalStateException("Cannot change " + ExecutionStrategy.Factory.class.getSimpleName() + " after start()");
this._executionFactory = _executionFactory;
}
/**
* @return the selector priority delta
* @deprecated not implemented
*/
@Deprecated
public int getSelectorPriorityDelta()
{
return 0;
}
/**
* @param selectorPriorityDelta the selector priority delta
* @deprecated not implemented
*/
@Deprecated
public void setSelectorPriorityDelta(int selectorPriorityDelta)
{
}
/**
* Executes the given task in a different thread.
*
@ -286,7 +248,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
*/
protected ManagedSelector newSelector(int id)
{
return new ManagedSelector(this, id, getExecutionStrategyFactory());
return new ManagedSelector(this, id);
}
@Override

View File

@ -32,6 +32,8 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
/**
@ -269,16 +271,18 @@ abstract public class WriteFlusher
_callback.succeeded();
}
boolean isCallbackNonBlocking()
InvocationType getCallbackInvocationType()
{
return _callback!=null && _callback.isNonBlocking();
return Invocable.getInvocationType(_callback);
}
}
public boolean isCallbackNonBlocking()
public InvocationType getCallbackInvocationType()
{
State s = _state.get();
return (s instanceof PendingState) && ((PendingState)s).isCallbackNonBlocking();
return (s instanceof PendingState)
?((PendingState)s).getCallbackInvocationType()
:Invocable.InvocationType.BLOCKING;
}
/**

View File

@ -36,10 +36,11 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
@ -644,19 +645,21 @@ public class WriteFlusherTest
}
};
BlockingCallback callback = new BlockingCallback();
writeFlusher.write(callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow."));
exchange.exchange(0);
try(Blocker blocker = new SharedBlockingCallback().acquire())
{
writeFlusher.write(blocker,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow."));
exchange.exchange(0);
Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("How now br"));
Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("How now br"));
exchange.exchange(1);
exchange.exchange(0);
exchange.exchange(1);
exchange.exchange(0);
Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("o"));
Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("o"));
exchange.exchange(8);
callback.block();
exchange.exchange(8);
blocker.block();
}
Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("wn cow."));

View File

@ -151,7 +151,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
private String _defaultProtocol;
private ConnectionFactory _defaultConnectionFactory;
private String _name;
private int _acceptorPriorityDelta;
private int _acceptorPriorityDelta=-2;
/**

View File

@ -44,6 +44,7 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
/**
* <p>A {@link Connection} that handles the HTTP protocol.</p>
@ -609,11 +610,11 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
@Override
public boolean isNonBlocking()
public InvocationType getInvocationType()
{
// This callback does not block, rather it wakes up the
// thread that is blocked waiting on the read.
return true;
return InvocationType.NON_BLOCKING;
}
}
@ -652,9 +653,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
@Override
public boolean isNonBlocking()
public InvocationType getInvocationType()
{
return _callback.isNonBlocking();
return _callback.getInvocationType();
}
private boolean reset(MetaData.Response info, boolean head, ByteBuffer content, boolean last, Callback callback)

View File

@ -36,6 +36,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
/**
* {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.
@ -708,12 +709,11 @@ public class HttpInput extends ServletInputStream implements Runnable
}
@Override
public boolean isNonBlocking()
public InvocationType getInvocationType()
{
return true;
return InvocationType.NON_BLOCKING;
}
public ByteBuffer getContent()
{
return _content;

View File

@ -250,26 +250,6 @@ public class ServerConnector extends AbstractNetworkConnector
return channel!=null && channel.isOpen();
}
/**
* @return the selector priority delta
* @deprecated not implemented
*/
@Deprecated
public int getSelectorPriorityDelta()
{
return _manager.getSelectorPriorityDelta();
}
/**
* @param selectorPriorityDelta the selector priority delta
* @deprecated not implemented
*/
@Deprecated
public void setSelectorPriorityDelta(int selectorPriorityDelta)
{
_manager.setSelectorPriorityDelta(selectorPriorityDelta);
}
/**
* @return whether this connector uses a channel inherited from the JVM.
* @see System#inheritedChannel()
@ -478,22 +458,6 @@ public class ServerConnector extends AbstractNetworkConnector
_reuseAddress = reuseAddress;
}
/**
* @return the ExecutionStrategy factory to use for SelectorManager
*/
public ExecutionStrategy.Factory getExecutionStrategyFactory()
{
return _manager.getExecutionStrategyFactory();
}
/**
* @param executionFactory the ExecutionStrategy factory to use for SelectorManager
*/
public void setExecutionStrategyFactory(ExecutionStrategy.Factory executionFactory)
{
_manager.setExecutionStrategyFactory(executionFactory);
}
protected class ServerConnectorManager extends SelectorManager
{
public ServerConnectorManager(Executor executor, Scheduler scheduler, int selectors)

View File

@ -18,50 +18,48 @@
package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume;
import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class ThreadStarvationTest
{
final static int BUFFER_SIZE=1024*1024;
final static int BUFFERS=64;
final static int CLIENTS=10;
final static int THREADS=5;
@Rule
public TestTracker tracker = new TestTracker();
private QueuedThreadPool _threadPool;
private Server _server;
private ServerConnector _connector;
private int _availableThreads;
private Server prepareServer(Handler handler)
{
int threads = 4;
_threadPool = new QueuedThreadPool();
_threadPool.setMinThreads(threads);
_threadPool.setMaxThreads(threads);
_threadPool.setMinThreads(THREADS);
_threadPool.setMaxThreads(THREADS);
_threadPool.setDetailedDump(true);
_server = new Server(_threadPool);
int acceptors = 1;
@ -69,7 +67,6 @@ public class ThreadStarvationTest
_connector = new ServerConnector(_server, acceptors, selectors);
_server.addConnector(_connector);
_server.setHandler(handler);
_availableThreads = threads - acceptors - selectors;
return _server;
}
@ -83,47 +80,36 @@ public class ThreadStarvationTest
public void testReadInput() throws Exception
{
prepareServer(new ReadHandler()).start();
try(Socket client = new Socket("localhost", _connector.getLocalPort()))
{
client.setSoTimeout(10000);
OutputStream os = client.getOutputStream();
InputStream is = client.getInputStream();
Socket client = new Socket("localhost", _connector.getLocalPort());
String request = "" +
"GET / HTTP/1.0\r\n" +
"Host: localhost\r\n" +
"Content-Length: 10\r\n" +
"\r\n" +
"0123456789\r\n";
os.write(request.getBytes(StandardCharsets.UTF_8));
os.flush();
OutputStream os = client.getOutputStream();
InputStream is = client.getInputStream();
String request = "" +
"GET / HTTP/1.0\r\n" +
"Host: localhost\r\n" +
"Content-Length: 10\r\n" +
"\r\n" +
"0123456789\r\n";
os.write(request.getBytes(StandardCharsets.UTF_8));
os.flush();
String response = IO.toString(is);
assertEquals(-1, is.read());
assertThat(response, containsString("200 OK"));
assertThat(response, containsString("Read Input 10"));
String response = IO.toString(is);
assertEquals(-1, is.read());
assertThat(response, containsString("200 OK"));
assertThat(response, containsString("Read Input 10"));
}
}
@Test
public void testEPCStarvation() throws Exception
{
testStarvation(new ExecuteProduceConsume.Factory());
}
@Test
public void testPECStarvation() throws Exception
{
testStarvation(new ProduceExecuteConsume.Factory());
}
private void testStarvation(ExecutionStrategy.Factory executionFactory) throws Exception
public void testReadStarvation() throws Exception
{
prepareServer(new ReadHandler());
_connector.setExecutionStrategyFactory(executionFactory);
_server.start();
System.err.println(_threadPool.dump());
Socket[] client = new Socket[_availableThreads + 1];
Socket[] client = new Socket[CLIENTS];
OutputStream[] os = new OutputStream[client.length];
InputStream[] is = new InputStream[client.length];
@ -146,7 +132,6 @@ public class ThreadStarvationTest
}
Thread.sleep(500);
System.err.println(_threadPool.dump());
for (int i = 0; i < client.length; i++)
{
@ -155,7 +140,6 @@ public class ThreadStarvationTest
}
Thread.sleep(500);
System.err.println(_threadPool.dump());
for (int i = 0; i < client.length; i++)
{
@ -166,84 +150,6 @@ public class ThreadStarvationTest
}
}
@Test
public void testEPCExitsLowThreadsMode() throws Exception
{
prepareServer(new ReadHandler());
_threadPool.setMaxThreads(5);
_connector.setExecutionStrategyFactory(new ExecuteProduceConsume.Factory());
_server.start();
System.err.println(_server.dump());
// Three idle threads in the pool here.
// The server will accept the socket in normal mode.
Socket client = new Socket("localhost", _connector.getLocalPort());
client.setSoTimeout(10000);
Thread.sleep(500);
// Now steal two threads.
CountDownLatch[] latches = new CountDownLatch[2];
for (int i = 0; i < latches.length; ++i)
{
CountDownLatch latch = latches[i] = new CountDownLatch(1);
_threadPool.execute(() ->
{
try
{
latch.await();
}
catch (InterruptedException ignored)
{
}
});
}
InputStream is = client.getInputStream();
OutputStream os = client.getOutputStream();
String request = "" +
"PUT / HTTP/1.0\r\n" +
"Host: localhost\r\n" +
"Content-Length: 10\r\n" +
"\r\n" +
"1";
os.write(request.getBytes(StandardCharsets.UTF_8));
os.flush();
Thread.sleep(500);
System.err.println(_threadPool.dump());
// Request did not send the whole body, Handler
// is blocked reading, zero idle threads here,
// EPC is in low threads mode.
for (ManagedSelector selector : _connector.getSelectorManager().getBeans(ManagedSelector.class))
{
ExecuteProduceConsume executionStrategy = (ExecuteProduceConsume)selector.getExecutionStrategy();
assertTrue(executionStrategy.isLowOnThreads());
}
// Release the stolen threads.
for (CountDownLatch latch : latches)
latch.countDown();
Thread.sleep(500);
// Send the rest of the body to unblock the reader thread.
// This will be run directly by the selector thread,
// which then will exit the low threads mode.
os.write("234567890".getBytes(StandardCharsets.UTF_8));
os.flush();
Thread.sleep(500);
System.err.println(_threadPool.dump());
for (ManagedSelector selector : _connector.getSelectorManager().getBeans(ManagedSelector.class))
{
ExecuteProduceConsume executionStrategy = (ExecuteProduceConsume)selector.getExecutionStrategy();
assertFalse(executionStrategy.isLowOnThreads());
}
}
protected static class ReadHandler extends AbstractHandler
{
@Override
@ -256,11 +162,152 @@ public class ThreadStarvationTest
int r = 0;
while (r < l)
{
if (request.getInputStream().read() >= 0)
r++;
if (request.getInputStream().read() < 0)
break;
r++;
}
response.getOutputStream().write(("Read Input " + r + "\r\n").getBytes());
}
}
@Test
public void testWriteStarvation() throws Exception
{
prepareServer(new WriteHandler());
_server.start();
Socket[] client = new Socket[CLIENTS];
OutputStream[] os = new OutputStream[client.length];
final InputStream[] is = new InputStream[client.length];
for (int i = 0; i < client.length; i++)
{
client[i] = new Socket("localhost", _connector.getLocalPort());
client[i].setSoTimeout(10000);
os[i] = client[i].getOutputStream();
is[i] = client[i].getInputStream();
String request =
"GET / HTTP/1.0\r\n" +
"host: localhost\r\n" +
"\r\n";
os[i].write(request.getBytes(StandardCharsets.UTF_8));
os[i].flush();
}
Thread.sleep(100);
final AtomicLong total=new AtomicLong();
final CountDownLatch latch=new CountDownLatch(client.length);
for (int i = client.length; i-->0;)
{
final int c=i;
new Thread()
{
@Override
public void run()
{
byte[] content=new byte[BUFFER_SIZE];
int content_length=0;
String header= "No HEADER!";
try
{
// Read an initial content buffer
int len=0;
while (len<BUFFER_SIZE)
{
int l=is[c].read(content,len,content.length-len);
if (l<0)
throw new IllegalStateException();
len+=l;
content_length+=l;
}
// Look for the end of the header
int state=0;
loop: for(int j=0;j<len;j++)
{
content_length--;
switch(content[j])
{
case '\r':
state++;
break;
case '\n':
switch(state)
{
case 1:
state=2;
break;
case 3:
header=new String(content,0,j,StandardCharsets.ISO_8859_1);
assertThat(header,containsString(" 200 OK"));
break loop;
}
break;
default:
state=0;
break;
}
}
// Read the rest of the body
while(len>0)
{
len=is[c].read(content);
if (len>0)
content_length+=len;
}
// System.err.printf("client %d cl=%d %n%s%n",c,content_length,header);
total.addAndGet(content_length);
}
catch(Exception e)
{
e.printStackTrace();
}
finally
{
latch.countDown();
}
}
}.start();
}
latch.await();
assertEquals(CLIENTS*BUFFERS*BUFFER_SIZE,total.get());
}
protected static class WriteHandler extends AbstractHandler
{
byte[] content=new byte[BUFFER_SIZE];
{
Arrays.fill(content,(byte)'x');
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setStatus(200);
OutputStream out = response.getOutputStream();
for (int i=0;i<BUFFERS;i++)
{
out.write(content);
out.flush();
}
}
}
}

View File

@ -1,100 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* An implementation of Callback that blocks until success or failure.
*/
@Deprecated
public class BlockingCallback implements Callback.NonBlocking
{
private static final Logger LOG = Log.getLogger(BlockingCallback.class);
private static Throwable SUCCEEDED = new Throwable()
{
@Override
public String toString() { return "SUCCEEDED"; }
};
private final CountDownLatch _latch = new CountDownLatch(1);
private final AtomicReference<Throwable> _state = new AtomicReference<>();
public BlockingCallback()
{
}
@Override
public void succeeded()
{
if (_state.compareAndSet(null,SUCCEEDED))
_latch.countDown();
}
@Override
public void failed(Throwable cause)
{
if (_state.compareAndSet(null,cause))
_latch.countDown();
}
/**
* Blocks until the Callback has succeeded or failed and
* after the return leave in the state to allow reuse.
* This is useful for code that wants to repeatable use a FutureCallback to convert
* an asynchronous API to a blocking API.
* @throws IOException if exception was caught during blocking, or callback was cancelled
*/
public void block() throws IOException
{
try
{
_latch.await();
Throwable state=_state.get();
if (state==SUCCEEDED)
return;
if (state instanceof IOException)
throw (IOException) state;
if (state instanceof CancellationException)
throw (CancellationException) state;
throw new IOException(state);
}
catch (final InterruptedException e)
{
throw new InterruptedIOException(){{initCause(e);}};
}
finally
{
_state.set(null);
}
}
@Override
public String toString()
{
return String.format("%s@%x{%s}",BlockingCallback.class.getSimpleName(),hashCode(),_state.get());
}
}

View File

@ -20,13 +20,15 @@ package org.eclipse.jetty.util;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.util.thread.Invocable;
/**
* <p>A callback abstraction that handles completed/failed events of asynchronous operations.</p>
*
* <p>Semantically this is equivalent to an optimise Promise&lt;Void&gt;, but callback is a more meaningful
* name than EmptyPromise</p>
*/
public interface Callback
public interface Callback extends Invocable
{
/**
* Instance of Adapter that can be used when the callback methods need an empty
@ -53,14 +55,6 @@ public interface Callback
{
}
/**
* @return True if the callback is known to never block the caller
*/
default boolean isNonBlocking()
{
return false;
}
/**
* <p>Creates a non-blocking callback from the given incomplete CompletableFuture.</p>
* <p>When the callback completes, either succeeding or failing, the
@ -73,7 +67,7 @@ public interface Callback
*/
static Callback from(CompletableFuture<?> completable)
{
return from(completable, false);
return from(completable, InvocationType.NON_BLOCKING);
}
/**
@ -81,10 +75,10 @@ public interface Callback
* with the given {@code blocking} characteristic.</p>
*
* @param completable the CompletableFuture to convert into a callback
* @param blocking whether the callback is blocking
* @param invocation whether the callback is blocking
* @return a callback that when completed, completes the given CompletableFuture
*/
static Callback from(CompletableFuture<?> completable, boolean blocking)
static Callback from(CompletableFuture<?> completable, InvocationType invocation)
{
if (completable instanceof Callback)
return (Callback)completable;
@ -104,25 +98,13 @@ public interface Callback
}
@Override
public boolean isNonBlocking()
public InvocationType getInvocationType()
{
return !blocking;
return invocation;
}
};
}
/**
* Callback interface that declares itself as non-blocking
*/
interface NonBlocking extends Callback
{
@Override
default boolean isNonBlocking()
{
return true;
}
}
class Nested implements Callback
{
private final Callback callback;
@ -150,9 +132,9 @@ public interface Callback
}
@Override
public boolean isNonBlocking()
public InvocationType getInvocationType()
{
return callback.isNonBlocking();
return callback.getInvocationType();
}
}
/**
@ -160,16 +142,16 @@ public interface Callback
*/
class Completable extends CompletableFuture<Void> implements Callback
{
private final boolean blocking;
private final InvocationType invocation;
public Completable()
{
this(false);
this(Invocable.InvocationType.NON_BLOCKING);
}
public Completable(boolean blocking)
public Completable(InvocationType invocation)
{
this.blocking = blocking;
this.invocation = invocation;
}
@Override
@ -185,9 +167,9 @@ public interface Callback
}
@Override
public boolean isNonBlocking()
public InvocationType getInvocationType()
{
return !blocking;
return invocation;
}
}
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.util;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
/* ------------------------------------------------------------ */
/** Iterating Nested Callback.
@ -48,9 +49,9 @@ public abstract class IteratingNestedCallback extends IteratingCallback
}
@Override
public boolean isNonBlocking()
public InvocationType getInvocationType()
{
return _callback.isNonBlocking();
return _callback.getInvocationType();
}
@Override

View File

@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
/**
* Provides a reusable {@link Callback} that can block the thread
@ -128,13 +129,19 @@ public class SharedBlockingCallback
* callback do not blocak, rather they wakeup the thread that is blocked
* in {@link #block()}
*/
public class Blocker implements Callback.NonBlocking, Closeable
public class Blocker implements Callback, Closeable
{
private Throwable _state = IDLE;
protected Blocker()
{
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
@Override
public void succeeded()

View File

@ -35,32 +35,25 @@ import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume;
* execute tasks until the producer continues to produce them.</p>
*/
public interface ExecutionStrategy
{
{
/**
* <p>Initiates (or resumes) the task production and execution.</p>
* <p>Initiates (or resumes) the task production and consumption.</p>
* <p>This method guarantees that the task is never run by the
* thread that called this method.</p>
*
* @see #execute()
* TODO review the need for this (only used by HTTP2 push)
* @see #produce()
*/
public void dispatch();
/**
* <p>Initiates (or resumes) the task production and execution.</p>
* <p>Initiates (or resumes) the task production and consumption.</p>
* <p>The produced task may be run by the same thread that called
* this method.</p>
*
* @see #dispatch()
*/
public void execute();
/**
* A task that can handle {@link RejectedExecutionException}
*/
public interface Rejectable
{
public void reject();
}
public void produce();
/**
* <p>A producer of {@link Runnable} tasks to run.</p>

View File

@ -0,0 +1,160 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread;
import java.util.concurrent.Callable;
/**
* An object (typically either a {@link Runnable} or {@link Callable}
* that can declare how it will behaive when invoked: blocking, non-blocking
* or either.
*
*/
public interface Invocable
{
enum InvocationType { BLOCKING, NON_BLOCKING, EITHER };
static ThreadLocal<Boolean> __nonBlocking = new ThreadLocal<Boolean>()
{
@Override
protected Boolean initialValue()
{
return Boolean.FALSE;
}
};
public static boolean isNonBlockingInvocation()
{
return __nonBlocking.get();
}
public static void invokeNonBlocking(Runnable task)
{
// a Choice exists, so we must indicate NonBlocking
Boolean was_non_blocking = __nonBlocking.get();
try
{
__nonBlocking.set(Boolean.TRUE);
task.run();
}
finally
{
__nonBlocking.set(was_non_blocking);
}
}
public static void invokeOnlyNonBlocking(Runnable task)
{
switch(getInvocationType(task))
{
case BLOCKING:
throw new IllegalArgumentException("Cannot invoke nonblocking: "+task);
case NON_BLOCKING:
task.run();
break;
case EITHER:
// a Choice exists, so we must indicate NonBlocking
invokeNonBlocking(task);
break;
}
}
public static void invokePreferNonBlocking(Runnable task)
{
switch(getInvocationType(task))
{
case BLOCKING:
case NON_BLOCKING:
task.run();
break;
case EITHER:
// a Choice exists, so we must indicate NonBlocking
invokeNonBlocking(task);
break;
}
}
public static void invokePreferred(Runnable task, InvocationType preferredInvocationType)
{
switch(getInvocationType(task))
{
case BLOCKING:
case NON_BLOCKING:
task.run();
break;
case EITHER:
if (getInvocationType(task)==InvocationType.EITHER && preferredInvocationType==InvocationType.NON_BLOCKING)
invokeNonBlocking(task);
else
task.run();
break;
}
}
public static Runnable asPreferred(Runnable task, InvocationType preferredInvocationType)
{
switch(getInvocationType(task))
{
case BLOCKING:
case NON_BLOCKING:
break;
case EITHER:
if (getInvocationType(task)==InvocationType.EITHER && preferredInvocationType==InvocationType.NON_BLOCKING)
return new Runnable()
{
@Override
public void run()
{
invokeNonBlocking(task);
}
};
break;
}
return task;
}
public static void invokePreferBlocking(Runnable task)
{
task.run();
}
public static InvocationType getInvocationType(Object o)
{
if (o instanceof Invocable)
return ((Invocable)o).getInvocationType();
return InvocationType.BLOCKING;
}
default InvocationType getInvocationType()
{
return InvocationType.BLOCKING;
}
}

View File

@ -39,6 +39,7 @@ import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -549,7 +550,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
jobs = new ArrayList<>(getQueue());
ContainerLifeCycle.dumpObject(out, this);
ContainerLifeCycle.dump(out, indent, threads, jobs);
ContainerLifeCycle.dump(out, indent, threads, Collections.singletonList(new DumpableCollection("jobs",jobs)));
}
@Override

View File

@ -18,15 +18,15 @@
package org.eclipse.jetty.util.thread.strategy;
import java.io.Closeable;
import java.util.concurrent.Executor;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
import org.eclipse.jetty.util.thread.ThreadPool;
/**
* <p>A strategy where the thread that produces will always run the resulting task.</p>
@ -46,30 +46,27 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class);
private final Locker _locker = new Locker();
private final Runnable _runExecute = new RunExecute();
private final Runnable _runProduce = new RunProduce();
private final Producer _producer;
private final ThreadPool _threadPool;
private boolean _idle = true;
private boolean _execute;
private boolean _producing;
private boolean _pending;
private boolean _lowThreads;
public ExecuteProduceConsume(Producer producer, Executor executor)
{
super(executor);
this._producer = producer;
_threadPool = executor instanceof ThreadPool ? (ThreadPool)executor : null;
this(producer,executor,InvocationType.BLOCKING);
}
@Deprecated
public ExecuteProduceConsume(Producer producer, Executor executor, ExecutionStrategy lowResourceStrategy)
public ExecuteProduceConsume(Producer producer, Executor executor, InvocationType preferred )
{
this(producer, executor);
super(executor,preferred);
this._producer = producer;
}
@Override
public void execute()
public void produce()
{
if (LOG.isDebugEnabled())
LOG.debug("{} execute", this);
@ -114,7 +111,7 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
_execute = true;
}
if (dispatch)
execute(_runExecute);
execute(_runProduce);
}
@Override
@ -137,105 +134,6 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
}
private void produceConsume()
{
if (_threadPool != null && _threadPool.isLowOnThreads())
{
// If we are low on threads we must not produce and consume
// in the same thread, but produce and execute to consume.
if (!produceExecuteConsume())
return;
}
executeProduceConsume();
}
public boolean isLowOnThreads()
{
return _lowThreads;
}
/**
* @return true if we are still producing
*/
private boolean produceExecuteConsume()
{
if (LOG.isDebugEnabled())
LOG.debug("{} enter low threads mode", this);
_lowThreads = true;
try
{
boolean idle = false;
while (_threadPool.isLowOnThreads())
{
Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
LOG.debug("{} produced {}", _producer, task);
if (task == null)
{
// No task, so we are now idle
try (Lock locked = _locker.lock())
{
if (_execute)
{
_execute = false;
_producing = true;
_idle = false;
continue;
}
_producing = false;
idle = _idle = true;
break;
}
}
// Execute the task.
executeProduct(task);
}
return !idle;
}
finally
{
_lowThreads = false;
if (LOG.isDebugEnabled())
LOG.debug("{} exit low threads mode", this);
}
}
/**
* <p>Only called when in {@link #isLowOnThreads() low threads mode}
* to execute the task produced by the producer.</p>
* <p>Because </p>
* <p>If the task implements {@link Rejectable}, then {@link Rejectable#reject()}
* is immediately called on the task object. If the task also implements
* {@link Closeable}, then {@link Closeable#close()} is called on the task object.</p>
* <p>If the task does not implement {@link Rejectable}, then it is
* {@link #execute(Runnable) executed}.</p>
*
* @param task the produced task to execute
*/
protected void executeProduct(Runnable task)
{
if (task instanceof Rejectable)
{
try
{
((Rejectable)task).reject();
if (task instanceof Closeable)
((Closeable)task).close();
}
catch (Throwable x)
{
LOG.debug(x);
}
}
else
{
execute(task);
}
}
private void executeProduceConsume()
{
if (LOG.isDebugEnabled())
LOG.debug("{} produce enter", this);
@ -280,7 +178,7 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
if (!_pending)
{
// dispatch one
dispatch = _pending = true;
dispatch = _pending = Invocable.getInvocationType(task)!=InvocationType.NON_BLOCKING;
}
_execute = false;
@ -300,7 +198,7 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
if (LOG.isDebugEnabled())
LOG.debug("{} run {}", this, task);
if (task != null)
task.run();
invoke(task);
if (LOG.isDebugEnabled())
LOG.debug("{} ran {}", this, task);
@ -341,12 +239,12 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
return builder.toString();
}
private class RunExecute implements Runnable
private class RunProduce implements Runnable
{
@Override
public void run()
{
execute();
produce();
}
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
/**
* <p>Base class for strategies that need to execute a task by submitting it to an {@link Executor}.</p>
@ -37,17 +38,33 @@ public abstract class ExecutingExecutionStrategy implements ExecutionStrategy
private static final Logger LOG = Log.getLogger(ExecutingExecutionStrategy.class);
private final Executor _executor;
private final Invocable.InvocationType _preferredInvocationType;
protected ExecutingExecutionStrategy(Executor executor)
protected ExecutingExecutionStrategy(Executor executor,Invocable.InvocationType preferred)
{
_executor=executor;
_preferredInvocationType=preferred;
}
public Invocable.InvocationType getPreferredInvocationType()
{
return _preferredInvocationType;
}
public void invoke(Runnable task)
{
if (LOG.isDebugEnabled())
LOG.debug("{} invoke {}", this, task);
Invocable.invokePreferred(task,_preferredInvocationType);
if (LOG.isDebugEnabled())
LOG.debug("{} invoked {}", this, task);
}
protected boolean execute(Runnable task)
{
try
{
_executor.execute(task);
_executor.execute(Invocable.asPreferred(task,_preferredInvocationType));
return true;
}
catch(RejectedExecutionException e)

View File

@ -23,6 +23,7 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Locker;
/**
@ -45,7 +46,7 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
}
@Override
public void execute()
public void produce()
{
try (Locker.Lock lock = _locker.lock())
{
@ -89,7 +90,7 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
}
// Run the task.
task.run();
Invocable.invokePreferNonBlocking(task);
}
}
@ -102,7 +103,7 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
@Override
public void run()
{
execute();
produce();
}
public static class Factory implements ExecutionStrategy.Factory

View File

@ -23,6 +23,7 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
@ -40,12 +41,17 @@ public class ProduceExecuteConsume extends ExecutingExecutionStrategy implements
public ProduceExecuteConsume(Producer producer, Executor executor)
{
super(executor);
this(producer,executor,Invocable.InvocationType.NON_BLOCKING);
}
public ProduceExecuteConsume(Producer producer, Executor executor, Invocable.InvocationType preferred)
{
super(executor,preferred);
this._producer = producer;
}
@Override
public void execute()
public void produce()
{
try (Lock locked = _locker.lock())
{
@ -96,7 +102,7 @@ public class ProduceExecuteConsume extends ExecutingExecutionStrategy implements
@Override
public void dispatch()
{
execute();
produce();
}
public static class Factory implements ExecutionStrategy.Factory

View File

@ -1,123 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
public class BlockingCallbackTest
{
public BlockingCallbackTest()
{
}
@Test
public void testDone() throws Exception
{
final BlockingCallback fcb= new BlockingCallback();
fcb.succeeded();
long start=System.currentTimeMillis();
fcb.block();
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
}
@Test
public void testGetDone() throws Exception
{
final BlockingCallback fcb= new BlockingCallback();
final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable()
{
@Override
public void run()
{
latch.countDown();
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
fcb.succeeded();
}
}).start();
latch.await();
long start=System.currentTimeMillis();
fcb.block();
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L));
}
@Test
public void testFailed() throws Exception
{
final BlockingCallback fcb= new BlockingCallback();
Exception ex=new Exception("FAILED");
fcb.failed(ex);
long start=System.currentTimeMillis();
try
{
fcb.block();
Assert.fail();
}
catch(IOException ee)
{
Assert.assertEquals(ex,ee.getCause());
}
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L));
}
@Test
public void testGetFailed() throws Exception
{
final BlockingCallback fcb= new BlockingCallback();
final Exception ex=new Exception("FAILED");
final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable()
{
@Override
public void run()
{
latch.countDown();
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
fcb.failed(ex);
}
}).start();
latch.await();
long start=System.currentTimeMillis();
try
{
fcb.block();
Assert.fail();
}
catch(IOException ee)
{
Assert.assertEquals(ex,ee.getCause());
}
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L));
}
}

View File

@ -85,7 +85,7 @@ public class ExecuteProduceConsumeTest
public void testIdle()
{
_produce.add(NULLTASK);
_ewyk.execute();
_ewyk.produce();
}
@Test
@ -94,7 +94,7 @@ public class ExecuteProduceConsumeTest
Task t0 = new Task();
_produce.add(t0);
_produce.add(NULLTASK);
_ewyk.execute();
_ewyk.produce();
Assert.assertThat(t0.hasRun(), Matchers.equalTo(true));
Assert.assertEquals(_ewyk,_executions.poll());
}
@ -109,7 +109,7 @@ public class ExecuteProduceConsumeTest
_produce.add(tasks[i]);
}
_produce.add(NULLTASK);
_ewyk.execute();
_ewyk.produce();
for (Task task : tasks)
Assert.assertThat(task.hasRun(), Matchers.equalTo(true));
@ -127,7 +127,7 @@ public class ExecuteProduceConsumeTest
{
_produce.add(t0);
_produce.add(NULLTASK);
_ewyk.execute();
_ewyk.produce();
}
};
thread.start();
@ -160,7 +160,7 @@ public class ExecuteProduceConsumeTest
{
_produce.add(t0);
_produce.add(NULLTASK);
_ewyk.execute();
_ewyk.produce();
}
};
thread.start();
@ -191,7 +191,7 @@ public class ExecuteProduceConsumeTest
public void run()
{
_produce.add(t0);
_ewyk.execute();
_ewyk.produce();
}
};
thread0.start();
@ -218,7 +218,7 @@ public class ExecuteProduceConsumeTest
_ewyk.run();
// ditto with execute
_ewyk.execute();
_ewyk.produce();
// Now if unblock the production by the dispatched thread
final Task t1 = new Task(true);
@ -264,7 +264,7 @@ public class ExecuteProduceConsumeTest
public void run()
{
_produce.add(t0);
_ewyk.execute();
_ewyk.produce();
}
};
thread0.start();
@ -280,7 +280,7 @@ public class ExecuteProduceConsumeTest
_produce.add(NULLTASK);
// execute will return immediately because it did not yet see the idle.
_ewyk.execute();
_ewyk.produce();
// When we unblock t0, thread1 will see the idle,
t0.unblock();

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.websocket.api.WriteCallback;
/**
@ -38,7 +39,7 @@ public class BlockingWriteCallback extends SharedBlockingCallback
return new WriteBlocker(acquire());
}
public static class WriteBlocker implements WriteCallback, Callback.NonBlocking, AutoCloseable
public static class WriteBlocker implements WriteCallback, Callback, AutoCloseable
{
private final Blocker blocker;
@ -46,6 +47,13 @@ public class BlockingWriteCallback extends SharedBlockingCallback
{
this.blocker=blocker;
}
@Override
public InvocationType getInvocationType()
{
// The callback does not block, only the writer blocks
return InvocationType.NON_BLOCKING;
}
@Override
public void writeFailed(Throwable x)