Merge remote-tracking branch 'eclipse/jetty-9.4.x' into jetty-9.4.x-2206-OperatorPrecedence

This commit is contained in:
Lachlan Roberts 2018-02-27 14:56:33 +11:00
commit 27a0cbfacd
14 changed files with 796 additions and 257 deletions

View File

@ -603,7 +603,7 @@ public class IdleTimeoutTest extends AbstractTest
{
long idleTimeout = 2000;
// Use a small thread pool to cause request queueing.
QueuedThreadPool serverExecutor = new QueuedThreadPool(4);
QueuedThreadPool serverExecutor = new QueuedThreadPool(5);
serverExecutor.setName("server");
server = new Server(serverExecutor);
HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(new HttpConfiguration());

View File

@ -425,7 +425,7 @@ public class StreamResetTest extends AbstractTest
public void testClientResetConsumesQueuedRequestWithData() throws Exception
{
// Use a small thread pool.
QueuedThreadPool serverExecutor = new QueuedThreadPool(4);
QueuedThreadPool serverExecutor = new QueuedThreadPool(5);
serverExecutor.setName("server");
serverExecutor.setDetailedDump(true);
server = new Server(serverExecutor);

View File

@ -42,7 +42,10 @@ import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
public class HTTP2Connection extends AbstractConnection implements WriteFlusher.Listener
{
protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);
// TODO remove this once we are sure EWYK is OK for http2
private static final boolean PEC_MODE = Boolean.getBoolean("org.eclipse.jetty.http2.PEC_MODE");
private final Queue<Runnable> tasks = new ArrayDeque<>();
private final HTTP2Producer producer = new HTTP2Producer();
private final AtomicLong bytesIn = new AtomicLong();
@ -59,8 +62,9 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
this.parser = parser;
this.session = session;
this.bufferSize = bufferSize;
// TODO HTTP2 cannot use EWYK without fix for #1803
this.strategy = new EatWhatYouKill(producer, new TryExecutor.NoTryExecutor(executor));
if (PEC_MODE)
executor = new TryExecutor.NoTryExecutor(executor);
this.strategy = new EatWhatYouKill(producer, executor);
LifeCycle.start(strategy);
}
@ -283,9 +287,7 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
@Override
public InvocationType getInvocationType()
{
// TODO: see also AbstractHTTP2ServerConnectionFactory.reservedThreads.
// TODO: it's non blocking here because reservedThreads=0.
return InvocationType.NON_BLOCKING;
return InvocationType.EITHER;
}
}
}

View File

@ -142,7 +142,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
/**
* @return -1
* @deprecated
* @deprecated feature removed, no replacement
*/
@Deprecated
public int getReservedThreads()
@ -152,7 +152,8 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
/**
* @param threads ignored
* @deprecated
* @deprecated feature removed, no replacement
* @throws UnsupportedOperationException when invoked
*/
@Deprecated
public void setReservedThreads(int threads)

View File

