Issue #4495 ReservedThreadExecutor optimise (#4499)

* Issue #4495 ReservedThreadExecutor optimise

Use synchronousQueue for task handoff

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4495 ReservedThreadExecutor optimise

updates from review

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4495 ReservedThreadExecutor optimise

Use a linked queue rather than a deque(as a stack).  This should be simpler, better optimised and less contended.  Idling has been simplified so that a reserve thread is always dropped every idle period.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4495 ReservedThreadExecutor optimise

reverted RTE and added a JMH benchmark

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* More variants and longer tests

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Added LQ

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* removed SQ2

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4495 ReservedThreadExecutor optimise

Replaced real implementation with SQ

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4495 RTE optimise

Removed alternate implementations

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4495 RTE optimise

updates from review

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2020-01-31 19:54:44 +01:00 committed by GitHub
parent b6f2fd2432
commit 4dbf8a3a9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 171 additions and 37 deletions

View File

@ -0,0 +1,147 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread.jmh;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.TryExecutor;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
@State(Scope.Benchmark)
@Warmup(iterations = 3, time = 2000, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 3, time = 2000, timeUnit = TimeUnit.MILLISECONDS)
public class ReservedThreadPoolBenchmark
{
public enum Type
{
RTP
}
@Param({"RTP"})
Type type;
@Param({"0", "8", "32"})
int size;
QueuedThreadPool qtp;
TryExecutor pool;
@Setup // (Level.Iteration)
public void buildPool()
{
qtp = new QueuedThreadPool();
switch (type)
{
case RTP:
{
ReservedThreadExecutor pool = new ReservedThreadExecutor(qtp, size);
pool.setIdleTimeout(1, TimeUnit.SECONDS);
this.pool = pool;
break;
}
}
LifeCycle.start(qtp);
LifeCycle.start(pool);
}
@TearDown // (Level.Iteration)
public void shutdownPool()
{
LifeCycle.stop(pool);
LifeCycle.stop(qtp);
pool = null;
qtp = null;
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
@Threads(1)
public void testFew() throws Exception
{
doJob();
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
@Threads(8)
public void testSome() throws Exception
{
doJob();
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
@Threads(200)
public void testMany() throws Exception
{
doJob();
}
void doJob() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
Runnable task = () ->
{
Blackhole.consumeCPU(1);
Thread.yield();
Blackhole.consumeCPU(1);
latch.countDown();
Blackhole.consumeCPU(1);
};
if (!pool.tryExecute(task))
qtp.execute(task);
latch.await();
}
public static void main(String[] args) throws RunnerException
{
Options opt = new OptionsBuilder()
.include(ReservedThreadPoolBenchmark.class.getSimpleName())
.forks(1)
// .threads(400)
// .syncIterations(true) // Don't start all threads at same time
// .addProfiler(CompilerProfiler.class)
// .addProfiler(LinuxPerfProfiler.class)
// .addProfiler(LinuxPerfNormProfiler.class)
// .addProfiler(LinuxPerfAsmProfiler.class)
// .resultFormat(ResultFormatType.CSV)
.build();
new Runner(opt).run();
}
}

View File

@ -21,9 +21,9 @@ package org.eclipse.jetty.util.thread;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
@ -217,7 +217,8 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
}
int size = _size.decrementAndGet();
thread.offer(task);
if (!thread.offer(task))
return false;
if (size == 0 && task != STOP)
startReservedThread();
@ -264,20 +265,25 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
private class ReservedThread implements Runnable
{
private final Locker _locker = new Locker();
private final Condition _wakeup = _locker.newCondition();
private final SynchronousQueue<Runnable> _task = new SynchronousQueue<>();
private boolean _starting = true;
private Runnable _task = null;
public void offer(Runnable task)
public boolean offer(Runnable task)
{
if (LOG.isDebugEnabled())
LOG.debug("{} offer {}", this, task);
try (Locker.Lock lock = _locker.lock())
try
{
_task = task;
_wakeup.signal();
_task.put(task);
return true;
}
catch (Throwable e)
{
LOG.ignore(e);
_size.getAndIncrement();
_stack.addFirst(this);
return false;
}
}
@ -296,37 +302,14 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
if (!isRunning())
return STOP;
boolean idle = false;
try (Locker.Lock lock = _locker.lock())
try
{
if (_task == null)
{
try
{
if (_idleTime == 0)
_wakeup.await();
else
idle = !_wakeup.await(_idleTime, _idleTimeUnit);
}
catch (InterruptedException e)
{
LOG.ignore(e);
}
}
else
{
Runnable task = _task;
_task = null;
if (LOG.isDebugEnabled())
LOG.debug("{} task={}", this, task);
Runnable task = _idleTime <= 0 ? _task.take() : _task.poll(_idleTime, _idleTimeUnit);
if (LOG.isDebugEnabled())
LOG.debug("{} task={}", this, task);
if (task != null)
return task;
}
}
if (idle)
{
// Because threads are held in a stack, excess threads will be
// idle. However, we cannot remove threads from the bottom of
// the stack, so we submit a poison pill job to stop the thread
@ -336,6 +319,10 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
LOG.debug("{} IDLE", this);
tryExecute(STOP);
}
catch (InterruptedException e)
{
LOG.ignore(e);
}
}
}