Merged branch 'jetty-9.4.x' into 'master'.

This commit is contained in:
Simone Bordet 2016-05-12 00:22:56 +02:00
commit 1937701127
6 changed files with 65 additions and 56 deletions

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -42,17 +41,14 @@ import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -75,25 +71,6 @@ public class SmallThreadPoolLoadTest extends AbstractTest
public void testConcurrentWithSmallServerThreadPool() throws Exception public void testConcurrentWithSmallServerThreadPool() throws Exception
{ {
start(new LoadServlet()); start(new LoadServlet());
AbstractHTTP2ServerConnectionFactory factory = connector.getBean(AbstractHTTP2ServerConnectionFactory.class);
factory.setExecutionStrategyFactory(new ExecuteProduceConsume.Factory()
{
@Override
public ExecutionStrategy newExecutionStrategy(ExecutionStrategy.Producer producer, Executor executor)
{
return new ExecuteProduceConsume(producer, executor)
{
@Override
protected void executeTask(Runnable task)
{
if (task instanceof Rejectable)
((Rejectable)task).reject();
else
super.executeTask(task);
}
};
}
});
// Only one connection to the server. // Only one connection to the server.
Session session = newClient(new Session.Listener.Adapter()); Session session = newClient(new Session.Listener.Adapter());

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel; import java.nio.channels.ByteChannel;
@ -28,7 +29,6 @@ import java.nio.channels.SelectionKey;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy.Rejectable;
import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
@ -61,8 +61,9 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
private abstract class RunnableTask implements Runnable private abstract class RunnableTask implements Runnable
{ {
final String _operation; private final String _operation;
RunnableTask(String op)
protected RunnableTask(String op)
{ {
_operation=op; _operation=op;
} }
@ -74,19 +75,19 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
} }
} }
private abstract class RejectableRunnable extends RunnableTask implements Rejectable private abstract class RunnableCloseable extends RunnableTask implements Closeable
{ {
RejectableRunnable(String op) protected RunnableCloseable(String op)
{ {
super(op); super(op);
} }
@Override @Override
public void reject() public void close()
{ {
try try
{ {
close(); ChannelEndPoint.this.close();
} }
catch (Throwable x) catch (Throwable x)
{ {
@ -104,7 +105,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
} }
}; };
private final Runnable _runFillable = new RejectableRunnable("runFillable") private final Runnable _runFillable = new RunnableCloseable("runFillable")
{ {
@Override @Override
public void run() public void run()
@ -113,7 +114,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
} }
}; };
private final Runnable _runCompleteWrite = new RejectableRunnable("runCompleteWrite") private final Runnable _runCompleteWrite = new RunnableCloseable("runCompleteWrite")
{ {
@Override @Override
public void run() public void run()
@ -122,7 +123,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
} }
}; };
private final Runnable _runCompleteWriteFillable = new RejectableRunnable("runCompleteWriteFillable") private final Runnable _runCompleteWriteFillable = new RunnableCloseable("runCompleteWriteFillable")
{ {
@Override @Override
public void run() public void run()

View File

@ -43,7 +43,6 @@ import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.ExecutionStrategy.Rejectable;
import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
@ -548,7 +547,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
} }
} }
class Accept implements Runnable, Rejectable class Accept implements Runnable, Closeable
{ {
private final SelectableChannel channel; private final SelectableChannel channel;
private final Object attachment; private final Object attachment;
@ -560,9 +559,9 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
} }
@Override @Override
public void reject() public void close()
{ {
LOG.debug("rejected accept {}",channel); LOG.debug("closed accept of {}", channel);
closeNoExceptions(channel); closeNoExceptions(channel);
} }
@ -582,7 +581,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
} }
} }
private class CreateEndPoint implements Product, Rejectable private class CreateEndPoint implements Product, Closeable
{ {
private final SelectableChannel channel; private final SelectableChannel channel;
private final SelectionKey key; private final SelectionKey key;
@ -608,9 +607,9 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
} }
@Override @Override
public void reject() public void close()
{ {
LOG.debug("rejected create {}",channel); LOG.debug("closed creation of {}", channel);
closeNoExceptions(channel); closeNoExceptions(channel);
} }

View File

