Merge remote-tracking branch 'origin/jetty-9.4.x'

This commit is contained in:
Greg Wilkins 2017-09-28 12:56:30 +10:00
commit 0564b47657
5 changed files with 485 additions and 104 deletions

View File

@ -36,17 +36,33 @@ import org.eclipse.jetty.server.LocalConnector.LocalEndPoint;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@RunWith(AdvancedRunner.class) @RunWith(AdvancedRunner.class)
public class NotAcceptingTest public class NotAcceptingTest
{ {
Server server;
@Before
public void before()
{
server = new Server();
}
@After
public void after() throws Exception
{
server.stop();
server=null;
}
@Test @Test
public void testServerConnectorBlockingAccept() throws Exception public void testServerConnectorBlockingAccept() throws Exception
{ {
Server server = new Server();
ServerConnector connector = new ServerConnector(server,1,1); ServerConnector connector = new ServerConnector(server,1,1);
connector.setPort(0); connector.setPort(0);
connector.setIdleTimeout(500); connector.setIdleTimeout(500);
@ -121,7 +137,6 @@ public class NotAcceptingTest
@Test @Test
public void testLocalConnector() throws Exception public void testLocalConnector() throws Exception
{ {
Server server = new Server();
LocalConnector connector = new LocalConnector(server); LocalConnector connector = new LocalConnector(server);
connector.setIdleTimeout(500); connector.setIdleTimeout(500);
server.addConnector(connector); server.addConnector(connector);
@ -174,7 +189,7 @@ public class NotAcceptingTest
{ {
// Can we accept the original? // Can we accept the original?
connector.setAccepting(true); connector.setAccepting(true);
uri = handler.exchange.exchange("delayed connection"); uri = handler.exchange.exchange("delayed connection",10,TimeUnit.SECONDS);
assertThat(uri,is("/four")); assertThat(uri,is("/four"));
response = HttpTester.parseResponse(client2.getResponse()); response = HttpTester.parseResponse(client2.getResponse());
assertThat(response.getStatus(),is(200)); assertThat(response.getStatus(),is(200));
@ -188,7 +203,6 @@ public class NotAcceptingTest
@Test @Test
public void testServerConnectorAsyncAccept() throws Exception public void testServerConnectorAsyncAccept() throws Exception
{ {
Server server = new Server();
ServerConnector connector = new ServerConnector(server,0,1); ServerConnector connector = new ServerConnector(server,0,1);
connector.setPort(0); connector.setPort(0);
connector.setIdleTimeout(500); connector.setIdleTimeout(500);

View File

@ -0,0 +1,91 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.util.concurrent.atomic.AtomicReference;
/**
* ConcurrentStack
*
* Nonblocking stack using variation of Treiber's algorithm
* that allows for reduced garbage
*/
public class ConcurrentStack<I>
{
private final NodeStack<Holder> stack = new NodeStack<>();
public void push(I item)
{
stack.push(new Holder(item));
}
public I pop()
{
Holder<I> holder = stack.pop();
if (holder==null)
return null;
return holder.item;
}
private static class Holder<I> extends Node
{
final I item;
Holder(I item)
{
this.item = item;
}
}
public static class Node
{
Node next;
}
public static class NodeStack<N extends Node>
{
AtomicReference<Node> stack = new AtomicReference<Node>();
public void push(N node)
{
while(true)
{
Node top = stack.get();
node.next = top;
if (stack.compareAndSet(top,node))
break;
}
}
public N pop()
{
while (true)
{
Node top = stack.get();
if (top==null)
return null;
if (stack.compareAndSet(top,top.next))
{
top.next = null;
return (N)top;
}
}
}
}
}

View File

@ -20,8 +20,11 @@ package org.eclipse.jetty.util.thread;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import org.eclipse.jetty.util.ConcurrentStack;
import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.AbstractLifeCycle;
@ -32,21 +35,39 @@ import org.eclipse.jetty.util.log.Logger;
* An Executor using preallocated/reserved Threads from a wrapped Executor. * An Executor using preallocated/reserved Threads from a wrapped Executor.
* <p>Calls to {@link #execute(Runnable)} on a {@link ReservedThreadExecutor} will either succeed * <p>Calls to {@link #execute(Runnable)} on a {@link ReservedThreadExecutor} will either succeed
* with a Thread immediately being assigned the Runnable task, or fail if no Thread is * with a Thread immediately being assigned the Runnable task, or fail if no Thread is
* available. Threads are preallocated up to the capacity from a wrapped {@link Executor}. * available.
* <p>Threads are reserved lazily, with a new reserved thread being allocated from a
* wrapped {@link Executor} when an execution fails. If the {@link #setIdleTimeout(long, TimeUnit)}
* is set to non zero (default 1 minute), then the reserved thread pool will shrink by 1 thread
* whenever it has been idle for that period.
*/ */
@ManagedObject("A pool for reserved threads") @ManagedObject("A pool for reserved threads")
public class ReservedThreadExecutor extends AbstractLifeCycle implements Executor public class ReservedThreadExecutor extends AbstractLifeCycle implements Executor
{ {
private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class); private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class);
private static final Runnable STOP = new Runnable()
{
@Override
public void run()
{}
@Override
public String toString()
{
return "STOP!";
}
};
private final Executor _executor; private final Executor _executor;
private final Locker _locker = new Locker(); private final int _capacity;
private final ReservedThread[] _queue; private final ConcurrentStack.NodeStack<ReservedThread> _stack;
private int _head; private final AtomicInteger _size = new AtomicInteger();
private int _size; private final AtomicInteger _pending = new AtomicInteger();
private int _pending;
private ThreadBudget.Lease _lease; private ThreadBudget.Lease _lease;
private Object _owner; private Object _owner;
private long _idleTime = 1L;
private TimeUnit _idleTimeUnit = TimeUnit.MINUTES;
public ReservedThreadExecutor(Executor executor) public ReservedThreadExecutor(Executor executor)
{ {
@ -59,7 +80,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
* is calculated based on a heuristic from the number of available processors and * is calculated based on a heuristic from the number of available processors and
* thread pool size. * thread pool size.
*/ */
public ReservedThreadExecutor(Executor executor,int capacity) public ReservedThreadExecutor(Executor executor, int capacity)
{ {
this(executor,capacity,null); this(executor,capacity,null);
} }
@ -73,10 +94,12 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
public ReservedThreadExecutor(Executor executor,int capacity, Object owner) public ReservedThreadExecutor(Executor executor,int capacity, Object owner)
{ {
_executor = executor; _executor = executor;
_queue = new ReservedThread[reservedThreads(executor,capacity)]; _capacity = reservedThreads(executor,capacity);
_stack = new ConcurrentStack.NodeStack<>();
_owner = owner; _owner = owner;
}
LOG.debug("{}",this);
}
/** /**
* @param executor The executor to use to obtain threads * @param executor The executor to use to obtain threads
* @param capacity The number of threads to preallocate, If less than 0 then capacity * @param capacity The number of threads to preallocate, If less than 0 then capacity
@ -106,31 +129,46 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
@ManagedAttribute(value = "max number of reserved threads", readonly = true) @ManagedAttribute(value = "max number of reserved threads", readonly = true)
public int getCapacity() public int getCapacity()
{ {
return _queue.length; return _capacity;
} }
@ManagedAttribute(value = "available reserved threads", readonly = true) @ManagedAttribute(value = "available reserved threads", readonly = true)
public int getAvailable() public int getAvailable()
{ {
try (Locker.Lock lock = _locker.lock()) return _size.get();
{
return _size;
}
} }
@ManagedAttribute(value = "pending reserved threads", readonly = true) @ManagedAttribute(value = "pending reserved threads", readonly = true)
public int getPending() public int getPending()
{ {
try (Locker.Lock lock = _locker.lock()) return _pending.get();
{
return _pending;
} }
@ManagedAttribute(value = "idletimeout in MS", readonly = true)
public long getIdleTimeoutMs()
{
if(_idleTimeUnit==null)
return 0;
return _idleTimeUnit.toMillis(_idleTime);
}
/**
* Set the idle timeout for shrinking the reserved thread pool
* @param idleTime Time to wait before shrinking, or 0 for no timeout.
* @param idleTimeUnit Time units for idle timeout
*/
public void setIdleTimeout(long idleTime, TimeUnit idleTimeUnit)
{
if (isRunning())
throw new IllegalStateException();
_idleTime = idleTime;
_idleTimeUnit = idleTimeUnit;
} }
@Override @Override
public void doStart() throws Exception public void doStart() throws Exception
{ {
_lease = ThreadBudget.leaseFrom(getExecutor(),this,_queue.length); _lease = ThreadBudget.leaseFrom(getExecutor(),this,_capacity);
super.doStart(); super.doStart();
} }
@ -139,16 +177,18 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
{ {
if (_lease!=null) if (_lease!=null)
_lease.close(); _lease.close();
try (Locker.Lock lock = _locker.lock()) while(true)
{ {
while (_size>0) ReservedThread thread = _stack.pop();
if (thread==null)
{ {
ReservedThread thread = _queue[_head]; super.doStop();
_queue[_head] = null; return;
_head = (_head+1)%_queue.length;
_size--;
thread._wakeup.signal();
} }
_size.decrementAndGet();
thread.stop();
} }
} }
@ -165,110 +205,178 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
*/ */
public boolean tryExecute(Runnable task) public boolean tryExecute(Runnable task)
{ {
if (LOG.isDebugEnabled())
LOG.debug("{} tryExecute {}",this ,task);
if (task==null) if (task==null)
return false; return false;
try (Locker.Lock lock = _locker.lock()) ReservedThread thread = _stack.pop();
if (thread==null && task!=STOP)
{ {
if (_size==0) startReservedThread();
{
if (_pending<_queue.length)
{
_executor.execute(new ReservedThread());
_pending++;
}
return false; return false;
} }
ReservedThread thread = _queue[_head]; int size = _size.decrementAndGet();
_queue[_head] = null; thread.offer(task);
_head = (_head+1)%_queue.length;
_size--;
if (_size==0 && _pending<_queue.length) if (size==0 && task!=STOP)
{ startReservedThread();
_executor.execute(new ReservedThread());
_pending++;
}
thread._task = task;
thread._wakeup.signal();
return true; return true;
} }
private void startReservedThread()
{
try
{
while (true)
{
int pending = _pending.get();
if (pending >= _capacity)
return;
if (_pending.compareAndSet(pending, pending + 1))
{
if (LOG.isDebugEnabled())
LOG.debug("{} startReservedThread p={}", this, pending + 1);
_executor.execute(new ReservedThread());
return;
}
}
}
catch(RejectedExecutionException e) catch(RejectedExecutionException e)
{ {
LOG.ignore(e); LOG.ignore(e);
return false;
} }
} }
@Override @Override
public String toString() public String toString()
{
try (Locker.Lock lock = _locker.lock())
{ {
if (_owner==null) if (_owner==null)
return String.format("%s@%x{s=%d,p=%d}",this.getClass().getSimpleName(),hashCode(),_size,_pending); return String.format("%s@%x{s=%d,p=%d}",this.getClass().getSimpleName(),hashCode(),_size.get(),_pending.get());
return String.format("%s@%s{s=%d,p=%d}",this.getClass().getSimpleName(),_owner,_size,_pending); return String.format("%s@%s{s=%d,p=%d}",this.getClass().getSimpleName(),_owner,_size.get(),_pending.get());
}
} }
private class ReservedThread implements Runnable private class ReservedThread extends ConcurrentStack.Node implements Runnable
{ {
private Condition _wakeup = null; private final Locker _locker = new Locker();
private final Condition _wakeup = _locker.newCondition();
private boolean _starting = true;
private Runnable _task = null; private Runnable _task = null;
private void reservedWait() throws InterruptedException public void offer(Runnable task)
{ {
_wakeup.await(); if (LOG.isDebugEnabled())
} LOG.debug("{} offer {}", this, task);
@Override
public void run()
{
while (true)
{
Runnable task = null;
try (Locker.Lock lock = _locker.lock()) try (Locker.Lock lock = _locker.lock())
{ {
// if this is our first loop, decrement pending count _task = task;
if (_wakeup==null) _wakeup.signal();
{ }
_pending--;
_wakeup = _locker.newCondition();
} }
// Exit if no longer running or there now too many preallocated threads public void stop()
if (!isRunning() || _size>=_queue.length)
break;
// Insert ourselves in the queue
_queue[(_head+_size++)%_queue.length] = this;
// Wait for a task, ignoring spurious wakeups
while (isRunning() && task==null)
{ {
try offer(STOP);
}
private Runnable reservedWait()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} waiting", this); LOG.debug("{} waiting", this);
reservedWait();
if (LOG.isDebugEnabled()) Runnable task = null;
LOG.debug("{} woken up", this); while (isRunning() && task==null)
task = _task; {
_task = null; boolean idle = false;
try (Locker.Lock lock = _locker.lock())
{
if (_task == null)
{
try
{
if (_idleTime == 0)
_wakeup.await();
else
idle = !_wakeup.await(_idleTime, _idleTimeUnit);
} }
catch (InterruptedException e) catch (InterruptedException e)
{ {
LOG.ignore(e); LOG.ignore(e);
} }
} }
task = _task;
_task = null;
} }
// Run any task if (idle)
if (task!=null)
{ {
// Because threads are held in a stack, excess threads will be
// idle. However, we cannot remove threads from the bottom of
// the stack, so we submit a poison pill job to stop the thread
// on top of the stack (which unfortunately will be the most
// recently used)
if (LOG.isDebugEnabled())
LOG.debug("{} IDLE", this);
tryExecute(STOP);
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} task={}", this, task);
return task==null?STOP:task;
}
@Override
public void run()
{
while (isRunning())
{
Runnable task = null;
// test and increment size BEFORE decrementing pending,
// so that we don't have a race starting new pending.
while(true)
{
int size = _size.get();
if (size>=_capacity)
{
if (LOG.isDebugEnabled())
LOG.debug("{} size {} > capacity", this, size, _capacity);
if (_starting)
_pending.decrementAndGet();
return;
}
if (_size.compareAndSet(size,size+1))
break;
}
if (_starting)
{
if (LOG.isDebugEnabled())
LOG.debug("{} started", this);
_pending.decrementAndGet();
_starting = false;
}
// Insert ourselves in the stack. Size is already incremented, but
// that only effects the decision to keep other threads reserved.
_stack.push(this);
// Wait for a task
task = reservedWait();
if (task==STOP)
// return on STOP poison pill
break;
// Run the task
try try
{ {
task.run(); task.run();
@ -278,7 +386,15 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
LOG.warn(e); LOG.warn(e);
} }
} }
if (LOG.isDebugEnabled())
LOG.debug("{} Exited", this);
} }
@Override
public String toString()
{
return String.format("%s@%x",ReservedThreadExecutor.this,hashCode());
} }
} }
} }

View File

@ -18,24 +18,44 @@
package org.eclipse.jetty.util.thread; package org.eclipse.jetty.util.thread;
import java.security.SecureRandom;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Deque; import java.util.Deque;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.StreamSupport;
import org.eclipse.jetty.toolchain.test.annotation.Stress;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class ReservedThreadExecutorTest public class ReservedThreadExecutorTest
{ {
private static final int SIZE = 2; private static final int SIZE = 2;
private static final Runnable NOOP = () -> {}; private static final Runnable NOOP = new Runnable()
{
@Override
public void run()
{}
@Override
public String toString()
{
return "NOOP!";
}
};
private TestExecutor _executor; private TestExecutor _executor;
private ReservedThreadExecutor _reservedExecutor; private ReservedThreadExecutor _reservedExecutor;
@ -127,17 +147,87 @@ public class ReservedThreadExecutorTest
waitForAllAvailable(); waitForAllAvailable();
} }
protected void waitForAllAvailable() throws InterruptedException
@Test
public void testShrink() throws Exception
{
final long IDLE = 1000;
_reservedExecutor.stop();
_reservedExecutor.setIdleTimeout(IDLE,TimeUnit.MILLISECONDS);
_reservedExecutor.start();
// Reserved threads are lazily started.
assertThat(_executor._queue.size(), is(0));
assertThat(_reservedExecutor.tryExecute(NOOP),is(false));
_executor.execute();
waitForNoPending();
CountDownLatch latch = new CountDownLatch(1);
Runnable waitForLatch = ()->{try {latch.await();} catch(Exception e){}};
assertThat(_reservedExecutor.tryExecute(waitForLatch),is(true));
_executor.execute();
assertThat(_reservedExecutor.tryExecute(NOOP),is(false));
_executor.execute();
waitForNoPending();
latch.countDown();
waitForAvailable(2);
// Check that regular moderate activity keeps the pool a moderate size
TimeUnit.MILLISECONDS.sleep(IDLE/2);
assertThat(_reservedExecutor.tryExecute(NOOP),is(true));
waitForAvailable(2);
TimeUnit.MILLISECONDS.sleep(IDLE/2);
assertThat(_reservedExecutor.tryExecute(NOOP),is(true));
waitForAvailable(1);
TimeUnit.MILLISECONDS.sleep(IDLE/2);
assertThat(_reservedExecutor.tryExecute(NOOP),is(true));
waitForAvailable(1);
TimeUnit.MILLISECONDS.sleep(IDLE/2);
assertThat(_reservedExecutor.tryExecute(NOOP),is(true));
waitForAvailable(1);
TimeUnit.MILLISECONDS.sleep(IDLE/2);
assertThat(_reservedExecutor.tryExecute(NOOP),is(true));
waitForAvailable(1);
// check fully idle goes to zero
TimeUnit.MILLISECONDS.sleep(IDLE);
assertThat(_reservedExecutor.getAvailable(),is(0));
}
protected void waitForNoPending() throws InterruptedException
{ {
long started = System.nanoTime(); long started = System.nanoTime();
while (_reservedExecutor.getAvailable() < SIZE) while (_reservedExecutor.getPending() > 0)
{
long elapsed = System.nanoTime() - started;
if (elapsed > TimeUnit.SECONDS.toNanos(10))
Assert.fail("pending="+_reservedExecutor.getPending());
Thread.sleep(10);
}
assertThat(_reservedExecutor.getPending(), is(0));
}
protected void waitForAvailable(int size) throws InterruptedException
{
long started = System.nanoTime();
while (_reservedExecutor.getAvailable() < size)
{ {
long elapsed = System.nanoTime() - started; long elapsed = System.nanoTime() - started;
if (elapsed > TimeUnit.SECONDS.toNanos(10)) if (elapsed > TimeUnit.SECONDS.toNanos(10))
Assert.fail(); Assert.fail();
Thread.sleep(10); Thread.sleep(10);
} }
assertThat(_reservedExecutor.getAvailable(), is(SIZE)); assertThat(_reservedExecutor.getAvailable(), is(size));
}
protected void waitForAllAvailable() throws InterruptedException
{
waitForAvailable(SIZE);
} }
private static class TestExecutor implements Executor private static class TestExecutor implements Executor
@ -177,4 +267,73 @@ public class ReservedThreadExecutorTest
} }
} }
} }
@Ignore
@Test
public void stressTest() throws Exception
{
QueuedThreadPool pool = new QueuedThreadPool(20);
pool.setStopTimeout(10000);
pool.start();
ReservedThreadExecutor reserved = new ReservedThreadExecutor(pool,10);
reserved.setIdleTimeout(0,null);
reserved.start();
final int LOOPS = 1000000;
final Random random = new Random();
final AtomicInteger executions = new AtomicInteger(LOOPS);
final CountDownLatch executed = new CountDownLatch(executions.get());
final AtomicInteger usedReserved = new AtomicInteger(0);
final AtomicInteger usedPool = new AtomicInteger(0);
Runnable task = new Runnable()
{
public void run()
{
try
{
while (true)
{
int loops = executions.get();
if (loops <= 0)
return;
if (executions.compareAndSet(loops, loops - 1))
{
if (reserved.tryExecute(this))
{
usedReserved.incrementAndGet();
} else
{
usedPool.incrementAndGet();
pool.execute(this);
}
return;
}
}
}
finally
{
executed.countDown();
}
}
};
task.run();
task.run();
task.run();
task.run();
task.run();
task.run();
task.run();
task.run();
assertTrue(executed.await(60,TimeUnit.SECONDS));
reserved.stop();
pool.stop();
assertThat(usedReserved.get()+usedPool.get(),is(LOOPS));
System.err.printf("reserved=%d pool=%d total=%d%n",usedReserved.get(),usedPool.get(),LOOPS);
}
} }

View File

@ -2,3 +2,4 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.util.LEVEL=DEBUG #org.eclipse.jetty.util.LEVEL=DEBUG
#org.eclipse.jetty.util.PathWatcher.LEVEL=DEBUG #org.eclipse.jetty.util.PathWatcher.LEVEL=DEBUG
#org.eclipse.jetty.util.thread.ReservedThreadExecutor.LEVEL=DEBUG