@ -423,6 +423,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
catch (Throwable x)
{
closeNoExceptions(_selector);
_selector = null;
if (isRunning())
LOG.warn(x);
else
@ -490,17 +491,14 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
// This will update only those keys whose selection did not cause an
// updateKeys update to be submitted.
for (SelectionKey key : _keys)
updateKey(key);
{
Object attachment = key.attachment();
if (attachment instanceof Selectable)
((Selectable)attachment).updateKey();
}
_keys.clear();
}
private void updateKey(SelectionKey key)
{
Object attachment = key.attachment();
if (attachment instanceof Selectable)
((Selectable)attachment).updateKey();
}
@Override
public String toString()
{

View File

@ -48,11 +48,54 @@
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
<configuration>
<instructions>
<Require-Capability>osgi.serviceloader; filter:="(osgi.serviceloader=org.eclipse.jetty.util.security.CredentialProvider)";resolution:=optional;cardinality:=multiple, osgi.extender; filter:="(osgi.extender=osgi.serviceloader.processor)"</Require-Capability>
</instructions>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<instructions>
<Require-Capability>osgi.serviceloader; filter:="(osgi.serviceloader=org.eclipse.jetty.util.security.CredentialProvider)";resolution:=optional;cardinality:=multiple, osgi.extender; filter:="(osgi.extender=osgi.serviceloader.processor)"</Require-Capability>
</instructions>
<finalName>${jmhjar.name}</finalName>
<shadeTestJar>true</shadeTestJar>
<artifactSet>
<includes>
<include>org.openjdk.jmh:jmh-core</include>
</includes>
</artifactSet>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.openjdk.jmh.Main</mainClass>
</transformer>
</transformers>
<filters>
<filter>
<artifact>org.openjdk.jmh:jmh-core</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
@ -102,5 +145,17 @@
<version>${slf4j-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>${jmh.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>${jmh.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -18,14 +18,7 @@
package org.eclipse.jetty.util.thread;
import java.io.Closeable;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
/**
* <p>A task (typically either a {@link Runnable} or {@link Callable}
@ -49,14 +42,7 @@ public interface Invocable
BLOCKING, NON_BLOCKING, EITHER
}
static ThreadLocal<Boolean> __nonBlocking = new ThreadLocal<Boolean>()
{
@Override
protected Boolean initialValue()
{
return Boolean.FALSE;
}
};
static ThreadLocal<Boolean> __nonBlocking = new ThreadLocal<>();
/**
* Test if the current thread has been tagged as non blocking
@ -65,7 +51,7 @@ public interface Invocable
*/
public static boolean isNonBlockingInvocation()
{
return __nonBlocking.get();
return Boolean.TRUE.equals(__nonBlocking.get());
}
/**
@ -75,7 +61,6 @@ public interface Invocable
*/
public static void invokeNonBlocking(Runnable task)
{
// a Choice exists, so we must indicate NonBlocking
Boolean was_non_blocking = __nonBlocking.get();
try
{
@ -88,88 +73,10 @@ public interface Invocable
}
}
/**
* Invoke a task with the calling thread.
* If the task is an {@link Invocable} of {@link InvocationType#EITHER}
* then it is invoked with {@link #invokeNonBlocking(Runnable)}, to
* indicate the type of invocation that has been assumed.
* @param task The task to invoke.
*/
public static void invokePreferNonBlocking(Runnable task)
{
switch (getInvocationType(task))
{
case BLOCKING:
case NON_BLOCKING:
task.run();
break;
case EITHER:
// a Choice exists, so we must indicate NonBlocking
invokeNonBlocking(task);
break;
}
}
/**
* Invoke a task with the calling thread.
* If the task is an {@link Invocable} of {@link InvocationType#EITHER}
* and the preferredInvocationType is not {@link InvocationType#BLOCKING}
* then it is invoked with {@link #invokeNonBlocking(Runnable)}.
* @param task The task to invoke.
* @param preferredInvocationType The invocation type to use if the task
* does not indicate a preference.
*/
public static void invokePreferred(Runnable task, InvocationType preferredInvocationType)
{
switch (getInvocationType(task))
{
case BLOCKING:
case NON_BLOCKING:
task.run();
break;
case EITHER:
if (getInvocationType(task) == InvocationType.EITHER && preferredInvocationType == InvocationType.NON_BLOCKING)
invokeNonBlocking(task);
else
task.run();
break;
}
}
/**
* wrap a task with the to indicate invocation type.
* If the task is an {@link Invocable} of {@link InvocationType#EITHER}
* and the preferredInvocationType is not {@link InvocationType#BLOCKING}
* then it is wrapped with an invocation of {@link #invokeNonBlocking(Runnable)}.
* otherwise the task itself is returned.
* @param task The task to invoke.
* @param preferredInvocationType The invocation type to use if the task
* does not indicate a preference.
* @return A Runnable that invokes the task in the declared or preferred type.
*/
public static Runnable asPreferred(Runnable task, InvocationType preferredInvocationType)
{
switch (getInvocationType(task))
{
case BLOCKING:
case NON_BLOCKING:
break;
case EITHER:
if (preferredInvocationType == InvocationType.NON_BLOCKING)
return () -> invokeNonBlocking(task);
break;
}
return task;
}
/**
* Get the invocation type of an Object.
* @param o The object to check the invocation type of.
* @return If the object is a {@link Invocable}, it is coerced and the {@link #getInvocationType()}
* @return If the object is an Invocable, it is coerced and the {@link #getInvocationType()}
* used, otherwise {@link InvocationType#BLOCKING} is returned.
*/
public static InvocationType getInvocationType(Object o)
@ -186,14 +93,4 @@ public interface Invocable
{
return InvocationType.BLOCKING;
}
public abstract class NonBlocking implements Runnable, Invocable
{
@Override
public final InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
}
}

View File

@ -33,7 +33,6 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.TryExecutor;
/**
@ -66,15 +65,26 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
{
private static final Logger LOG = Log.getLogger(EatWhatYouKill.class);
private enum State { IDLE, PENDING, PRODUCING, REPRODUCING }
private enum State { IDLE, PRODUCING, REPRODUCING }
private final LongAdder _nonBlocking = new LongAdder();
private final LongAdder _blocking = new LongAdder();
private final LongAdder _executed = new LongAdder();
/* The modes this strategy can work in */
private enum Mode
{
PRODUCE_CONSUME,
PRODUCE_INVOKE_CONSUME, // This is PRODUCE_CONSUME an EITHER task with NON_BLOCKING invocation
PRODUCE_EXECUTE_CONSUME,
EXECUTE_PRODUCE_CONSUME // Eat What You Kill!
}
private final LongAdder _pcMode = new LongAdder();
private final LongAdder _picMode = new LongAdder();
private final LongAdder _pecMode = new LongAdder();
private final LongAdder _epcMode = new LongAdder();
private final Producer _producer;
private final Executor _executor;
private final TryExecutor _tryExecutor;
private State _state = State.IDLE;
private boolean _pending;
public EatWhatYouKill(Producer producer, Executor executor)
{
@ -96,8 +106,11 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
switch(_state)
{
case IDLE:
execute = true;
_state = State.PENDING;
if (!_pending)
{
_pending = true;
execute = true;
}
break;
case PRODUCING:
@ -116,166 +129,274 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
@Override
public void run()
{
if (LOG.isDebugEnabled())
LOG.debug("{} run", this);
produce();
{
tryProduce(true);
}
@Override
public void produce()
{
if (LOG.isDebugEnabled())
LOG.debug("{} produce", this);
if (tryProduce())
doProduce();
tryProduce(false);
}
private boolean tryProduce()
private void tryProduce(boolean wasPending)
{
boolean producing = false;
if (LOG.isDebugEnabled())
LOG.debug("{} tryProduce {}", this, wasPending);
synchronized(this)
{
if (wasPending)
_pending = false;
switch (_state)
{
case IDLE:
case PENDING:
// Enter PRODUCING
_state = State.PRODUCING;
producing = true;
break;
case PRODUCING:
// Keep other Thread producing
_state = State.REPRODUCING;
return;
default:
return;
}
}
final boolean non_blocking = Invocable.isNonBlockingInvocation();
while(isRunning())
{
try
{
if (doProduce(non_blocking))
continue;
return;
}
catch (Throwable th)
{
LOG.warn(th);
}
}
}
public boolean doProduce(boolean non_blocking)
{
Runnable task = produceTask();
if (task==null)
{
synchronized(this)
{
// Could another task just have been queued with a produce call?
switch (_state)
{
case PRODUCING:
_state = State.IDLE;
return false;
case REPRODUCING:
_state = State.PRODUCING;
return true;
default:
throw new IllegalStateException(toStringLocked());
}
}
}
Mode mode;
if (non_blocking)
{
// The calling thread cannot block, so we only have a choice between PC and PEC modes,
// based on the invocation type of the task
switch(Invocable.getInvocationType(task))
{
case NON_BLOCKING:
mode = Mode.PRODUCE_CONSUME;
break;
case EITHER:
mode = Mode.PRODUCE_INVOKE_CONSUME;
break;
default:
break;
mode = Mode.PRODUCE_EXECUTE_CONSUME;
}
}
return producing;
}
private void doProduce()
{
boolean producing = true;
while (isRunning() && producing)
else
{
// If we got here, then we are the thread that is producing.
Runnable task = null;
try
// The calling thread can block, so we can choose between PC, PEC and EPC: modes,
// based on the invocation type of the task and if a reserved thread is available
switch(Invocable.getInvocationType(task))
{
task = _producer.produce();
}
catch (Throwable e)
{
LOG.warn(e);
}
if (task==null)
{
synchronized(this)
{
// Could another task just have been queued with a produce call?
switch (_state)
{
case PRODUCING:
_state = State.IDLE;
producing = false;
break;
case REPRODUCING:
_state = State.PRODUCING;
break;
default:
throw new IllegalStateException(toStringLocked());
}
}
}
else
{
boolean consume;
if (Invocable.getInvocationType(task) == InvocationType.NON_BLOCKING)
{
// PRODUCE CONSUME
consume = true;
_nonBlocking.increment();
}
else
{
case NON_BLOCKING:
mode = Mode.PRODUCE_CONSUME;
break;
case BLOCKING:
// The task is blocking, so PC is not an option. Thus we choose
// between EPC and PEC based on the availability of a reserved thread.
synchronized(this)
{
if (_tryExecutor.tryExecute(this))
if (_pending)
{
// EXECUTE PRODUCE CONSUME!
// We have executed a new Producer, so we can EWYK consume
_state = State.PENDING;
producing = false;
consume = true;
_blocking.increment();
_state = State.IDLE;
mode = Mode.EXECUTE_PRODUCE_CONSUME;
}
else if (_tryExecutor.tryExecute(this))
{
_pending = true;
_state = State.IDLE;
mode = Mode.EXECUTE_PRODUCE_CONSUME;
}
else
{
// PRODUCE EXECUTE CONSUME!
consume = false;
_executed.increment();
}
mode = Mode.PRODUCE_EXECUTE_CONSUME;
}
break;
}
case EITHER:
{
// The task may be non blocking, so PC is an option. Thus we choose
// between EPC and PC based on the availability of a reserved thread.
synchronized(this)
{
if (_pending)
{
_state = State.IDLE;
mode = Mode.EXECUTE_PRODUCE_CONSUME;
}
else if (_tryExecutor.tryExecute(this))
{
_pending = true;
_state = State.IDLE;
mode = Mode.EXECUTE_PRODUCE_CONSUME;
}
else
{
// PC mode, but we must consume with non-blocking invocation
// as we may be the last thread and we cannot block
mode = Mode.PRODUCE_INVOKE_CONSUME;
}
}
}
break;
if (LOG.isDebugEnabled())
LOG.debug("{} p={} c={} t={}/{}", this, producing, consume, task,Invocable.getInvocationType(task));
// Consume or execute task
default:
throw new IllegalStateException();
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} m={} t={}/{}", this, mode, task,Invocable.getInvocationType(task));
// Consume or execute task
switch(mode)
{
case PRODUCE_CONSUME:
_pcMode.increment();
task.run();
return true;
case PRODUCE_INVOKE_CONSUME:
_picMode.increment();
Invocable.invokeNonBlocking(task);
return true;
case PRODUCE_EXECUTE_CONSUME:
_pecMode.increment();
execute(task);
return true;
case EXECUTE_PRODUCE_CONSUME:
_epcMode.increment();
task.run();
// Try to produce again?
synchronized(this)
{
if (_state==State.IDLE)
{
// We beat the pending producer, so we will become the producer instead
_state = State.PRODUCING;
return true;
}
}
return false;
default:
throw new IllegalStateException();
}
}
private Runnable produceTask()
{
try
{
return _producer.produce();
}
catch (Throwable e)
{
LOG.warn(e);
}
return null;
}
private void execute(Runnable task)
{
try
{
_executor.execute(task);
}
catch (RejectedExecutionException e)
{
if (isRunning())
LOG.warn(e);
else
LOG.ignore(e);
if (task instanceof Closeable)
{
try
{
if (consume)
task.run();
else
_executor.execute(task);
((Closeable)task).close();
}
catch (RejectedExecutionException e)
catch (Throwable e2)
{
if (isRunning())
LOG.warn(e);
else
LOG.ignore(e);
if (task instanceof Closeable)
{
try
{
((Closeable)task).close();
}
catch (Throwable e2)
{
LOG.ignore(e2);
}
}
}
catch (Throwable e)
{
LOG.warn(e);
LOG.ignore(e2);
}
}
}
}
@ManagedAttribute(value = "number of non blocking tasks consumed", readonly = true)
public long getNonBlockingTasksConsumed()
@ManagedAttribute(value = "number of tasks consumed with PC mode", readonly = true)
public long getPCTasksConsumed()
{
return _nonBlocking.longValue();
return _pcMode.longValue();
}
@ManagedAttribute(value = "number of tasks executed with PIC mode", readonly = true)
public long getPICTasksExecuted()
{
return _picMode.longValue();
}
@ManagedAttribute(value = "number of tasks executed with PEC mode", readonly = true)
public long getPECTasksExecuted()
{
return _pecMode.longValue();
}
@ManagedAttribute(value = "number of blocking tasks consumed", readonly = true)
public long getBlockingTasksConsumed()
@ManagedAttribute(value = "number of tasks consumed with EPC mode", readonly = true)
public long getEPCTasksConsumed()
{
return _blocking.longValue();
}
@ManagedAttribute(value = "number of blocking tasks executed", readonly = true)
public long getBlockingTasksExecuted()
{
return _executed.longValue();
return _epcMode.longValue();
}
@ManagedAttribute(value = "whether this execution strategy is idle", readonly = true)
@ -290,9 +411,10 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
@ManagedOperation(value = "resets the task counts", impact = "ACTION")
public void reset()
{
_nonBlocking.reset();
_blocking.reset();
_executed.reset();
_pcMode.reset();
_epcMode.reset();
_pecMode.reset();
_picMode.reset();
}
public String toString()
@ -324,14 +446,18 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
private void getState(StringBuilder builder)
{
builder.append(_state);
builder.append("/p=");
builder.append(_pending);
builder.append('/');
builder.append(_tryExecutor);
builder.append("[nb=");
builder.append(getNonBlockingTasksConsumed());
builder.append(",c=");
builder.append(getBlockingTasksConsumed());
builder.append(",e=");
builder.append(getBlockingTasksExecuted());
builder.append("[pc=");
builder.append(getPCTasksConsumed());
builder.append(",pic=");
builder.append(getPICTasksExecuted());
builder.append(",pec=");
builder.append(getPECTasksExecuted());
builder.append(",epc=");
builder.append(getEPCTasksConsumed());
builder.append("]");
builder.append("@");
builder.append(DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));

View File

@ -90,7 +90,7 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
}
// Run the task.
Invocable.invokePreferNonBlocking(task);
task.run();
}
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
@ -91,7 +92,10 @@ public class ProduceExecuteConsume implements ExecutionStrategy
}
// Execute the task.
_executor.execute(task);
if (Invocable.getInvocationType(task)==InvocationType.NON_BLOCKING)
task.run();
else
_executor.execute(task);
}
}

View File

@ -0,0 +1,190 @@
//
// ========================================================================
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread.strategy.jmh;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume;
import org.eclipse.jetty.util.thread.strategy.ProduceConsume;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.TimeValue;
@State(Scope.Benchmark)
public class EWYKBenchmark
{
static TestServer server;
static ReservedThreadExecutor reserved;
static File directory;
@Param({"PC","PEC","EWYK"})
public static String strategyName;
@Param({"true","false"})
public static boolean sleeping;
@Param({"true","false"})
public static boolean nonBlocking;
@Setup(Level.Trial)
public static void setupServer() throws Exception
{
// Make a test directory
directory = File.createTempFile("ewyk","dir");
if (directory.exists())
directory.delete();
directory.mkdir();
directory.deleteOnExit();
// Make some test files
for (int i=0;i<75;i++)
{
File file =new File(directory,i+".txt");
file.createNewFile();
file.deleteOnExit();
}
server=new TestServer(directory);
server.start();
reserved = new ReservedThreadExecutor(server,20);
reserved.start();
}
@TearDown(Level.Trial)
public static void stopServer() throws Exception
{
reserved.stop();
server.stop();
}
@State(Scope.Thread)
public static class ThreadState implements Runnable
{
final TestConnection connection=new TestConnection(server,sleeping);
final ExecutionStrategy strategy;
{
switch(strategyName)
{
case "PC":
strategy = new ProduceConsume(connection,server);
break;
case "PEC":
strategy = new ProduceExecuteConsume(connection,server);
break;
case "EWYK":
strategy = new EatWhatYouKill(connection,server);
break;
default:
throw new IllegalStateException();
}
LifeCycle.start(strategy);
}
@Override
public void run()
{
strategy.produce();
}
}
@Benchmark
@BenchmarkMode({Mode.Throughput})
public long testStrategy(ThreadState state) throws Exception
{
int r;
switch(server.getRandom(8))
{
case 0:
r = 4;
break;
case 1:
case 2:
r = 2;
break;
default:
r = 1;
break;
}
List<CompletableFuture<String>> results = new ArrayList<>(r);
for (int i=0;i<r;i++)
{
CompletableFuture<String> result = new CompletableFuture<String>();
results.add(result);
state.connection.submit(result);
}
if (nonBlocking)
Invocable.invokeNonBlocking(state);
else
state.run();
long hash = 0;
for (CompletableFuture<String> result : results)
hash ^= result.get().hashCode();
return hash;
}
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(EWYKBenchmark.class.getSimpleName())
.warmupIterations(2)
.measurementIterations(3)
.forks(1)
.threads(400)
// .syncIterations(true) // Don't start all threads at same time
.warmupTime(new TimeValue(10000,TimeUnit.MILLISECONDS))
.measurementTime(new TimeValue(10000,TimeUnit.MILLISECONDS))
// .addProfiler(CompilerProfiler.class)
// .addProfiler(LinuxPerfProfiler.class)
// .addProfiler(LinuxPerfNormProfiler.class)
// .addProfiler(LinuxPerfAsmProfiler.class)
// .resultFormat(ResultFormatType.CSV)
.build();
new Runner(opt).run();
}
}

View File

@ -0,0 +1,167 @@
//
// ========================================================================
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread.strategy.jmh;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.eclipse.jetty.util.thread.ExecutionStrategy.Producer;
import org.eclipse.jetty.util.thread.Invocable;
import org.openjdk.jmh.infra.Blackhole;
public class TestConnection implements Producer
{
private final TestServer _server;
private final String _sessionid;
private final boolean _sleeping;
private final Queue<CompletableFuture<String>> _queue = new ConcurrentLinkedQueue<>();
public TestConnection(TestServer server, boolean sleeping)
{
_server = server;
_sessionid = "SESSION-"+server.getRandom(100000000);
_sleeping = sleeping;
}
@Override
public Runnable produce()
{
CompletableFuture<String> futureResult = _queue.poll();
if (futureResult==null)
return null;
// The map will represent the request object
Map<String,String> request = new HashMap<>();
request.put("sessionid",_sessionid);
int random = _server.getRandom(1000);
int uri = random%100;
boolean blocking = (random/10)>2;
int delay = (blocking && uri%4==1)?random/2:0;
request.put("uri",uri+".txt"); // one of 100 resources on server
request.put("blocking",blocking?"True":"False"); // one of 100 resources on server
request.put("delay",Integer.toString(delay)); // random processing delay 0-100ms on 25% of requests
Blackhole.consumeCPU(_server.getRandom(500)); // random CPU
Handler handler = new Handler(request,futureResult);
return handler;
}
private class Handler implements Runnable, Invocable
{
private final Map<String,String> _request;
private final CompletableFuture<String> _futureResult;
private final boolean _blocking;
public Handler(Map<String, String> request, CompletableFuture<String> futureResult)
{
_request = request;
_futureResult = futureResult;
_blocking = Boolean.valueOf(request.get("blocking"));
}
@Override
public InvocationType getInvocationType()
{
return _blocking?InvocationType.BLOCKING:InvocationType.NON_BLOCKING;
}
@Override
public void run()
{
// Build a response
StringBuilder response = new StringBuilder(4096);
try
{
// Get the request
String uri = _request.get("uri");
// Obtain the session
Map<String,String> session = _server.getSession(_request.get("sessionid"));
// Check we are authenticated
String userid;
synchronized (session)
{
userid = session.get("userid");
Blackhole.consumeCPU(100);
if (userid==null)
{
userid="USER-"+Math.abs(session.hashCode());
session.put("userid",userid);
}
}
// simulate processing delay, blocking, etc.
int delay = Integer.parseInt(_request.get("delay"));
if (delay>0)
{
if (_sleeping)
{
try
{
Thread.sleep(delay/8);
}
catch(InterruptedException e)
{}
}
else
Blackhole.consumeCPU(delay*150);
}
// get the uri
response.append("URI: ").append(uri).append(System.lineSeparator());
// look for a file
File file = _server.getFile(uri);
if (file.exists())
{
response.append("contentType: ").append("file").append(System.lineSeparator());
response.append("lastModified: ").append(Long.toString(file.lastModified())).append(System.lineSeparator());
response.append("contentLength: ").append(Long.toString(file.length())).append(System.lineSeparator());
response.append("content: ").append("This should be content from a file, but lets pretend it was cached").append(System.lineSeparator());
}
else
{
response.append("contentType: ").append("dynamic").append(System.lineSeparator());
response.append("This is content for ").append(uri)
.append(" generated for ").append(userid)
.append(" with session ").append(_request.get("sessionid"))
.append(" for user ").append(userid)
.append(" on thread ").append(Thread.currentThread());
}
Blackhole.consumeCPU(1000);
}
finally
{
_futureResult.complete(response.toString());
}
}
}
public void submit(CompletableFuture<String> futureResult)
{
_queue.offer(futureResult);
}
}

View File

@ -0,0 +1,97 @@
//
// ========================================================================
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread.strategy.jmh;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.TryExecutor;
public class TestServer implements Executor, TryExecutor
{
private final ConcurrentMap<String,Map<String,String>> _sessions= new ConcurrentHashMap<>();
private final QueuedThreadPool _threadpool = new QueuedThreadPool(200);
private final File _docroot;
TestServer(File docroot)
{
_threadpool.setReservedThreads(20);
_docroot=docroot;
}
TestServer()
{
this(new File("/tmp/"));
}
public Map<String,String> getSession(String sessionid)
{
Map<String,String> session = _sessions.get(sessionid);
if (session==null)
{
session = new HashMap<>();
session.put("id",sessionid);
Map<String,String> s =_sessions.putIfAbsent(sessionid,session);
if (s!=null)
session=s;
}
return session;
}
public int getRandom(int max)
{
return ThreadLocalRandom.current().nextInt(max);
}
@Override
public void execute(Runnable task)
{
_threadpool.execute(task);
}
@Override
public boolean tryExecute(Runnable task)
{
return _threadpool.tryExecute(task);
}
public void start() throws Exception
{
_threadpool.start();
}
public void stop() throws Exception
{
_threadpool.stop();
}
public File getFile(String path)
{
return new File(_docroot,path);
}
}

View File

@ -25,6 +25,8 @@
<alpn.version>undefined</alpn.version>
<conscrypt.version>1.0.0.RC11</conscrypt.version>
<asm.version>6.0</asm.version>
<jmh.version>1.20</jmh.version>
<jmhjar.name>benchmarks</jmhjar.name>
</properties>
<licenses>