469633 - Make SpinLock behavior pluggable.

Renamed SpinLock to Locker, uses ReentrantLock by default, but can be
turned into a spin lock by setting the system property
org.eclipse.jetty.util.thread.Locker.spin=true.
This commit is contained in:
Simone Bordet 2015-06-08 17:15:54 +02:00
parent 87b1ae44f8
commit b18adb525f
12 changed files with 313 additions and 259 deletions

View File

@ -36,7 +36,7 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.SpinLock;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Sweeper;
public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
@ -44,7 +44,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
protected static final Logger LOG = Log.getLogger(ConnectionPool.class);
private final AtomicInteger connectionCount = new AtomicInteger();
private final SpinLock lock = new SpinLock();
private final Locker locker = new Locker();
private final Destination destination;
private final int maxConnections;
private final Callback requester;
@ -137,7 +137,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
protected void idleCreated(Connection connection)
{
boolean idle;
try (SpinLock.Lock lock = this.lock.lock())
try (Locker.Lock lock = this.locker.lock())
{
// Use "cold" new connections as last.
idle = idleConnections.offerLast(connection);
@ -149,7 +149,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
{
boolean acquired;
Connection connection;
try (SpinLock.Lock lock = this.lock.lock())
try (Locker.Lock lock = this.locker.lock())
{
connection = idleConnections.pollFirst();
if (connection == null)
@ -180,7 +180,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
public boolean release(Connection connection)
{
boolean idle;
try (SpinLock.Lock lock = this.lock.lock())
try (Locker.Lock lock = this.locker.lock())
{
if (!activeConnections.remove(connection))
return false;
@ -216,7 +216,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
{
boolean activeRemoved;
boolean idleRemoved;
try (SpinLock.Lock lock = this.lock.lock())
try (Locker.Lock lock = this.locker.lock())
{
activeRemoved = activeConnections.remove(connection);
idleRemoved = idleConnections.remove(connection);
@ -235,7 +235,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
public boolean isActive(Connection connection)
{
try (SpinLock.Lock lock = this.lock.lock())
try (Locker.Lock lock = this.locker.lock())
{
return activeConnections.contains(connection);
}
@ -243,7 +243,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
public boolean isIdle(Connection connection)
{
try (SpinLock.Lock lock = this.lock.lock())
try (Locker.Lock lock = this.locker.lock())
{
return idleConnections.contains(connection);
}
@ -258,7 +258,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
{
List<Connection> idles = new ArrayList<>();
List<Connection> actives = new ArrayList<>();
try (SpinLock.Lock lock = this.lock.lock())
try (Locker.Lock lock = this.locker.lock())
{
idles.addAll(idleConnections);
idleConnections.clear();
@ -286,7 +286,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
{
List<Connection> actives = new ArrayList<>();
List<Connection> idles = new ArrayList<>();
try (SpinLock.Lock lock = this.lock.lock())
try (Locker.Lock lock = this.locker.lock())
{
actives.addAll(activeConnections);
idles.addAll(idleConnections);
@ -299,7 +299,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
public boolean sweep()
{
List<Sweeper.Sweepable> toSweep = new ArrayList<>();
try (SpinLock.Lock lock = this.lock.lock())
try (Locker.Lock lock = this.locker.lock())
{
for (Connection connection : getActiveConnections())
{
@ -330,7 +330,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
{
int activeSize;
int idleSize;
try (SpinLock.Lock lock = this.lock.lock())
try (Locker.Lock lock = this.locker.lock())
{
activeSize = activeConnections.size();
idleSize = idleConnections.size();

View File

@ -21,13 +21,13 @@ package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.SpinLock;
import org.eclipse.jetty.util.thread.Locker;
public abstract class HttpChannel
{
protected static final Logger LOG = Log.getLogger(HttpChannel.class);
private final SpinLock _lock = new SpinLock();
private final Locker _locker = new Locker();
private final HttpDestination _destination;
private HttpExchange _exchange;
@ -53,7 +53,7 @@ public abstract class HttpChannel
{
boolean result = false;
boolean abort = true;
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
if (_exchange == null)
{
@ -76,7 +76,7 @@ public abstract class HttpChannel
public boolean disassociate(HttpExchange exchange)
{
boolean result = false;
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
HttpExchange existing = _exchange;
_exchange = null;
@ -93,7 +93,7 @@ public abstract class HttpChannel
public HttpExchange getHttpExchange()
{
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
return _exchange;
}

View File

@ -24,7 +24,7 @@ import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.SpinLock;
import org.eclipse.jetty.util.thread.Locker;
public class HttpExchange
{
@ -34,7 +34,7 @@ public class HttpExchange
private final HttpRequest request;
private final List<Response.ResponseListener> listeners;
private final HttpResponse response;
private final SpinLock _lock = new SpinLock();
private final Locker _locker = new Locker();
private State requestState = State.PENDING;
private State responseState = State.PENDING;
private HttpChannel _channel;
@ -64,7 +64,7 @@ public class HttpExchange
public Throwable getRequestFailure()
{
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
return requestFailure;
}
@ -82,7 +82,7 @@ public class HttpExchange
public Throwable getResponseFailure()
{
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
return responseFailure;
}
@ -99,7 +99,7 @@ public class HttpExchange
{
boolean result = false;
boolean abort = false;
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
// Only associate if the exchange state is initial,
// as the exchange could be already failed.
@ -123,7 +123,7 @@ public class HttpExchange
void disassociate(HttpChannel channel)
{
boolean abort = false;
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
if (_channel != channel || requestState != State.TERMINATED || responseState != State.TERMINATED)
abort = true;
@ -136,7 +136,7 @@ public class HttpExchange
private HttpChannel getHttpChannel()
{
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
return _channel;
}
@ -144,7 +144,7 @@ public class HttpExchange
public boolean requestComplete(Throwable failure)
{
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
return completeRequest(failure);
}
@ -163,7 +163,7 @@ public class HttpExchange
public boolean responseComplete(Throwable failure)
{
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
return completeResponse(failure);
}
@ -183,7 +183,7 @@ public class HttpExchange
public Result terminateRequest()
{
Result result = null;
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
if (requestState == State.COMPLETED)
requestState = State.TERMINATED;
@ -200,7 +200,7 @@ public class HttpExchange
public Result terminateResponse()
{
Result result = null;
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
if (responseState == State.COMPLETED)
responseState = State.TERMINATED;
@ -220,7 +220,7 @@ public class HttpExchange
// This will avoid that this exchange can be associated to a channel.
boolean abortRequest;
boolean abortResponse;
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
abortRequest = completeRequest(failure);
abortResponse = completeResponse(failure);
@ -273,7 +273,7 @@ public class HttpExchange
public void resetResponse()
{
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
responseState = State.PENDING;
responseFailure = null;
@ -290,7 +290,7 @@ public class HttpExchange
@Override
public String toString()
{
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
return String.format("%s@%x req=%s/%s@%h res=%s/%s@%h",
HttpExchange.class.getSimpleName(),

View File

@ -31,8 +31,8 @@ import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SpinLock;
/* ------------------------------------------------------------ */
@ -51,10 +51,10 @@ public class ByteArrayEndPoint extends AbstractEndPoint
public void run()
{
getFillInterest().fillable();
}
}
};
private final SpinLock _lock = new SpinLock();
private final Locker _locker = new Locker();
private final Queue<ByteBuffer> _inQ = new ArrayQueue<>();
private ByteBuffer _out;
private boolean _ishut;
@ -73,7 +73,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
/* ------------------------------------------------------------ */
/**
* @param input the input bytes
* @param input the input bytes
* @param outputSize the output size
*/
public ByteArrayEndPoint(byte[] input, int outputSize)
@ -83,7 +83,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
/* ------------------------------------------------------------ */
/**
* @param input the input string (converted to bytes using default encoding charset)
* @param input the input string (converted to bytes using default encoding charset)
* @param outputSize the output size
*/
public ByteArrayEndPoint(String input, int outputSize)
@ -131,16 +131,16 @@ public class ByteArrayEndPoint extends AbstractEndPoint
{
new Thread(task,"BAEPoint-"+Integer.toHexString(hashCode())).start();
}
/* ------------------------------------------------------------ */
@Override
protected void needsFillInterest() throws IOException
{
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
if (_closed)
throw new ClosedChannelException();
ByteBuffer in = _inQ.peek();
if (BufferUtil.hasContent(in) || in==EOF)
execute(_runFillable);
@ -162,7 +162,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
public void addInput(ByteBuffer in)
{
boolean fillable=false;
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
if (_inQ.peek()==EOF)
throw new RuntimeIOException(new EOFException());
@ -185,7 +185,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
public void addInputAndExecute(ByteBuffer in)
{
boolean fillable=false;
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
if (_inQ.peek()==EOF)
throw new RuntimeIOException(new EOFException());
@ -294,7 +294,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
@Override
public boolean isOpen()
{
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
return !_closed;
}
@ -306,7 +306,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
@Override
public boolean isInputShutdown()
{
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
return _ishut||_closed;
}
@ -318,7 +318,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
@Override
public boolean isOutputShutdown()
{
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
return _oshut||_closed;
}
@ -328,7 +328,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
public void shutdownInput()
{
boolean close=false;
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
_ishut=true;
if (_oshut && !_closed)
@ -346,7 +346,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
public void shutdownOutput()
{
boolean close=false;
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
_oshut=true;
if (_ishut && !_closed)
@ -364,7 +364,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
public void close()
{
boolean close=false;
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
if (!_closed)
close=_closed=_ishut=_oshut=true;
@ -391,7 +391,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
{
int filled=0;
boolean close=false;
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
while(true)
{
@ -400,10 +400,10 @@ public class ByteArrayEndPoint extends AbstractEndPoint
if (_ishut)
return -1;
if (_inQ.isEmpty())
break;
ByteBuffer in= _inQ.peek();
if (in==EOF)
{
@ -413,7 +413,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
filled=-1;
break;
}
if (BufferUtil.hasContent(in))
{
filled=BufferUtil.append(buffer,in);
@ -424,7 +424,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
_inQ.poll();
}
}
if (close)
super.close();
if (filled>0)

View File

@ -44,8 +44,8 @@ import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SpinLock;
/**
* <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
@ -57,7 +57,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
{
private static final Logger LOG = Log.getLogger(ManagedSelector.class);
private final SpinLock _lock = new SpinLock();
private final Locker _locker = new Locker();
private boolean _selecting = false;
private final Queue<Runnable> _actions = new ArrayDeque<>();
private final SelectorManager _selectorManager;
@ -116,7 +116,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
LOG.debug("Queued change {} on {}", change, this);
Selector selector = null;
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
_actions.offer(change);
if (_selecting)
@ -187,7 +187,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
while (true)
{
Runnable action;
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
action = _actions.poll();
if (action == null)
@ -233,7 +233,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
if (LOG.isDebugEnabled())
LOG.debug("Selector loop woken up from select, {}/{} selected", selected, selector.keys().size());
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
// finished selecting
_selecting = false;

View File

@ -25,8 +25,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SpinLock;
/**
* An ChannelEndpoint that can be scheduled by {@link SelectorManager}.
@ -35,7 +35,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
{
public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
private final SpinLock _lock = new SpinLock();
private final Locker _locker = new Locker();
private boolean _updatePending;
/**
@ -88,7 +88,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
int readyOps = _key.readyOps();
int oldInterestOps;
int newInterestOps;
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
_updatePending = true;
// Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both).
@ -117,7 +117,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
{
int oldInterestOps;
int newInterestOps;
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
_updatePending = false;
oldInterestOps = _currentInterestOps;
@ -154,7 +154,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
int oldInterestOps;
int newInterestOps;
boolean pending;
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
pending = _updatePending;
oldInterestOps = _desiredInterestOps;

View File

@ -31,8 +31,8 @@ import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandler.Context;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SpinLock;
/**
* Implementation of AsyncContext interface that holds the state of request-response cycle.
@ -62,17 +62,17 @@ public class HttpChannelState
*/
public enum Action
{
REQUEST_DISPATCH, // handle a normal request dispatch
REQUEST_DISPATCH, // handle a normal request dispatch
ASYNC_DISPATCH, // handle an async request dispatch
ASYNC_EXPIRED, // handle an async timeout
WRITE_CALLBACK, // handle an IO write callback
READ_CALLBACK, // handle an IO read callback
WAIT, // Wait for further events
WAIT, // Wait for further events
COMPLETE // Complete the channel
}
/**
* The state of the servlet async API. This can lead or follow the
* The state of the servlet async API. This can lead or follow the
* channel dispatch state and also includes reasons such as expired,
* dispatched or completed.
*/
@ -86,7 +86,7 @@ public class HttpChannelState
}
private final boolean DEBUG=LOG.isDebugEnabled();
private final SpinLock _lock=new SpinLock();
private final Locker _locker=new Locker();
private final HttpChannel _channel;
private List<AsyncListener> _asyncListeners;
@ -109,7 +109,7 @@ public class HttpChannelState
public State getState()
{
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
return _state;
}
@ -117,7 +117,7 @@ public class HttpChannelState
public void addListener(AsyncListener listener)
{
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
if (_asyncListeners==null)
_asyncListeners=new ArrayList<>();
@ -127,7 +127,7 @@ public class HttpChannelState
public void setTimeout(long ms)
{
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
_timeoutMs=ms;
}
@ -135,7 +135,7 @@ public class HttpChannelState
public long getTimeout()
{
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
return _timeoutMs;
}
@ -143,7 +143,7 @@ public class HttpChannelState
public AsyncContextEvent getAsyncContextEvent()
{
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
return _event;
}
@ -152,7 +152,7 @@ public class HttpChannelState
@Override
public String toString()
{
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
return String.format("%s@%x{s=%s a=%s i=%b r=%s w=%b}",getClass().getSimpleName(),hashCode(),_state,_async,_initial,
_asyncReadPossible?(_asyncReadUnready?"PU":"P!U"):(_asyncReadUnready?"!PU":"!P!U"),
@ -162,7 +162,7 @@ public class HttpChannelState
public String getStatusString()
{
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
return String.format("s=%s i=%b a=%s",_state,_initial,_async);
}
@ -175,7 +175,7 @@ public class HttpChannelState
{
if(DEBUG)
LOG.debug("{} handling {}",this,_state);
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
switch(_state)
{
@ -197,7 +197,7 @@ public class HttpChannelState
_asyncReadUnready=false;
return Action.READ_CALLBACK;
}
// TODO refactor the same as read
if (_asyncWrite)
{
@ -205,7 +205,7 @@ public class HttpChannelState
_asyncWrite=false;
return Action.WRITE_CALLBACK;
}
if (_async!=null)
{
Async async=_async;
@ -228,7 +228,7 @@ public class HttpChannelState
return Action.WAIT;
}
}
return Action.WAIT;
default:
@ -240,12 +240,12 @@ public class HttpChannelState
public void startAsync(AsyncContextEvent event)
{
final List<AsyncListener> lastAsyncListeners;
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
if (_state!=State.DISPATCHED || _async!=null)
throw new IllegalStateException(this.getStatusString());
_async=Async.STARTED;
_event=event;
lastAsyncListeners=_asyncListeners;
@ -270,7 +270,7 @@ public class HttpChannelState
protected void error(Throwable th)
{
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
if (_event!=null)
_event.setThrowable(th);
@ -293,7 +293,7 @@ public class HttpChannelState
if(DEBUG)
LOG.debug("{} unhandle {}",this,_state);
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
switch(_state)
{
@ -340,9 +340,9 @@ public class HttpChannelState
_state=State.ASYNC_IO;
action = Action.WRITE_CALLBACK;
}
else
else
{
schedule_event=_event;
schedule_event=_event;
read_interested=_asyncReadUnready;
_state=State.ASYNC_WAIT;
action = Action.WAIT;
@ -378,17 +378,17 @@ public class HttpChannelState
public void dispatch(ServletContext context, String path)
{
boolean dispatch;
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
if (_async!=Async.STARTED && _async!=Async.EXPIRING)
throw new IllegalStateException("AsyncContext#dispath "+this.getStatusString());
_async=Async.DISPATCH;
if (context!=null)
_event.setDispatchContext(context);
if (path!=null)
_event.setDispatchPath(path);
switch(_state)
{
case DISPATCHED:
@ -418,7 +418,7 @@ public class HttpChannelState
{
final List<AsyncListener> aListeners;
AsyncContextEvent event;
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
if (_async!=Async.STARTED)
return;
@ -444,9 +444,9 @@ public class HttpChannelState
}
}
}
boolean dispatch=false;
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
if (_async==Async.EXPIRING)
{
@ -467,7 +467,7 @@ public class HttpChannelState
{
// just like resume, except don't set _dispatched=true;
boolean handle=false;
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
if (_async!=Async.STARTED && _async!=Async.EXPIRING)
throw new IllegalStateException(this.getStatusString());
@ -492,7 +492,7 @@ public class HttpChannelState
public void errorComplete()
{
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
_async=Async.COMPLETE;
_event.setDispatchContext(null);
@ -506,7 +506,7 @@ public class HttpChannelState
{
final List<AsyncListener> aListeners;
final AsyncContextEvent event;
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
switch(_state)
{
@ -554,7 +554,7 @@ public class HttpChannelState
protected void recycle()
{
cancelTimeout();
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
switch(_state)
{
@ -576,11 +576,11 @@ public class HttpChannelState
_event=null;
}
}
public void upgrade()
{
cancelTimeout();
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
switch(_state)
{
@ -617,8 +617,8 @@ public class HttpChannelState
protected void cancelTimeout()
{
final AsyncContextEvent event;
try(SpinLock.Lock lock=_lock.lock())
{
try(Locker.Lock lock= _locker.lock())
{
event=_event;
}
if (event!=null)
@ -627,15 +627,15 @@ public class HttpChannelState
public boolean isIdle()
{
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
return _state==State.IDLE;
}
}
public boolean isExpired()
{
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
return _async==Async.EXPIRED;
}
@ -643,7 +643,7 @@ public class HttpChannelState
public boolean isInitial()
{
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
return _initial;
}
@ -651,7 +651,7 @@ public class HttpChannelState
public boolean isSuspended()
{
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
return _state==State.ASYNC_WAIT || _state==State.DISPATCHED && _async==Async.STARTED;
}
@ -659,7 +659,7 @@ public class HttpChannelState
boolean isCompleting()
{
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
return _state==State.COMPLETING;
}
@ -667,7 +667,7 @@ public class HttpChannelState
boolean isCompleted()
{
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
return _state == State.COMPLETED;
}
@ -675,18 +675,18 @@ public class HttpChannelState
public boolean isAsyncStarted()
{
try(SpinLock.Lock lock=_lock.lock())
{
try(Locker.Lock lock= _locker.lock())
{
if (_state==State.DISPATCHED)
return _async!=null;
return _async==Async.STARTED || _async==Async.EXPIRING;
}
}
public boolean isAsync()
{
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
return !_initial || _async!=null;
}
@ -705,11 +705,11 @@ public class HttpChannelState
public ContextHandler getContextHandler()
{
final AsyncContextEvent event;
try(SpinLock.Lock lock=_lock.lock())
{
try(Locker.Lock lock= _locker.lock())
{
event=_event;
}
if (event!=null)
{
Context context=((Context)event.getServletContext());
@ -722,8 +722,8 @@ public class HttpChannelState
public ServletResponse getServletResponse()
{
final AsyncContextEvent event;
try(SpinLock.Lock lock=_lock.lock())
{
try(Locker.Lock lock= _locker.lock())
{
event=_event;
}
if (event!=null && event.getSuppliedResponse()!=null)
@ -746,7 +746,7 @@ public class HttpChannelState
_channel.getRequest().setAttribute(name,attribute);
}
/* ------------------------------------------------------------ */
/** Called to signal async read isReady() has returned false.
* This indicates that there is no content available to be consumed
@ -757,7 +757,7 @@ public class HttpChannelState
public void onReadUnready()
{
boolean interested=false;
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
// We were already unready, this is not a state change, so do nothing
if (!_asyncReadUnready)
@ -768,22 +768,22 @@ public class HttpChannelState
interested=true;
}
}
if (interested)
_channel.asyncReadFillInterested();
}
/* ------------------------------------------------------------ */
/** Called to signal that content is now available to read.
* If the channel is in ASYNC_WAIT state and unready (ie isReady() has
* returned false), then the state is changed to ASYNC_WOKEN and true
* returned false), then the state is changed to ASYNC_WOKEN and true
* is returned.
* @return True IFF the channel was unready and in ASYNC_WAIT state
*/
public boolean onReadPossible()
{
boolean woken=false;
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
_asyncReadPossible=true;
if (_state==State.ASYNC_WAIT && _asyncReadUnready)
@ -794,7 +794,7 @@ public class HttpChannelState
}
return woken;
}
/* ------------------------------------------------------------ */
/** Called to signal that the channel is ready for a callback.
* This is similar to calling {@link #onReadUnready()} followed by
@ -805,7 +805,7 @@ public class HttpChannelState
public boolean onReadReady()
{
boolean woken=false;
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
_asyncReadUnready=true;
_asyncReadPossible=true;
@ -817,20 +817,20 @@ public class HttpChannelState
}
return woken;
}
public boolean isReadPossible()
{
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
return _asyncReadPossible;
}
}
public boolean onWritePossible()
{
boolean handle=false;
try(SpinLock.Lock lock=_lock.lock())
try(Locker.Lock lock= _locker.lock())
{
_asyncWrite=true;
if (_state==State.ASYNC_WAIT)

View File

@ -61,9 +61,9 @@ import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ShutdownThread;
import org.eclipse.jetty.util.thread.SpinLock;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
@ -87,11 +87,11 @@ public class Server extends HandlerWrapper implements Attributes
private boolean _dumpAfterStart=false;
private boolean _dumpBeforeStop=false;
private RequestLog _requestLog;
private final SpinLock _dateLock = new SpinLock();
private final Locker _dateLocker = new Locker();
private volatile DateField _dateField;
/* ------------------------------------------------------------ */
public Server()
{
@ -113,11 +113,11 @@ public class Server extends HandlerWrapper implements Attributes
}
/* ------------------------------------------------------------ */
/**
/**
* Convenience constructor
* <p>
* Creates server and a {@link ServerConnector} at the passed address.
* @param addr the inet socket address to create the connector from
* @param addr the inet socket address to create the connector from
*/
public Server(@Name("address")InetSocketAddress addr)
{
@ -161,8 +161,8 @@ public class Server extends HandlerWrapper implements Attributes
{
return _stopAtShutdown;
}
/* ------------------------------------------------------------ */
/**
* Set a graceful stop time.
@ -309,10 +309,10 @@ public class Server extends HandlerWrapper implements Attributes
long now=System.currentTimeMillis();
long seconds = now/1000;
DateField df = _dateField;
if (df==null || df._seconds!=seconds)
{
try(SpinLock.Lock lock = _dateLock.lock())
try(Locker.Lock lock = _dateLocker.lock())
{
df = _dateField;
if (df==null || df._seconds!=seconds)
@ -338,7 +338,7 @@ public class Server extends HandlerWrapper implements Attributes
//Register the Server with the handler thread for receiving
//remote stop commands
ShutdownMonitor.register(this);
//Start a thread waiting to receive "stop" commands.
ShutdownMonitor.getInstance().start(); // initialize
@ -357,7 +357,7 @@ public class Server extends HandlerWrapper implements Attributes
{
if (connector instanceof AbstractConnector)
acceptors+=((AbstractConnector)connector).getAcceptors();
if (connector instanceof ServerConnector)
selectors+=((ServerConnector)connector).getSelectorManager().getSelectorCount();
}
@ -366,7 +366,7 @@ public class Server extends HandlerWrapper implements Attributes
int needed=1+selectors+acceptors;
if (max>0 && needed>max)
throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d + selectors=%d + request=1)",max,acceptors,selectors));
try
{
super.doStart();
@ -380,7 +380,7 @@ public class Server extends HandlerWrapper implements Attributes
for (Connector connector : _connectors)
{
try
{
{
connector.start();
}
catch(Throwable e)
@ -388,7 +388,7 @@ public class Server extends HandlerWrapper implements Attributes
mex.add(e);
}
}
if (isDumpAfterStart())
dumpStdErr();
@ -422,7 +422,7 @@ public class Server extends HandlerWrapper implements Attributes
futures.add(connector.shutdown());
// Then tell the contexts that we are shutting down
Handler[] gracefuls = getChildHandlersByClass(Graceful.class);
for (Handler graceful : gracefuls)
futures.add(((Graceful)graceful).shutdown());
@ -480,11 +480,11 @@ public class Server extends HandlerWrapper implements Attributes
if (getStopAtShutdown())
ShutdownThread.deregister(this);
//Unregister the Server with the handler thread for receiving
//remote stop commands as we are stopped already
ShutdownMonitor.deregister(this);
mex.ifExceptionThrow();
}
@ -538,7 +538,7 @@ public class Server extends HandlerWrapper implements Attributes
final Request baseRequest=connection.getRequest();
final String path=event.getPath();
if (path!=null)
{
// this is a dispatch with a path
@ -724,6 +724,6 @@ public class Server extends HandlerWrapper implements Attributes
_seconds = seconds;
_dateField = dateField;
}
}
}

View File

@ -20,7 +20,7 @@ package org.eclipse.jetty.util;
import java.nio.channels.ClosedChannelException;
import org.eclipse.jetty.util.thread.SpinLock;
import org.eclipse.jetty.util.thread.Locker;
/**
* This specialized callback implements a pattern that allows
@ -72,31 +72,31 @@ public abstract class IteratingCallback implements Callback
* and set iterating to true.
*/
PROCESSING,
/**
* Waiting for a schedule callback
*/
PENDING,
/**
* Called by a schedule callback
*/
CALLED,
/**
* The overall job has succeeded as indicated by a {@link Action#SUCCEEDED} return
* The overall job has succeeded as indicated by a {@link Action#SUCCEEDED} return
* from {@link IteratingCallback#process()}
*/
SUCCEEDED,
/**
* The overall job has failed as indicated by a call to {@link IteratingCallback#failed(Throwable)}
*/
FAILED,
/**
* This callback has been closed and cannot be reset.
*/
*/
CLOSED
}
@ -118,28 +118,28 @@ public abstract class IteratingCallback implements Callback
* may have not yet been invoked.
*/
SCHEDULED,
/**
* Indicates that {@link #process()} has completed the overall job.
*/
SUCCEEDED
}
private SpinLock _lock = new SpinLock();
private Locker _locker = new Locker();
private State _state;
private boolean _iterate;
protected IteratingCallback()
{
_state = State.IDLE;
}
protected IteratingCallback(boolean needReset)
{
_state = needReset ? State.SUCCEEDED : State.IDLE;
}
/**
* Method called by {@link #iterate()} to process the sub task.
* <p>
@ -153,8 +153,8 @@ public abstract class IteratingCallback implements Callback
* has been started</li>
* <li>{@link Action#SUCCEEDED} when the overall job is completed</li>
* </ul>
*
* @return the appropriate Action
*
* @return the appropriate Action
*
* @throws Exception if the sub task processing throws
*/
@ -168,7 +168,7 @@ public abstract class IteratingCallback implements Callback
protected void onCompleteSuccess()
{
}
/**
* Invoked when the overall task has completed with a failure.
* @param cause the throwable to indicate cause of failure
@ -181,9 +181,9 @@ public abstract class IteratingCallback implements Callback
/**
* This method must be invoked by applications to start the processing
* of sub tasks. It can be called at any time by any thread, and it's
* of sub tasks. It can be called at any time by any thread, and it's
* contract is that when called, then the {@link #process()} method will
* be called during or soon after, either by the calling thread or by
* be called during or soon after, either by the calling thread or by
* another thread.
*/
public void iterate()
@ -192,7 +192,7 @@ public abstract class IteratingCallback implements Callback
loop: while (true)
{
try (SpinLock.Lock lock = _lock.lock())
try (Locker.Lock lock = _locker.lock())
{
switch (_state)
{
@ -206,7 +206,7 @@ public abstract class IteratingCallback implements Callback
process=true;
break loop;
case PROCESSING:
case PROCESSING:
_iterate=true;
break loop;
@ -224,13 +224,13 @@ public abstract class IteratingCallback implements Callback
processing();
}
private void processing()
private void processing()
{
// This should only ever be called when in processing state, however a failed or close call
// may happen concurrently, so state is not assumed.
boolean on_complete_success=false;
// While we are processing
processing: while (true)
{
@ -247,7 +247,7 @@ public abstract class IteratingCallback implements Callback
}
// acted on the action we have just received
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
switch (_state)
{
@ -272,7 +272,7 @@ public abstract class IteratingCallback implements Callback
}
case SCHEDULED:
{
{
// we won the race against the callback, so the callback has to process and we can break processing
_state=State.PENDING;
break processing;
@ -280,7 +280,7 @@ public abstract class IteratingCallback implements Callback
case SUCCEEDED:
{
// we lost the race against the callback,
// we lost the race against the callback,
_iterate=false;
_state=State.SUCCEEDED;
on_complete_success=true;
@ -288,7 +288,7 @@ public abstract class IteratingCallback implements Callback
}
default:
throw new IllegalStateException("state="+_state+" action="+action);
throw new IllegalStateException("state="+_state+" action="+action);
}
}
@ -297,14 +297,14 @@ public abstract class IteratingCallback implements Callback
switch (action)
{
case SCHEDULED:
{
{
// we lost the race, so we have to keep processing
_state=State.PROCESSING;
continue processing;
}
default:
throw new IllegalStateException("state="+_state+" action="+action);
throw new IllegalStateException("state="+_state+" action="+action);
}
}
@ -316,7 +316,7 @@ public abstract class IteratingCallback implements Callback
case IDLE:
case PENDING:
default:
throw new IllegalStateException("state="+_state+" action="+action);
throw new IllegalStateException("state="+_state+" action="+action);
}
}
}
@ -334,7 +334,7 @@ public abstract class IteratingCallback implements Callback
public void succeeded()
{
boolean process=false;
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
switch (_state)
{
@ -354,7 +354,7 @@ public abstract class IteratingCallback implements Callback
{
// Too late!
break;
}
}
default:
{
throw new IllegalStateException("state="+_state);
@ -374,7 +374,7 @@ public abstract class IteratingCallback implements Callback
public void failed(Throwable x)
{
boolean failure=false;
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
switch (_state)
{
@ -385,9 +385,9 @@ public abstract class IteratingCallback implements Callback
case CALLED:
// too late!.
break;
case PENDING:
case PROCESSING:
case PENDING:
case PROCESSING:
{
_state=State.FAILED;
failure=true;
@ -404,7 +404,7 @@ public abstract class IteratingCallback implements Callback
public void close()
{
boolean failure=false;
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
switch (_state)
{
@ -433,7 +433,7 @@ public abstract class IteratingCallback implements Callback
*/
boolean isIdle()
{
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
return _state == State.IDLE;
}
@ -441,18 +441,18 @@ public abstract class IteratingCallback implements Callback
public boolean isClosed()
{
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
return _state == State.CLOSED;
}
}
/**
* @return whether this callback has failed
*/
public boolean isFailed()
{
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
return _state == State.FAILED;
}
@ -463,7 +463,7 @@ public abstract class IteratingCallback implements Callback
*/
public boolean isSucceeded()
{
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
return _state == State.SUCCEEDED;
}
@ -480,7 +480,7 @@ public abstract class IteratingCallback implements Callback
*/
public boolean reset()
{
try(SpinLock.Lock lock = _lock.lock())
try(Locker.Lock lock = _locker.lock())
{
switch(_state)
{
@ -492,13 +492,13 @@ public abstract class IteratingCallback implements Callback
_iterate=false;
_state=State.IDLE;
return true;
default:
return false;
}
}
}
@Override
public String toString()
{

View File

@ -19,46 +19,79 @@
package org.eclipse.jetty.util.thread;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
/**
* <p>This is a lock designed to protect VERY short sections of
* critical code. Threads attempting to take the lock will spin
* forever until the lock is available, thus it is important that
* critical code. Threads attempting to take the lock will wait
* until the lock is available, thus it is important that
* the code protected by this lock is extremely simple and non
* blocking. The reason for this lock is that it prevents a thread
* from giving up a CPU core when contending for the lock.</p>
* blocking.</p>
* <pre>
* try(SpinLock.Lock lock = spinlock.lock())
* try(SpinLock.Lock lock = locker.lock())
* {
* // something very quick and non blocking
* }
* </pre>
*/
public class SpinLock
public class Locker
{
private final AtomicReference<Thread> _lock = new AtomicReference<>(null);
private static final boolean SPIN = Boolean.getBoolean(Locker.class.getName() + ".spin");
private final boolean _spin;
private final ReentrantLock _lock = new ReentrantLock();
private final AtomicReference<Thread> _spinLockState = new AtomicReference<>(null);
private final Lock _unlock = new Lock();
public Locker()
{
this(SPIN);
}
protected Locker(boolean spin)
{
this._spin = spin;
}
public Lock lock()
{
if (_spin)
spinLock();
else
concLock();
return _unlock;
}
private void spinLock()
{
Thread current = Thread.currentThread();
while (true)
{
// Using test-and-test-and-set for better performance.
Thread locker = _lock.get();
if (locker != null || !_lock.compareAndSet(null, current))
Thread locker = _spinLockState.get();
if (locker != null || !_spinLockState.compareAndSet(null, current))
{
if (locker == current)
throw new IllegalStateException("SpinLock is not reentrant");
throw new IllegalStateException("Locker is not reentrant");
continue;
}
return _unlock;
return;
}
}
private void concLock()
{
if (_lock.isHeldByCurrentThread())
throw new IllegalStateException("Locker is not reentrant");
_lock.lock();
}
public boolean isLocked()
{
return _lock.get() != null;
if (_spin)
return _spinLockState.get() != null;
else
return _lock.isLocked();
}
public class Lock implements AutoCloseable
@ -66,7 +99,10 @@ public class SpinLock
@Override
public void close()
{
_lock.set(null);
if (_spin)
_spinLockState.set(null);
else
_lock.unlock();
}
}
}

View File

@ -23,28 +23,28 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.SpinLock;
import org.eclipse.jetty.util.thread.SpinLock.Lock;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
/**
* <p>A strategy where the thread calls produce will always run the resulting task
* itself. The strategy may dispatches another thread to continue production.
* </p>
* <p>The strategy is also known by the nickname 'eat what you kill', which comes from
* the hunting ethic that says a person should not kill anything he or she does not
* plan on eating. In this case, the phrase is used to mean that a thread should
* not produce a task that it does not intend to run. By making producers run the
* task that they have just produced avoids execution delays and avoids parallel slow
* down by running the task in the same core, with good chances of having a hot CPU
* cache. It also avoids the creation of a queue of produced tasks that the system
* does not yet have capacity to consume, which can save memory and exert back
* <p>The strategy is also known by the nickname 'eat what you kill', which comes from
* the hunting ethic that says a person should not kill anything he or she does not
* plan on eating. In this case, the phrase is used to mean that a thread should
* not produce a task that it does not intend to run. By making producers run the
* task that they have just produced avoids execution delays and avoids parallel slow
* down by running the task in the same core, with good chances of having a hot CPU
* cache. It also avoids the creation of a queue of produced tasks that the system
* does not yet have capacity to consume, which can save memory and exert back
* pressure on producers.
* </p>
*/
public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
{
private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class);
private final SpinLock _lock = new SpinLock();
private final Locker _locker = new Locker();
private final Runnable _runExecute = new RunExecute();
private final Producer _producer;
private final Executor _executor;
@ -65,14 +65,14 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
if (LOG.isDebugEnabled())
LOG.debug("{} execute",this);
boolean produce=false;
try (Lock locked = _lock.lock())
try (Lock locked = _locker.lock())
{
// If we are idle and a thread is not producing
if (_idle)
{
if (_producing)
throw new IllegalStateException();
// Then this thread will do the producing
produce=_producing=true;
// and we are no longer idle
@ -96,7 +96,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
if (LOG.isDebugEnabled())
LOG.debug("{} spawning",this);
boolean dispatch=false;
try (Lock locked = _lock.lock())
try (Lock locked = _locker.lock())
{
if (_idle)
dispatch=true;
@ -113,7 +113,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
if (LOG.isDebugEnabled())
LOG.debug("{} run",this);
boolean produce=false;
try (Lock locked = _lock.lock())
try (Lock locked = _locker.lock())
{
_pending=false;
if (!_idle && !_producing)
@ -125,33 +125,33 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
if (produce)
produceAndRun();
}
private void produceAndRun()
{
if (LOG.isDebugEnabled())
LOG.debug("{} produce enter",this);
while (true)
{
// If we got here, then we are the thread that is producing.
if (LOG.isDebugEnabled())
LOG.debug("{} producing",this);
Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
LOG.debug("{} produced {}",this,task);
boolean dispatch=false;
try (Lock locked = _lock.lock())
try (Lock locked = _locker.lock())
{
// Finished producing
_producing=false;
// Did we produced a task?
if (task == null)
{
// There is no task.
// There is no task.
if (_execute)
{
_idle=false;
@ -164,7 +164,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
_idle=true;
break;
}
// We have a task, which we will run ourselves,
// so if we don't have another thread pending
if (!_pending)
@ -172,10 +172,10 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
// dispatch one
dispatch=_pending=true;
}
_execute=false;
}
// If we became pending
if (dispatch)
{
@ -191,9 +191,9 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
task.run();
if (LOG.isDebugEnabled())
LOG.debug("{} ran {}",this,task);
// Once we have run the task, we can try producing again.
try (Lock locked = _lock.lock())
try (Lock locked = _locker.lock())
{
// Is another thread already producing or we are now idle?
if (_producing || _idle)
@ -201,24 +201,24 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
_producing=true;
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} produce exit",this);
}
public Boolean isIdle()
{
try (Lock locked = _lock.lock())
try (Lock locked = _locker.lock())
{
return _idle;
}
}
public String toString()
{
StringBuilder builder = new StringBuilder();
builder.append("EPR ");
try (Lock locked = _lock.lock())
try (Lock locked = _locker.lock())
{
builder.append(_idle?"Idle/":"");
builder.append(_producing?"Prod/":"");

View File

@ -18,23 +18,41 @@
package org.eclipse.jetty.util.thread;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class SpinLockTest
@RunWith(Parameterized.class)
public class LockerTest
{
@Parameterized.Parameters
public static Collection<Object[]> parameters()
{
return Arrays.asList(new Object[]{true}, new Object[]{false});
}
private boolean spin;
public LockerTest(boolean spin)
{
this.spin = spin;
}
@Test
public void testLocked()
{
SpinLock lock = new SpinLock();
Locker lock = new Locker(spin);
assertFalse(lock.isLocked());
try(SpinLock.Lock l = lock.lock())
try(Locker.Lock l = lock.lock())
{
assertTrue(lock.isLocked());
}
@ -45,14 +63,14 @@ public class SpinLockTest
assertFalse(lock.isLocked());
}
@Test
public void testLockedException()
{
SpinLock lock = new SpinLock();
Locker lock = new Locker(spin);
assertFalse(lock.isLocked());
try(SpinLock.Lock l = lock.lock())
try(Locker.Lock l = lock.lock())
{
assertTrue(lock.isLocked());
throw new Exception();
@ -72,17 +90,17 @@ public class SpinLockTest
@Test
public void testContend() throws Exception
{
final SpinLock lock = new SpinLock();
final Locker lock = new Locker(spin);
final CountDownLatch held0 = new CountDownLatch(1);
final CountDownLatch hold0 = new CountDownLatch(1);
Thread thread0 = new Thread()
{
@Override
public void run()
{
try(SpinLock.Lock l = lock.lock())
try(Locker.Lock l = lock.lock())
{
held0.countDown();
hold0.await();
@ -97,8 +115,8 @@ public class SpinLockTest
held0.await();
assertTrue(lock.isLocked());
final CountDownLatch held1 = new CountDownLatch(1);
final CountDownLatch hold1 = new CountDownLatch(1);
Thread thread1 = new Thread()
@ -106,7 +124,7 @@ public class SpinLockTest
@Override
public void run()
{
try(SpinLock.Lock l = lock.lock())
try(Locker.Lock l = lock.lock())
{
held1.countDown();
hold1.await();
@ -120,14 +138,14 @@ public class SpinLockTest
thread1.start();
// thread1 will be spinning here
assertFalse(held1.await(100, TimeUnit.MILLISECONDS));
// Let thread0 complete
hold0.countDown();
thread0.join();
// thread1 can progress
held1.await();
// let thread1 complete
hold1.countDown();
thread1.join();