Jetty 9.4.x 1803 proposal 0+2 (#2199)

Merging this PR for #2199 as we currently think it is not worse than previous and the code base is definitely simpler.   While JMH has shown significant benefits for this approach, we have yet been able to demonstrate them in full scale integration tests - however that is likely due to GC and thread pool issues dominating.

A `org.eclipse.jetty.http2.PEC_MODE` System property has been added to allow the EWYK scheduling to be disabled for HTTP/2 if need be.


* Implementation of #1803 proposal 2 - EITHER dispatch type for EWYK
* made code more readable
* increase small threadpools in tests for extra reserved thread
* clean up
* minor code simplifications
* Work in progress to simplify reserved thread pool
* use a single ReservedThreadPool built into the QueuedThreadPool
* fixed javadoc
* removed more old reserved thread references
* disable EWYK for h2
* fixes from review
* use EWYK for HTTP2
* Fixed javadocs, imports and QueuedThreadPool constructors.
* fix javadoc
* EWYK avoid unnecessary pendings
* after review
* fixed merge with jetty-threadpool.xml
* alternate EWYK implementations
* added jetty copyright headers
* Simplified EWYK code after review
* fixed bad merge
* Code cleanups.
* Improved Javadocs for deprecated property "reservedThreads".
* Improved Javadocs for deprecated property "reservedThreads".
* added a system property to enable only PEC for HTTP/2

Signed-off-by: Greg Wilkins <gregw@webtide.com>
Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Greg Wilkins 2018-02-27 14:29:59 +11:00 committed by GitHub
parent eb42a9e71f
commit 0cb4f5629d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 796 additions and 257 deletions

View File

@ -603,7 +603,7 @@ public class IdleTimeoutTest extends AbstractTest
{
long idleTimeout = 2000;
// Use a small thread pool to cause request queueing.
QueuedThreadPool serverExecutor = new QueuedThreadPool(4);
QueuedThreadPool serverExecutor = new QueuedThreadPool(5);
serverExecutor.setName("server");
server = new Server(serverExecutor);
HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(new HttpConfiguration());

View File

@ -425,7 +425,7 @@ public class StreamResetTest extends AbstractTest
public void testClientResetConsumesQueuedRequestWithData() throws Exception
{
// Use a small thread pool.
QueuedThreadPool serverExecutor = new QueuedThreadPool(4);
QueuedThreadPool serverExecutor = new QueuedThreadPool(5);
serverExecutor.setName("server");
serverExecutor.setDetailedDump(true);
server = new Server(serverExecutor);

View File

@ -42,7 +42,10 @@ import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
public class HTTP2Connection extends AbstractConnection implements WriteFlusher.Listener
{
protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);
// TODO remove this once we are sure EWYK is OK for http2
private static final boolean PEC_MODE = Boolean.getBoolean("org.eclipse.jetty.http2.PEC_MODE");
private final Queue<Runnable> tasks = new ArrayDeque<>();
private final HTTP2Producer producer = new HTTP2Producer();
private final AtomicLong bytesIn = new AtomicLong();
@ -59,8 +62,9 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
this.parser = parser;
this.session = session;
this.bufferSize = bufferSize;
// TODO HTTP2 cannot use EWYK without fix for #1803
this.strategy = new EatWhatYouKill(producer, new TryExecutor.NoTryExecutor(executor));
if (PEC_MODE)
executor = new TryExecutor.NoTryExecutor(executor);
this.strategy = new EatWhatYouKill(producer, executor);
LifeCycle.start(strategy);
}
@ -283,9 +287,7 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
@Override
public InvocationType getInvocationType()
{
// TODO: see also AbstractHTTP2ServerConnectionFactory.reservedThreads.
// TODO: it's non blocking here because reservedThreads=0.
return InvocationType.NON_BLOCKING;
return InvocationType.EITHER;
}
}
}

View File

@ -142,7 +142,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
/**
* @return -1
* @deprecated
* @deprecated feature removed, no replacement
*/
@Deprecated
public int getReservedThreads()
@ -152,7 +152,8 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
/**
* @param threads ignored
* @deprecated
* @deprecated feature removed, no replacement
* @throws UnsupportedOperationException when invoked
*/
@Deprecated
public void setReservedThreads(int threads)