@ -26,13 +26,11 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
/** /**
* An ChannelEndpoint that can be scheduled by {@link SelectorManager}. * @deprecated use {@link SocketChannelEndPoint} instead
*/ */
@Deprecated @Deprecated
public class SelectChannelEndPoint extends SocketChannelEndPoint implements ManagedSelector.Selectable public class SelectChannelEndPoint extends SocketChannelEndPoint
{ {
public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
public SelectChannelEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout) public SelectChannelEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
{ {
super(channel,selector,key,scheduler); super(channel,selector,key,scheduler);

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.util.thread.strategy; package org.eclipse.jetty.util.thread.strategy;
import java.io.Closeable;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -28,9 +29,8 @@ import org.eclipse.jetty.util.thread.Locker.Lock;
import org.eclipse.jetty.util.thread.ThreadPool; import org.eclipse.jetty.util.thread.ThreadPool;
/** /**
* <p>A strategy where the thread calls produce will always run the resulting task * <p>A strategy where the thread that produces will always run the resulting task.</p>
* itself. The strategy may dispatches another thread to continue production. * <p>The strategy may then dispatch another thread to continue production.</p>
* </p>
* <p>The strategy is also known by the nickname 'eat what you kill', which comes from * <p>The strategy is also known by the nickname 'eat what you kill', which comes from
* the hunting ethic that says a person should not kill anything he or she does not * the hunting ethic that says a person should not kill anything he or she does not
* plan on eating. In this case, the phrase is used to mean that a thread should * plan on eating. In this case, the phrase is used to mean that a thread should
@ -39,8 +39,7 @@ import org.eclipse.jetty.util.thread.ThreadPool;
* down by running the task in the same core, with good chances of having a hot CPU * down by running the task in the same core, with good chances of having a hot CPU
* cache. It also avoids the creation of a queue of produced tasks that the system * cache. It also avoids the creation of a queue of produced tasks that the system
* does not yet have capacity to consume, which can save memory and exert back * does not yet have capacity to consume, which can save memory and exert back
* pressure on producers. * pressure on producers.</p>
* </p>
*/ */
public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements ExecutionStrategy, Runnable public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements ExecutionStrategy, Runnable
{ {
@ -191,7 +190,7 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
} }
// Execute the task. // Execute the task.
executeTask(task); executeProduct(task);
} }
return !idle; return !idle;
} }
@ -203,10 +202,38 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
} }
} }
protected void executeTask(Runnable task) /**
* <p>Only called when in {@link #isLowOnThreads() low threads mode}
* to execute the task produced by the producer.</p>
* <p>Because </p>
* <p>If the task implements {@link Rejectable}, then {@link Rejectable#reject()}
* is immediately called on the task object. If the task also implements
* {@link Closeable}, then {@link Closeable#close()} is called on the task object.</p>
* <p>If the task does not implement {@link Rejectable}, then it is
* {@link #execute(Runnable) executed}.</p>
*
* @param task the produced task to execute
*/
protected void executeProduct(Runnable task)
{
if (task instanceof Rejectable)
{
try
{
((Rejectable)task).reject();
if (task instanceof Closeable)
((Closeable)task).close();
}
catch (Throwable x)
{
LOG.debug(x);
}
}
else
{ {
execute(task); execute(task);
} }
}
private void executeProduceConsume() private void executeProduceConsume()
{ {

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.util.thread.strategy; package org.eclipse.jetty.util.thread.strategy;
import java.io.Closeable;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
@ -25,6 +26,12 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.ExecutionStrategy;
/**
* <p>Base class for strategies that need to execute a task by submitting it to an {@link Executor}.</p>
* <p>If the submission to the {@code Executor} is rejected (via a {@link RejectedExecutionException}),
* the task is tested whether it implements {@link Closeable}; if it does, then {@link Closeable#close()}
* is called on the task object.</p>
*/
public abstract class ExecutingExecutionStrategy implements ExecutionStrategy public abstract class ExecutingExecutionStrategy implements ExecutionStrategy
{ {
private static final Logger LOG = Log.getLogger(ExecutingExecutionStrategy.class); private static final Logger LOG = Log.getLogger(ExecutingExecutionStrategy.class);
@ -45,13 +52,13 @@ public abstract class ExecutingExecutionStrategy implements ExecutionStrategy
} }
catch(RejectedExecutionException e) catch(RejectedExecutionException e)
{ {
// If we cannot execute, then discard/reject the task and keep producing // If we cannot execute, then close the task and keep producing.
LOG.debug(e); LOG.debug(e);
LOG.warn("RejectedExecution {}",task); LOG.warn("Rejected execution of {}",task);
try try
{ {
if (task instanceof Rejectable) if (task instanceof Closeable)
((Rejectable)task).reject(); ((Closeable)task).close();
} }
catch (Exception x) catch (Exception x)
{ {