Issue #1970 - ManagedSelector loses selector thread (#1971)

* Issue #1970 - ManagedSelector loses selector thread.

Removed broken data structure ConcurrentStack (ABA problem).

Made ReservedThreadExecutor use a ConcurrentLinkedDeque
instead of ConcurrentStack.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2017-11-14 10:48:33 +01:00 committed by GitHub
parent 47d46ec60e
commit 0f07c6518e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 46 additions and 145 deletions

View File

@ -74,7 +74,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
_id = id;
SelectorProducer producer = new SelectorProducer();
Executor executor = selectorManager.getExecutor();
_strategy = new EatWhatYouKill(producer,executor,_selectorManager.getBean(ReservedThreadExecutor.class));
_strategy = new EatWhatYouKill(producer,executor,_selectorManager.getBean(ReservedThreadExecutor.class));
addBean(_strategy,true);
setStopTimeout(5000);
}
@ -218,6 +218,14 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
submit(new DestroyEndPoint(endPoint));
}
private int getActionSize()
{
try (Locker.Lock lock = _locker.lock())
{
return _actions.size();
}
}
@Override
public String dump()
{
@ -248,11 +256,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
public String toString()
{
Selector selector = _selector;
return String.format("%s id=%s keys=%d selected=%d",
return String.format("%s id=%s keys=%d selected=%d actions=%d",
super.toString(),
_id,
selector != null && selector.isOpen() ? selector.keys().size() : -1,
selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1,
getActionSize());
}
/**

View File

@ -1,91 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.util.concurrent.atomic.AtomicReference;
/**
* ConcurrentStack
*
* Nonblocking stack using variation of Treiber's algorithm
* that allows for reduced garbage
*/
public class ConcurrentStack<I>
{
private final NodeStack<Holder> stack = new NodeStack<>();
public void push(I item)
{
stack.push(new Holder(item));
}
public I pop()
{
Holder<I> holder = stack.pop();
if (holder==null)
return null;
return holder.item;
}
private static class Holder<I> extends Node
{
final I item;
Holder(I item)
{
this.item = item;
}
}
public static class Node
{
Node next;
}
public static class NodeStack<N extends Node>
{
AtomicReference<Node> stack = new AtomicReference<Node>();
public void push(N node)
{
while(true)
{
Node top = stack.get();
node.next = top;
if (stack.compareAndSet(top,node))
break;
}
}
public N pop()
{
while (true)
{
Node top = stack.get();
if (top==null)
return null;
if (stack.compareAndSet(top,top.next))
{
top.next = null;
return (N)top;
}
}
}
}
}

View File

@ -18,13 +18,13 @@
package org.eclipse.jetty.util.thread;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import org.eclipse.jetty.util.ConcurrentStack;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
@ -49,7 +49,8 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
{
@Override
public void run()
{}
{
}
@Override
public String toString()
@ -60,10 +61,9 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
private final Executor _executor;
private final int _capacity;
private final ConcurrentStack.NodeStack<ReservedThread> _stack;
private final ConcurrentLinkedDeque<ReservedThread> _stack;
private final AtomicInteger _size = new AtomicInteger();
private final AtomicInteger _pending = new AtomicInteger();
private final AtomicInteger _waiting = new AtomicInteger();
private ThreadPoolBudget.Lease _lease;
private Object _owner;
@ -96,7 +96,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
{
_executor = executor;
_capacity = reservedThreads(executor,capacity);
_stack = new ConcurrentStack.NodeStack<>();
_stack = new ConcurrentLinkedDeque<>();
_owner = owner;
LOG.debug("{}",this);
@ -145,12 +145,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
return _pending.get();
}
@ManagedAttribute(value = "waiting reserved threads", readonly = true)
public int getWaiting()
{
return _waiting.get();
}
@ManagedAttribute(value = "idletimeout in MS", readonly = true)
public long getIdleTimeoutMs()
{
@ -186,17 +180,13 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
_lease.close();
while(true)
{
ReservedThread thread = _stack.pop();
if (thread==null)
{
super.doStop();
return;
}
ReservedThread thread = _stack.pollFirst();
if (thread == null)
break;
_size.decrementAndGet();
thread.stop();
}
super.doStop();
}
@Override
@ -218,7 +208,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
if (task==null)
return false;
ReservedThread thread = _stack.pop();
ReservedThread thread = _stack.pollFirst();
if (thread==null)
{
if (task!=STOP)
@ -263,16 +253,16 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
@Override
public String toString()
{
return String.format("%s@%s{s=%d/%d,p=%d,w=%d}",
return String.format("%s@%x{s=%d/%d,p=%d}@%s",
getClass().getSimpleName(),
_owner != null ? _owner : Integer.toHexString(hashCode()),
hashCode(),
_size.get(),
_capacity,
_pending.get(),
_waiting.get());
_owner);
}
private class ReservedThread extends ConcurrentStack.Node implements Runnable
private class ReservedThread implements Runnable
{
private final Locker _locker = new Locker();
private final Condition _wakeup = _locker.newCondition();
@ -302,7 +292,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
LOG.debug("{} waiting", this);
Runnable task = null;
while (isRunning() && task==null)
while (task==null)
{
boolean idle = false;
@ -312,7 +302,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
{
try
{
_waiting.incrementAndGet();
if (_idleTime == 0)
_wakeup.await();
else
@ -322,10 +311,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
{
LOG.ignore(e);
}
finally
{
_waiting.decrementAndGet();
}
}
task = _task;
_task = null;
@ -347,7 +332,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
if (LOG.isDebugEnabled())
LOG.debug("{} task={}", this, task);
return task==null?STOP:task;
return task;
}
@Override
@ -355,8 +340,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
{
while (isRunning())
{
Runnable task = null;
// test and increment size BEFORE decrementing pending,
// so that we don't have a race starting new pending.
while(true)
@ -384,10 +367,10 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
// Insert ourselves in the stack. Size is already incremented, but
// that only effects the decision to keep other threads reserved.
_stack.push(this);
_stack.offerFirst(this);
// Wait for a task
task = reservedWait();
Runnable task = reservedWait();
if (task==STOP)
// return on STOP poison pill

View File

@ -37,25 +37,25 @@ import org.eclipse.jetty.util.thread.Locker.Lock;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
/**
* <p>A strategy where the thread that produces will run the resulting task if it
* <p>A strategy where the thread that produces will run the resulting task if it
* is possible to do so without thread starvation.</p>
*
*
* <p>This strategy preemptively dispatches a thread as a pending producer, so that
* when a thread produces a task it can immediately run the task and let the pending
* producer thread take over producing. If necessary another thread will be dispatched
* to replace the pending producing thread. When operating in this pattern, the
* to replace the pending producing thread. When operating in this pattern, the
* sub-strategy is called Execute Produce Consume (EPC)
* </p>
* <p>However, if the task produced uses the {@link Invocable} API to indicate that
* it will not block, then the strategy will run it directly, regardless of the
* presence of a pending producing thread and then resume producing after the
* <p>However, if the task produced uses the {@link Invocable} API to indicate that
* it will not block, then the strategy will run it directly, regardless of the
* presence of a pending producing thread and then resume producing after the
* task has completed. This sub-strategy is also used if the strategy has been
* configured with a maximum of 0 pending threads and the thread currently producing
* does not use the {@link Invocable} API to indicate that it will not block.
* When operating in this pattern, the sub-strategy is called
* ProduceConsume (PC).
* </p>
* <p>If there is no pending producer thread available and if the task has not
* <p>If there is no pending producer thread available and if the task has not
* indicated it is non-blocking, then this strategy will dispatch the execution of
* the task and immediately continue producing. When operating in this pattern, the
* sub-strategy is called ProduceExecuteConsume (PEC).
@ -67,7 +67,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
private static final Logger LOG = Log.getLogger(EatWhatYouKill.class);
private enum State { IDLE, PRODUCING, REPRODUCING }
private final Locker _locker = new Locker();
private final LongAdder _nonBlocking = new LongAdder();
private final LongAdder _blocking = new LongAdder();
@ -86,7 +86,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
{
this(producer,executor,new ReservedThreadExecutor(executor,maxReserved));
}
public EatWhatYouKill(Producer producer, Executor executor, ReservedThreadExecutor producers)
{
_producer = producer;
@ -108,11 +108,11 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
case IDLE:
execute = true;
break;
case PRODUCING:
_state = State.REPRODUCING;
break;
default:
break;
}
@ -153,13 +153,13 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
_state = State.PRODUCING;
producing = true;
break;
case PRODUCING:
// Keep other Thread producing
if (reproduce)
_state = State.REPRODUCING;
break;
default:
break;
}
@ -170,7 +170,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
public boolean doProduce()
{
boolean producing = true;
while (isRunning() && producing)
while (isRunning() && producing)
{
// If we got here, then we are the thread that is producing.
Runnable task = null;
@ -269,7 +269,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
}
}
}
return producing;
}
@ -323,7 +323,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
getState(builder);
return builder.toString();
}
private void getString(StringBuilder builder)
{
builder.append(getClass().getSimpleName());