View File

@ -423,6 +423,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
catch (Throwable x)
{
closeNoExceptions(_selector);
_selector = null;
if (isRunning())
LOG.warn(x);
else
@ -490,17 +491,14 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
// This will update only those keys whose selection did not cause an
// updateKeys update to be submitted.
for (SelectionKey key : _keys)
updateKey(key);
{
Object attachment = key.attachment();
if (attachment instanceof Selectable)
((Selectable)attachment).updateKey();
}
_keys.clear();
}
private void updateKey(SelectionKey key)
{
Object attachment = key.attachment();
if (attachment instanceof Selectable)
((Selectable)attachment).updateKey();
}
@Override
public String toString()
{

View File

@ -48,11 +48,54 @@
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
<configuration>
<instructions>
<Require-Capability>osgi.serviceloader; filter:="(osgi.serviceloader=org.eclipse.jetty.util.security.CredentialProvider)";resolution:=optional;cardinality:=multiple, osgi.extender; filter:="(osgi.extender=osgi.serviceloader.processor)"</Require-Capability>
</instructions>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<instructions>
<Require-Capability>osgi.serviceloader; filter:="(osgi.serviceloader=org.eclipse.jetty.util.security.CredentialProvider)";resolution:=optional;cardinality:=multiple, osgi.extender; filter:="(osgi.extender=osgi.serviceloader.processor)"</Require-Capability>
</instructions>
<finalName>${jmhjar.name}</finalName>
<shadeTestJar>true</shadeTestJar>
<artifactSet>
<includes>
<include>org.openjdk.jmh:jmh-core</include>
</includes>
</artifactSet>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.openjdk.jmh.Main</mainClass>
</transformer>
</transformers>
<filters>
<filter>
<artifact>org.openjdk.jmh:jmh-core</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
@ -102,5 +145,17 @@
<version>${slf4j-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>${jmh.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>${jmh.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -18,14 +18,7 @@
package org.eclipse.jetty.util.thread;
import java.io.Closeable;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
/**
* <p>A task (typically either a {@link Runnable} or {@link Callable}
@ -49,14 +42,7 @@ public interface Invocable
BLOCKING, NON_BLOCKING, EITHER
}
static ThreadLocal<Boolean> __nonBlocking = new ThreadLocal<Boolean>()
{
@Override
protected Boolean initialValue()
{
return Boolean.FALSE;
}
};
static ThreadLocal<Boolean> __nonBlocking = new ThreadLocal<>();
/**
* Test if the current thread has been tagged as non blocking
@ -65,7 +51,7 @@ public interface Invocable
*/
public static boolean isNonBlockingInvocation()
{
return __nonBlocking.get();
return Boolean.TRUE.equals(__nonBlocking.get());
}
/**
@ -75,7 +61,6 @@ public interface Invocable
*/
public static void invokeNonBlocking(Runnable task)
{
// a Choice exists, so we must indicate NonBlocking
Boolean was_non_blocking = __nonBlocking.get();
try
{
@ -88,88 +73,10 @@ public interface Invocable
}
}
/**
* Invoke a task with the calling thread.
* If the task is an {@link Invocable} of {@link InvocationType#EITHER}
* then it is invoked with {@link #invokeNonBlocking(Runnable)}, to
* indicate the type of invocation that has been assumed.
* @param task The task to invoke.
*/
public static void invokePreferNonBlocking(Runnable task)
{
switch (getInvocationType(task))
{
case BLOCKING:
case NON_BLOCKING:
task.run();
break;
case EITHER:
// a Choice exists, so we must indicate NonBlocking
invokeNonBlocking(task);
break;
}
}
/**
* Invoke a task with the calling thread.
* If the task is an {@link Invocable} of {@link InvocationType#EITHER}
* and the preferredInvocationType is not {@link InvocationType#BLOCKING}
* then it is invoked with {@link #invokeNonBlocking(Runnable)}.
* @param task The task to invoke.
* @param preferredInvocationType The invocation type to use if the task
* does not indicate a preference.
*/
public static void invokePreferred(Runnable task, InvocationType preferredInvocationType)
{
switch (getInvocationType(task))
{
case BLOCKING:
case NON_BLOCKING:
task.run();
break;
case EITHER:
if (getInvocationType(task) == InvocationType.EITHER && preferredInvocationType == InvocationType.NON_BLOCKING)
invokeNonBlocking(task);
else
task.run();
break;
}
}
/**
* wrap a task with the to indicate invocation type.
* If the task is an {@link Invocable} of {@link InvocationType#EITHER}
* and the preferredInvocationType is not {@link InvocationType#BLOCKING}
* then it is wrapped with an invocation of {@link #invokeNonBlocking(Runnable)}.
* otherwise the task itself is returned.
* @param task The task to invoke.
* @param preferredInvocationType The invocation type to use if the task
* does not indicate a preference.
* @return A Runnable that invokes the task in the declared or preferred type.
*/
public static Runnable asPreferred(Runnable task, InvocationType preferredInvocationType)
{
switch (getInvocationType(task))
{
case BLOCKING:
case NON_BLOCKING:
break;
case EITHER:
if (preferredInvocationType == InvocationType.NON_BLOCKING)
return () -> invokeNonBlocking(task);
break;
}
return task;
}
/**
* Get the invocation type of an Object.
* @param o The object to check the invocation type of.
* @return If the object is a {@link Invocable}, it is coerced and the {@link #getInvocationType()}
* @return If the object is an Invocable, it is coerced and the {@link #getInvocationType()}
* used, otherwise {@link InvocationType#BLOCKING} is returned.
*/
public static InvocationType getInvocationType(Object o)
@ -186,14 +93,4 @@ public interface Invocable
{
return InvocationType.BLOCKING;
}
public abstract class NonBlocking implements Runnable, Invocable
{
@Override
public final InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
}
}

View File

@ -33,7 +33,6 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.TryExecutor;
/**
@ -66,15 +65,26 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
{
private static final Logger LOG = Log.getLogger(EatWhatYouKill.class);
private enum State { IDLE, PENDING, PRODUCING, REPRODUCING }
private enum State { IDLE, PRODUCING, REPRODUCING }
private final LongAdder _nonBlocking = new LongAdder();
private final LongAdder _blocking = new LongAdder();
private final LongAdder _executed = new LongAdder();
/* The modes this strategy can work in */
private enum Mode
{
PRODUCE_CONSUME,
PRODUCE_INVOKE_CONSUME, // This is PRODUCE_CONSUME an EITHER task with NON_BLOCKING invocation
PRODUCE_EXECUTE_CONSUME,
EXECUTE_PRODUCE_CONSUME // Eat What You Kill!
}
private final LongAdder _pcMode = new LongAdder();
private final LongAdder _picMode = new LongAdder();
private final LongAdder _pecMode = new LongAdder();
private final LongAdder _epcMode = new LongAdder();
private final Producer _producer;
private final Executor _executor;
private final TryExecutor _tryExecutor;
private State _state = State.IDLE;
private boolean _pending;
public EatWhatYouKill(Producer producer, Executor executor)
{
@ -96,8 +106,11 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
switch(_state)
{
case IDLE:
execute = true;
_state = State.PENDING;
if (!_pending)
{
_pending = true;
execute = true;
}
break;
case PRODUCING:
@ -116,166 +129,274 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
@Override
public void run()
{
if (LOG.isDebugEnabled())
LOG.debug("{} run", this);
produce();
{
tryProduce(true);
}
@Override
public void produce()
{
if (LOG.isDebugEnabled())
LOG.debug("{} produce", this);
if (tryProduce())
doProduce();
tryProduce(false);
}
private boolean tryProduce()
private void tryProduce(boolean wasPending)
{
boolean producing = false;
if (LOG.isDebugEnabled())
LOG.debug("{} tryProduce {}", this, wasPending);
synchronized(this)
{
if (wasPending)
_pending = false;
switch (_state)
{
case IDLE:
case PENDING:
// Enter PRODUCING
_state = State.PRODUCING;
producing = true;
break;
case PRODUCING:
// Keep other Thread producing
_state = State.REPRODUCING;
return;
default:
return;
}
}
final boolean non_blocking = Invocable.isNonBlockingInvocation();
while(isRunning())
{
try
{
if (doProduce(non_blocking))
continue;
return;
}
catch (Throwable th)
{
LOG.warn(th);
}
}
}
public boolean doProduce(boolean non_blocking)
{
Runnable task = produceTask();
if (task==null)
{
synchronized(this)
{
// Could another task just have been queued with a produce call?
switch (_state)
{
case PRODUCING:
_state = State.IDLE;
return false;
case REPRODUCING:
_state = State.PRODUCING;
return true;
default:
throw new IllegalStateException(toStringLocked());
}
}
}
Mode mode;
if (non_blocking)
{
// The calling thread cannot block, so we only have a choice between PC and PEC modes,
// based on the invocation type of the task
switch(Invocable.getInvocationType(task))
{
case NON_BLOCKING:
mode = Mode.PRODUCE_CONSUME;
break;
case EITHER:
mode = Mode.PRODUCE_INVOKE_CONSUME;
break;
default:
break;
mode = Mode.PRODUCE_EXECUTE_CONSUME;
}
}
return producing;
}
private void doProduce()
{
boolean producing = true;
while (isRunning() && producing)
else
{
// If we got here, then we are the thread that is producing.
Runnable task = null;
try
// The calling thread can block, so we can choose between PC, PEC and EPC: modes,
// based on the invocation type of the task and if a reserved thread is available
switch(Invocable.getInvocationType(task))
{
task = _producer.produce();
}
catch (Throwable e)
{
LOG.warn(e);
}
if (task==null)
{
synchronized(this)
{
// Could another task just have been queued with a produce call?
switch (_state)
{
case PRODUCING:
_state = State.IDLE;
producing = false;
break;
case REPRODUCING:
_state = State.PRODUCING;
break;
default:
throw new IllegalStateException(toStringLocked());
}
}
}
else
{
boolean consume;
if (Invocable.getInvocationType(task) == InvocationType.NON_BLOCKING)
{
// PRODUCE CONSUME
consume = true;
_nonBlocking.increment();
}
else
{
case NON_BLOCKING:
mode = Mode.PRODUCE_CONSUME;
break;
case BLOCKING:
// The task is blocking, so PC is not an option. Thus we choose
// between EPC and PEC based on the availability of a reserved thread.
synchronized(this)
{
if (_tryExecutor.tryExecute(this))
if (_pending)
{
// EXECUTE PRODUCE CONSUME!
// We have executed a new Producer, so we can EWYK consume
_state = State.PENDING;
producing = false;
consume = true;
_blocking.increment();
_state = State.IDLE;
mode = Mode.EXECUTE_PRODUCE_CONSUME;
}
else if (_tryExecutor.tryExecute(this))
{
_pending = true;
_state = State.IDLE;
mode = Mode.EXECUTE_PRODUCE_CONSUME;
}
else
{
// PRODUCE EXECUTE CONSUME!
consume = false;
_executed.increment();
}
mode = Mode.PRODUCE_EXECUTE_CONSUME;
}
break;
}
case EITHER:
{
// The task may be non blocking, so PC is an option. Thus we choose
// between EPC and PC based on the availability of a reserved thread.
synchronized(this)
{
if (_pending)
{
_state = State.IDLE;
mode = Mode.EXECUTE_PRODUCE_CONSUME;
}
else if (_tryExecutor.tryExecute(this))
{
_pending = true;
_state = State.IDLE;
mode = Mode.EXECUTE_PRODUCE_CONSUME;
}
else
{
// PC mode, but we must consume with non-blocking invocation
// as we may be the last thread and we cannot block
mode = Mode.PRODUCE_INVOKE_CONSUME;
}
}
}
break;
if (LOG.isDebugEnabled())
LOG.debug("{} p={} c={} t={}/{}", this, producing, consume, task,Invocable.getInvocationType(task));
// Consume or execute task
default:
throw new IllegalStateException();
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} m={} t={}/{}", this, mode, task,Invocable.getInvocationType(task));
// Consume or execute task
switch(mode)
{
case PRODUCE_CONSUME:
_pcMode.increment();
task.run();
return true;
case PRODUCE_INVOKE_CONSUME:
_picMode.increment();
Invocable.invokeNonBlocking(task);
return true;
case PRODUCE_EXECUTE_CONSUME:
_pecMode.increment();
execute(task);
return true;
case EXECUTE_PRODUCE_CONSUME:
_epcMode.increment();
task.run();
// Try to produce again?
synchronized(this)
{
if (_state==State.IDLE)
{
// We beat the pending producer, so we will become the producer instead
_state = State.PRODUCING;
return true;
}
}
return false;
default:
throw new IllegalStateException();
}
}
private Runnable produceTask()
{
try
{
return _producer.produce();
}
catch (Throwable e)
{
LOG.warn(e);
}
return null;
}
private void execute(Runnable task)
{
try
{
_executor.execute(task);
}
catch (RejectedExecutionException e)
{
if (isRunning())
LOG.warn(e);
else
LOG.ignore(e);
if (task instanceof Closeable)
{
try
{
if (consume)
task.run();
else
_executor.execute(task);
((Closeable)task).close();
}
catch (RejectedExecutionException e)
catch (Throwable e2)
{
if (isRunning())
LOG.warn(e);
else
LOG.ignore(e);
if (task instanceof Closeable)
{
try
{
((Closeable)task).close();
}
catch (Throwable e2)
{
LOG.ignore(e2);
}
}
}
catch (Throwable e)
{
LOG.warn(e);
LOG.ignore(e2);
}
}
}
}
@ManagedAttribute(value = "number of non blocking tasks consumed", readonly = true)
public long getNonBlockingTasksConsumed()
@ManagedAttribute(value = "number of tasks consumed with PC mode", readonly = true)
public long getPCTasksConsumed()
{
return _nonBlocking.longValue();
return _pcMode.longValue();
}
@ManagedAttribute(value = "number of tasks executed with PIC mode", readonly = true)
public long getPICTasksExecuted()
{
return _picMode.longValue();
}
@ManagedAttribute(value = "number of tasks executed with PEC mode", readonly = true)
public long getPECTasksExecuted()
{
return _pecMode.longValue();
}
@ManagedAttribute(value = "number of blocking tasks consumed", readonly = true)
public long getBlockingTasksConsumed()
@ManagedAttribute(value = "number of tasks consumed with EPC mode", readonly = true)
public long getEPCTasksConsumed()
{
return _blocking.longValue();
}
@ManagedAttribute(value = "number of blocking tasks executed", readonly = true)
public long getBlockingTasksExecuted()
{
return _executed.longValue();
return _epcMode.longValue();
}
@ManagedAttribute(value = "whether this execution strategy is idle", readonly = true)
@ -290,9 +411,10 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
@ManagedOperation(value = "resets the task counts", impact = "ACTION")
public void reset()
{
_nonBlocking.reset();
_blocking.reset();
_executed.reset();
_pcMode.reset();
_epcMode.reset();
_pecMode.reset();
_picMode.reset();
}
public String toString()
@ -324,14 +446,18 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
private void getState(StringBuilder builder)
{
builder.append(_state);
builder.append("/p=");
builder.append(_pending);
builder.append('/');
builder.append(_tryExecutor);
builder.append("[nb=");
builder.append(getNonBlockingTasksConsumed());
builder.append(",c=");
builder.append(getBlockingTasksConsumed());
builder.append(",e=");
builder.append(getBlockingTasksExecuted());
builder.append("[pc=");
builder.append(getPCTasksConsumed());
builder.append(",pic=");
builder.append(getPICTasksExecuted());
builder.append(",pec=");
builder.append(getPECTasksExecuted());
builder.append(",epc=");
builder.append(getEPCTasksConsumed());
builder.append("]");
builder.append("@");
builder.append(DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));

View File

@ -90,7 +90,7 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
}
// Run the task.
Invocable.invokePreferNonBlocking(task);
task.run();
}
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
@ -91,7 +92,10 @@ public class ProduceExecuteConsume implements ExecutionStrategy
}
// Execute the task.
_executor.execute(task);
if (Invocable.getInvocationType(task)==InvocationType.NON_BLOCKING)
task.run();
else
_executor.execute(task);
}
}

View File

@ -0,0 +1,190 @@
//
// ========================================================================
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread.strategy.jmh;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume;
import org.eclipse.jetty.util.thread.strategy.ProduceConsume;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.TimeValue;
@State(Scope.Benchmark)
public class EWYKBenchmark
{
static TestServer server;
static ReservedThreadExecutor reserved;
static File directory;
@Param({"PC","PEC","EWYK"})
public static String strategyName;
@Param({"true","false"})
public static boolean sleeping;
@Param({"true","false"})
public static boolean nonBlocking;
@Setup(Level.Trial)
public static void setupServer() throws Exception
{
// Make a test directory
directory = File.createTempFile("ewyk","dir");
if (directory.exists())
directory.delete();
directory.mkdir();
directory.deleteOnExit();
// Make some test files
for (int i=0;i<75;i++)
{
File file =new File(directory,i+".txt");
file.createNewFile();
file.deleteOnExit();
}
server=new TestServer(directory);
server.start();
reserved = new ReservedThreadExecutor(server,20);
reserved.start();
}
@TearDown(Level.Trial)
public static void stopServer() throws Exception
{
reserved.stop();
server.stop();
}
@State(Scope.Thread)
public static class ThreadState implements Runnable
{
final TestConnection connection=new TestConnection(server,sleeping);
final ExecutionStrategy strategy;
{
switch(strategyName)
{
case "PC":
strategy = new ProduceConsume(connection,server);
break;
case "PEC":
strategy = new ProduceExecuteConsume(connection,server);
break;
case "EWYK":
strategy = new EatWhatYouKill(connection,server);
break;
default:
throw new IllegalStateException();
}
LifeCycle.start(strategy);
}
@Override
public void run()
{
strategy.produce();
}
}
@Benchmark
@BenchmarkMode({Mode.Throughput})
public long testStrategy(ThreadState state) throws Exception
{
int r;
switch(server.getRandom(8))
{
case 0:
r = 4;
break;
case 1:
case 2:
r = 2;
break;
default:
r = 1;
break;
}
List<CompletableFuture<String>> results = new ArrayList<>(r);
for (int i=0;i<r;i++)
{
CompletableFuture<String> result = new CompletableFuture<String>();
results.add(result);
state.connection.submit(result);
}
if (nonBlocking)
Invocable.invokeNonBlocking(state);
else
state.run();
long hash = 0;
for (CompletableFuture<String> result : results)
hash ^= result.get().hashCode();
return hash;
}
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(EWYKBenchmark.class.getSimpleName())
.warmupIterations(2)
.measurementIterations(3)
.forks(1)
.threads(400)
// .syncIterations(true) // Don't start all threads at same time
.warmupTime(new TimeValue(10000,TimeUnit.MILLISECONDS))
.measurementTime(new TimeValue(10000,TimeUnit.MILLISECONDS))
// .addProfiler(CompilerProfiler.class)
// .addProfiler(LinuxPerfProfiler.class)
// .addProfiler(LinuxPerfNormProfiler.class)
// .addProfiler(LinuxPerfAsmProfiler.class)
// .resultFormat(ResultFormatType.CSV)
.build();
new Runner(opt).run();
}
}

View File

@ -0,0 +1,167 @@
//
// ========================================================================
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread.strategy.jmh;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.eclipse.jetty.util.thread.ExecutionStrategy.Producer;
import org.eclipse.jetty.util.thread.Invocable;
import org.openjdk.jmh.infra.Blackhole;
public class TestConnection implements Producer
{
private final TestServer _server;
private final String _sessionid;
private final boolean _sleeping;
private final Queue<CompletableFuture<String>> _queue = new ConcurrentLinkedQueue<>();
public TestConnection(TestServer server, boolean sleeping)
{
_server = server;
_sessionid = "SESSION-"+server.getRandom(100000000);
_sleeping = sleeping;
}
@Override
public Runnable produce()
{
CompletableFuture<String> futureResult = _queue.poll();
if (futureResult==null)
return null;
// The map will represent the request object
Map<String,String> request = new HashMap<>();
request.put("sessionid",_sessionid);
int random = _server.getRandom(1000);
int uri = random%100;
boolean blocking = (random/10)>2;
int delay = (blocking && uri%4==1)?random/2:0;
request.put("uri",uri+".txt"); // one of 100 resources on server
request.put("blocking",blocking?"True":"False"); // one of 100 resources on server
request.put("delay",Integer.toString(delay)); // random processing delay 0-100ms on 25% of requests
Blackhole.consumeCPU(_server.getRandom(500)); // random CPU
Handler handler = new Handler(request,futureResult);
return handler;
}
private class Handler implements Runnable, Invocable
{
private final Map<String,String> _request;
private final CompletableFuture<String> _futureResult;
private final boolean _blocking;
public Handler(Map<String, String> request, CompletableFuture<String> futureResult)
{
_request = request;
_futureResult = futureResult;
_blocking = Boolean.valueOf(request.get("blocking"));
}
@Override
public InvocationType getInvocationType()
{
return _blocking?InvocationType.BLOCKING:InvocationType.NON_BLOCKING;
}
@Override
public void run()
{
// Build a response
StringBuilder response = new StringBuilder(4096);
try
{
// Get the request
String uri = _request.get("uri");
// Obtain the session
Map<String,String> session = _server.getSession(_request.get("sessionid"));
// Check we are authenticated
String userid;
synchronized (session)
{
userid = session.get("userid");
Blackhole.consumeCPU(100);
if (userid==null)
{
userid="USER-"+Math.abs(session.hashCode());
session.put("userid",userid);
}
}
// simulate processing delay, blocking, etc.
int delay = Integer.parseInt(_request.get("delay"));
if (delay>0)
{
if (_sleeping)
{
try
{
Thread.sleep(delay/8);
}
catch(InterruptedException e)
{}
}
else
Blackhole.consumeCPU(delay*150);
}
// get the uri
response.append("URI: ").append(uri).append(System.lineSeparator());
// look for a file
File file = _server.getFile(uri);
if (file.exists())
{
response.append("contentType: ").append("file").append(System.lineSeparator());
response.append("lastModified: ").append(Long.toString(file.lastModified())).append(System.lineSeparator());
response.append("contentLength: ").append(Long.toString(file.length())).append(System.lineSeparator());
response.append("content: ").append("This should be content from a file, but lets pretend it was cached").append(System.lineSeparator());
}
else
{
response.append("contentType: ").append("dynamic").append(System.lineSeparator());
response.append("This is content for ").append(uri)
.append(" generated for ").append(userid)
.append(" with session ").append(_request.get("sessionid"))
.append(" for user ").append(userid)
.append(" on thread ").append(Thread.currentThread());
}
Blackhole.consumeCPU(1000);
}
finally
{
_futureResult.complete(response.toString());
}
}
}
public void submit(CompletableFuture<String> futureResult)
{
_queue.offer(futureResult);
}
}

View File

@ -0,0 +1,97 @@
//
// ========================================================================
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread.strategy.jmh;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.TryExecutor;
public class TestServer implements Executor, TryExecutor
{
private final ConcurrentMap<String,Map<String,String>> _sessions= new ConcurrentHashMap<>();
private final QueuedThreadPool _threadpool = new QueuedThreadPool(200);
private final File _docroot;
TestServer(File docroot)
{
_threadpool.setReservedThreads(20);
_docroot=docroot;
}
TestServer()
{
this(new File("/tmp/"));
}
public Map<String,String> getSession(String sessionid)
{
Map<String,String> session = _sessions.get(sessionid);
if (session==null)
{
session = new HashMap<>();
session.put("id",sessionid);
Map<String,String> s =_sessions.putIfAbsent(sessionid,session);
if (s!=null)
session=s;
}
return session;
}
public int getRandom(int max)
{
return ThreadLocalRandom.current().nextInt(max);
}
@Override
public void execute(Runnable task)
{
_threadpool.execute(task);
}
@Override
public boolean tryExecute(Runnable task)
{
return _threadpool.tryExecute(task);
}
public void start() throws Exception
{
_threadpool.start();
}
public void stop() throws Exception
{
_threadpool.stop();
}
public File getFile(String path)
{
return new File(_docroot,path);
}
}

View File

@ -25,6 +25,8 @@
<alpn.version>undefined</alpn.version>
<conscrypt.version>1.0.0.RC11</conscrypt.version>
<asm.version>6.0</asm.version>
<jmh.version>1.20</jmh.version>
<jmhjar.name>benchmarks</jmhjar.name>
</properties>
<licenses>