diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java index 1cf0c57f16c..e3f84905df5 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java @@ -36,7 +36,7 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.SpinLock; +import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Sweeper; public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable @@ -44,7 +44,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable protected static final Logger LOG = Log.getLogger(ConnectionPool.class); private final AtomicInteger connectionCount = new AtomicInteger(); - private final SpinLock lock = new SpinLock(); + private final Locker locker = new Locker(); private final Destination destination; private final int maxConnections; private final Callback requester; @@ -137,7 +137,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable protected void idleCreated(Connection connection) { boolean idle; - try (SpinLock.Lock lock = this.lock.lock()) + try (Locker.Lock lock = this.locker.lock()) { // Use "cold" new connections as last. idle = idleConnections.offerLast(connection); @@ -149,7 +149,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable { boolean acquired; Connection connection; - try (SpinLock.Lock lock = this.lock.lock()) + try (Locker.Lock lock = this.locker.lock()) { connection = idleConnections.pollFirst(); if (connection == null) @@ -180,7 +180,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable public boolean release(Connection connection) { boolean idle; - try (SpinLock.Lock lock = this.lock.lock()) + try (Locker.Lock lock = this.locker.lock()) { if (!activeConnections.remove(connection)) return false; @@ -216,7 +216,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable { boolean activeRemoved; boolean idleRemoved; - try (SpinLock.Lock lock = this.lock.lock()) + try (Locker.Lock lock = this.locker.lock()) { activeRemoved = activeConnections.remove(connection); idleRemoved = idleConnections.remove(connection); @@ -235,7 +235,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable public boolean isActive(Connection connection) { - try (SpinLock.Lock lock = this.lock.lock()) + try (Locker.Lock lock = this.locker.lock()) { return activeConnections.contains(connection); } @@ -243,7 +243,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable public boolean isIdle(Connection connection) { - try (SpinLock.Lock lock = this.lock.lock()) + try (Locker.Lock lock = this.locker.lock()) { return idleConnections.contains(connection); } @@ -258,7 +258,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable { List idles = new ArrayList<>(); List actives = new ArrayList<>(); - try (SpinLock.Lock lock = this.lock.lock()) + try (Locker.Lock lock = this.locker.lock()) { idles.addAll(idleConnections); idleConnections.clear(); @@ -286,7 +286,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable { List actives = new ArrayList<>(); List idles = new ArrayList<>(); - try (SpinLock.Lock lock = this.lock.lock()) + try (Locker.Lock lock = this.locker.lock()) { actives.addAll(activeConnections); idles.addAll(idleConnections); @@ -299,7 +299,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable public boolean sweep() { List toSweep = new ArrayList<>(); - try (SpinLock.Lock lock = this.lock.lock()) + try (Locker.Lock lock = this.locker.lock()) { for (Connection connection : getActiveConnections()) { @@ -330,7 +330,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable { int activeSize; int idleSize; - try (SpinLock.Lock lock = this.lock.lock()) + try (Locker.Lock lock = this.locker.lock()) { activeSize = activeConnections.size(); idleSize = idleConnections.size(); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java index cf18ba24b7a..5f3ea2dba43 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java @@ -21,13 +21,13 @@ package org.eclipse.jetty.client; import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.SpinLock; +import org.eclipse.jetty.util.thread.Locker; public abstract class HttpChannel { protected static final Logger LOG = Log.getLogger(HttpChannel.class); - private final SpinLock _lock = new SpinLock(); + private final Locker _locker = new Locker(); private final HttpDestination _destination; private HttpExchange _exchange; @@ -53,7 +53,7 @@ public abstract class HttpChannel { boolean result = false; boolean abort = true; - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { if (_exchange == null) { @@ -76,7 +76,7 @@ public abstract class HttpChannel public boolean disassociate(HttpExchange exchange) { boolean result = false; - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { HttpExchange existing = _exchange; _exchange = null; @@ -93,7 +93,7 @@ public abstract class HttpChannel public HttpExchange getHttpExchange() { - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { return _exchange; } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java index 708b26bce56..7ef26400b0d 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java @@ -24,7 +24,7 @@ import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.SpinLock; +import org.eclipse.jetty.util.thread.Locker; public class HttpExchange { @@ -34,7 +34,7 @@ public class HttpExchange private final HttpRequest request; private final List listeners; private final HttpResponse response; - private final SpinLock _lock = new SpinLock(); + private final Locker _locker = new Locker(); private State requestState = State.PENDING; private State responseState = State.PENDING; private HttpChannel _channel; @@ -64,7 +64,7 @@ public class HttpExchange public Throwable getRequestFailure() { - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { return requestFailure; } @@ -82,7 +82,7 @@ public class HttpExchange public Throwable getResponseFailure() { - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { return responseFailure; } @@ -99,7 +99,7 @@ public class HttpExchange { boolean result = false; boolean abort = false; - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { // Only associate if the exchange state is initial, // as the exchange could be already failed. @@ -123,7 +123,7 @@ public class HttpExchange void disassociate(HttpChannel channel) { boolean abort = false; - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { if (_channel != channel || requestState != State.TERMINATED || responseState != State.TERMINATED) abort = true; @@ -136,7 +136,7 @@ public class HttpExchange private HttpChannel getHttpChannel() { - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { return _channel; } @@ -144,7 +144,7 @@ public class HttpExchange public boolean requestComplete(Throwable failure) { - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { return completeRequest(failure); } @@ -163,7 +163,7 @@ public class HttpExchange public boolean responseComplete(Throwable failure) { - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { return completeResponse(failure); } @@ -183,7 +183,7 @@ public class HttpExchange public Result terminateRequest() { Result result = null; - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { if (requestState == State.COMPLETED) requestState = State.TERMINATED; @@ -200,7 +200,7 @@ public class HttpExchange public Result terminateResponse() { Result result = null; - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { if (responseState == State.COMPLETED) responseState = State.TERMINATED; @@ -220,7 +220,7 @@ public class HttpExchange // This will avoid that this exchange can be associated to a channel. boolean abortRequest; boolean abortResponse; - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { abortRequest = completeRequest(failure); abortResponse = completeResponse(failure); @@ -273,7 +273,7 @@ public class HttpExchange public void resetResponse() { - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { responseState = State.PENDING; responseFailure = null; @@ -290,7 +290,7 @@ public class HttpExchange @Override public String toString() { - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { return String.format("%s@%x req=%s/%s@%h res=%s/%s@%h", HttpExchange.class.getSimpleName(), diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java index d65a1f8a6f0..cbaebf78db6 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java @@ -31,8 +31,8 @@ import org.eclipse.jetty.util.ArrayQueue; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.SpinLock; /* ------------------------------------------------------------ */ @@ -51,10 +51,10 @@ public class ByteArrayEndPoint extends AbstractEndPoint public void run() { getFillInterest().fillable(); - } + } }; - - private final SpinLock _lock = new SpinLock(); + + private final Locker _locker = new Locker(); private final Queue _inQ = new ArrayQueue<>(); private ByteBuffer _out; private boolean _ishut; @@ -73,7 +73,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint /* ------------------------------------------------------------ */ /** - * @param input the input bytes + * @param input the input bytes * @param outputSize the output size */ public ByteArrayEndPoint(byte[] input, int outputSize) @@ -83,7 +83,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint /* ------------------------------------------------------------ */ /** - * @param input the input string (converted to bytes using default encoding charset) + * @param input the input string (converted to bytes using default encoding charset) * @param outputSize the output size */ public ByteArrayEndPoint(String input, int outputSize) @@ -131,16 +131,16 @@ public class ByteArrayEndPoint extends AbstractEndPoint { new Thread(task,"BAEPoint-"+Integer.toHexString(hashCode())).start(); } - + /* ------------------------------------------------------------ */ @Override protected void needsFillInterest() throws IOException { - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { if (_closed) throw new ClosedChannelException(); - + ByteBuffer in = _inQ.peek(); if (BufferUtil.hasContent(in) || in==EOF) execute(_runFillable); @@ -162,7 +162,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint public void addInput(ByteBuffer in) { boolean fillable=false; - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { if (_inQ.peek()==EOF) throw new RuntimeIOException(new EOFException()); @@ -185,7 +185,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint public void addInputAndExecute(ByteBuffer in) { boolean fillable=false; - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { if (_inQ.peek()==EOF) throw new RuntimeIOException(new EOFException()); @@ -294,7 +294,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint @Override public boolean isOpen() { - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { return !_closed; } @@ -306,7 +306,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint @Override public boolean isInputShutdown() { - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { return _ishut||_closed; } @@ -318,7 +318,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint @Override public boolean isOutputShutdown() { - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { return _oshut||_closed; } @@ -328,7 +328,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint public void shutdownInput() { boolean close=false; - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { _ishut=true; if (_oshut && !_closed) @@ -346,7 +346,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint public void shutdownOutput() { boolean close=false; - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { _oshut=true; if (_ishut && !_closed) @@ -364,7 +364,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint public void close() { boolean close=false; - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { if (!_closed) close=_closed=_ishut=_oshut=true; @@ -391,7 +391,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint { int filled=0; boolean close=false; - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { while(true) { @@ -400,10 +400,10 @@ public class ByteArrayEndPoint extends AbstractEndPoint if (_ishut) return -1; - + if (_inQ.isEmpty()) break; - + ByteBuffer in= _inQ.peek(); if (in==EOF) { @@ -413,7 +413,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint filled=-1; break; } - + if (BufferUtil.hasContent(in)) { filled=BufferUtil.append(buffer,in); @@ -424,7 +424,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint _inQ.poll(); } } - + if (close) super.close(); if (filled>0) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 83ea90b2345..ae5522c3fe4 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -44,8 +44,8 @@ import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; +import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.SpinLock; /** *

{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.

@@ -57,7 +57,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump { private static final Logger LOG = Log.getLogger(ManagedSelector.class); - private final SpinLock _lock = new SpinLock(); + private final Locker _locker = new Locker(); private boolean _selecting = false; private final Queue _actions = new ArrayDeque<>(); private final SelectorManager _selectorManager; @@ -116,7 +116,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump LOG.debug("Queued change {} on {}", change, this); Selector selector = null; - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { _actions.offer(change); if (_selecting) @@ -187,7 +187,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump while (true) { Runnable action; - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { action = _actions.poll(); if (action == null) @@ -233,7 +233,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump if (LOG.isDebugEnabled()) LOG.debug("Selector loop woken up from select, {}/{} selected", selected, selector.keys().size()); - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { // finished selecting _selecting = false; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index e91b8da71c5..dff8d4092b6 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -25,8 +25,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.SpinLock; /** * An ChannelEndpoint that can be scheduled by {@link SelectorManager}. @@ -35,7 +35,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel { public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class); - private final SpinLock _lock = new SpinLock(); + private final Locker _locker = new Locker(); private boolean _updatePending; /** @@ -88,7 +88,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel int readyOps = _key.readyOps(); int oldInterestOps; int newInterestOps; - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { _updatePending = true; // Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both). @@ -117,7 +117,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel { int oldInterestOps; int newInterestOps; - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { _updatePending = false; oldInterestOps = _currentInterestOps; @@ -154,7 +154,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel int oldInterestOps; int newInterestOps; boolean pending; - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { pending = _updatePending; oldInterestOps = _desiredInterestOps; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index 9a2dccc260b..ad4e31b703a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -31,8 +31,8 @@ import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler.Context; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.SpinLock; /** * Implementation of AsyncContext interface that holds the state of request-response cycle. @@ -62,17 +62,17 @@ public class HttpChannelState */ public enum Action { - REQUEST_DISPATCH, // handle a normal request dispatch + REQUEST_DISPATCH, // handle a normal request dispatch ASYNC_DISPATCH, // handle an async request dispatch ASYNC_EXPIRED, // handle an async timeout WRITE_CALLBACK, // handle an IO write callback READ_CALLBACK, // handle an IO read callback - WAIT, // Wait for further events + WAIT, // Wait for further events COMPLETE // Complete the channel } - + /** - * The state of the servlet async API. This can lead or follow the + * The state of the servlet async API. This can lead or follow the * channel dispatch state and also includes reasons such as expired, * dispatched or completed. */ @@ -86,7 +86,7 @@ public class HttpChannelState } private final boolean DEBUG=LOG.isDebugEnabled(); - private final SpinLock _lock=new SpinLock(); + private final Locker _locker=new Locker(); private final HttpChannel _channel; private List _asyncListeners; @@ -109,7 +109,7 @@ public class HttpChannelState public State getState() { - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { return _state; } @@ -117,7 +117,7 @@ public class HttpChannelState public void addListener(AsyncListener listener) { - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { if (_asyncListeners==null) _asyncListeners=new ArrayList<>(); @@ -127,7 +127,7 @@ public class HttpChannelState public void setTimeout(long ms) { - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { _timeoutMs=ms; } @@ -135,7 +135,7 @@ public class HttpChannelState public long getTimeout() { - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { return _timeoutMs; } @@ -143,7 +143,7 @@ public class HttpChannelState public AsyncContextEvent getAsyncContextEvent() { - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { return _event; } @@ -152,7 +152,7 @@ public class HttpChannelState @Override public String toString() { - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { return String.format("%s@%x{s=%s a=%s i=%b r=%s w=%b}",getClass().getSimpleName(),hashCode(),_state,_async,_initial, _asyncReadPossible?(_asyncReadUnready?"PU":"P!U"):(_asyncReadUnready?"!PU":"!P!U"), @@ -162,7 +162,7 @@ public class HttpChannelState public String getStatusString() { - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { return String.format("s=%s i=%b a=%s",_state,_initial,_async); } @@ -175,7 +175,7 @@ public class HttpChannelState { if(DEBUG) LOG.debug("{} handling {}",this,_state); - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { switch(_state) { @@ -197,7 +197,7 @@ public class HttpChannelState _asyncReadUnready=false; return Action.READ_CALLBACK; } - + // TODO refactor the same as read if (_asyncWrite) { @@ -205,7 +205,7 @@ public class HttpChannelState _asyncWrite=false; return Action.WRITE_CALLBACK; } - + if (_async!=null) { Async async=_async; @@ -228,7 +228,7 @@ public class HttpChannelState return Action.WAIT; } } - + return Action.WAIT; default: @@ -240,12 +240,12 @@ public class HttpChannelState public void startAsync(AsyncContextEvent event) { final List lastAsyncListeners; - - try(SpinLock.Lock lock=_lock.lock()) + + try(Locker.Lock lock= _locker.lock()) { if (_state!=State.DISPATCHED || _async!=null) throw new IllegalStateException(this.getStatusString()); - + _async=Async.STARTED; _event=event; lastAsyncListeners=_asyncListeners; @@ -270,7 +270,7 @@ public class HttpChannelState protected void error(Throwable th) { - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { if (_event!=null) _event.setThrowable(th); @@ -293,7 +293,7 @@ public class HttpChannelState if(DEBUG) LOG.debug("{} unhandle {}",this,_state); - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { switch(_state) { @@ -340,9 +340,9 @@ public class HttpChannelState _state=State.ASYNC_IO; action = Action.WRITE_CALLBACK; } - else + else { - schedule_event=_event; + schedule_event=_event; read_interested=_asyncReadUnready; _state=State.ASYNC_WAIT; action = Action.WAIT; @@ -378,17 +378,17 @@ public class HttpChannelState public void dispatch(ServletContext context, String path) { boolean dispatch; - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { if (_async!=Async.STARTED && _async!=Async.EXPIRING) throw new IllegalStateException("AsyncContext#dispath "+this.getStatusString()); _async=Async.DISPATCH; - + if (context!=null) _event.setDispatchContext(context); if (path!=null) _event.setDispatchPath(path); - + switch(_state) { case DISPATCHED: @@ -418,7 +418,7 @@ public class HttpChannelState { final List aListeners; AsyncContextEvent event; - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { if (_async!=Async.STARTED) return; @@ -444,9 +444,9 @@ public class HttpChannelState } } } - + boolean dispatch=false; - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { if (_async==Async.EXPIRING) { @@ -467,7 +467,7 @@ public class HttpChannelState { // just like resume, except don't set _dispatched=true; boolean handle=false; - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { if (_async!=Async.STARTED && _async!=Async.EXPIRING) throw new IllegalStateException(this.getStatusString()); @@ -492,7 +492,7 @@ public class HttpChannelState public void errorComplete() { - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { _async=Async.COMPLETE; _event.setDispatchContext(null); @@ -506,7 +506,7 @@ public class HttpChannelState { final List aListeners; final AsyncContextEvent event; - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { switch(_state) { @@ -554,7 +554,7 @@ public class HttpChannelState protected void recycle() { cancelTimeout(); - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { switch(_state) { @@ -576,11 +576,11 @@ public class HttpChannelState _event=null; } } - + public void upgrade() { cancelTimeout(); - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { switch(_state) { @@ -617,8 +617,8 @@ public class HttpChannelState protected void cancelTimeout() { final AsyncContextEvent event; - try(SpinLock.Lock lock=_lock.lock()) - { + try(Locker.Lock lock= _locker.lock()) + { event=_event; } if (event!=null) @@ -627,15 +627,15 @@ public class HttpChannelState public boolean isIdle() { - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { return _state==State.IDLE; } } - + public boolean isExpired() { - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { return _async==Async.EXPIRED; } @@ -643,7 +643,7 @@ public class HttpChannelState public boolean isInitial() { - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { return _initial; } @@ -651,7 +651,7 @@ public class HttpChannelState public boolean isSuspended() { - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { return _state==State.ASYNC_WAIT || _state==State.DISPATCHED && _async==Async.STARTED; } @@ -659,7 +659,7 @@ public class HttpChannelState boolean isCompleting() { - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { return _state==State.COMPLETING; } @@ -667,7 +667,7 @@ public class HttpChannelState boolean isCompleted() { - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { return _state == State.COMPLETED; } @@ -675,18 +675,18 @@ public class HttpChannelState public boolean isAsyncStarted() { - try(SpinLock.Lock lock=_lock.lock()) - { + try(Locker.Lock lock= _locker.lock()) + { if (_state==State.DISPATCHED) return _async!=null; return _async==Async.STARTED || _async==Async.EXPIRING; } } - + public boolean isAsync() { - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { return !_initial || _async!=null; } @@ -705,11 +705,11 @@ public class HttpChannelState public ContextHandler getContextHandler() { final AsyncContextEvent event; - try(SpinLock.Lock lock=_lock.lock()) - { + try(Locker.Lock lock= _locker.lock()) + { event=_event; } - + if (event!=null) { Context context=((Context)event.getServletContext()); @@ -722,8 +722,8 @@ public class HttpChannelState public ServletResponse getServletResponse() { final AsyncContextEvent event; - try(SpinLock.Lock lock=_lock.lock()) - { + try(Locker.Lock lock= _locker.lock()) + { event=_event; } if (event!=null && event.getSuppliedResponse()!=null) @@ -746,7 +746,7 @@ public class HttpChannelState _channel.getRequest().setAttribute(name,attribute); } - + /* ------------------------------------------------------------ */ /** Called to signal async read isReady() has returned false. * This indicates that there is no content available to be consumed @@ -757,7 +757,7 @@ public class HttpChannelState public void onReadUnready() { boolean interested=false; - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { // We were already unready, this is not a state change, so do nothing if (!_asyncReadUnready) @@ -768,22 +768,22 @@ public class HttpChannelState interested=true; } } - + if (interested) _channel.asyncReadFillInterested(); } - + /* ------------------------------------------------------------ */ /** Called to signal that content is now available to read. * If the channel is in ASYNC_WAIT state and unready (ie isReady() has - * returned false), then the state is changed to ASYNC_WOKEN and true + * returned false), then the state is changed to ASYNC_WOKEN and true * is returned. * @return True IFF the channel was unready and in ASYNC_WAIT state */ public boolean onReadPossible() { boolean woken=false; - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { _asyncReadPossible=true; if (_state==State.ASYNC_WAIT && _asyncReadUnready) @@ -794,7 +794,7 @@ public class HttpChannelState } return woken; } - + /* ------------------------------------------------------------ */ /** Called to signal that the channel is ready for a callback. * This is similar to calling {@link #onReadUnready()} followed by @@ -805,7 +805,7 @@ public class HttpChannelState public boolean onReadReady() { boolean woken=false; - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { _asyncReadUnready=true; _asyncReadPossible=true; @@ -817,20 +817,20 @@ public class HttpChannelState } return woken; } - + public boolean isReadPossible() { - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { return _asyncReadPossible; } } - + public boolean onWritePossible() { boolean handle=false; - try(SpinLock.Lock lock=_lock.lock()) + try(Locker.Lock lock= _locker.lock()) { _asyncWrite=true; if (_state==State.ASYNC_WAIT) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java index 00c0c291608..24199553b39 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java @@ -61,9 +61,9 @@ import org.eclipse.jetty.util.component.Graceful; import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.ShutdownThread; -import org.eclipse.jetty.util.thread.SpinLock; import org.eclipse.jetty.util.thread.ThreadPool; import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; @@ -87,11 +87,11 @@ public class Server extends HandlerWrapper implements Attributes private boolean _dumpAfterStart=false; private boolean _dumpBeforeStop=false; private RequestLog _requestLog; - - private final SpinLock _dateLock = new SpinLock(); + + private final Locker _dateLocker = new Locker(); private volatile DateField _dateField; - - + + /* ------------------------------------------------------------ */ public Server() { @@ -113,11 +113,11 @@ public class Server extends HandlerWrapper implements Attributes } /* ------------------------------------------------------------ */ - /** + /** * Convenience constructor *

* Creates server and a {@link ServerConnector} at the passed address. - * @param addr the inet socket address to create the connector from + * @param addr the inet socket address to create the connector from */ public Server(@Name("address")InetSocketAddress addr) { @@ -161,8 +161,8 @@ public class Server extends HandlerWrapper implements Attributes { return _stopAtShutdown; } - - + + /* ------------------------------------------------------------ */ /** * Set a graceful stop time. @@ -309,10 +309,10 @@ public class Server extends HandlerWrapper implements Attributes long now=System.currentTimeMillis(); long seconds = now/1000; DateField df = _dateField; - + if (df==null || df._seconds!=seconds) { - try(SpinLock.Lock lock = _dateLock.lock()) + try(Locker.Lock lock = _dateLocker.lock()) { df = _dateField; if (df==null || df._seconds!=seconds) @@ -338,7 +338,7 @@ public class Server extends HandlerWrapper implements Attributes //Register the Server with the handler thread for receiving //remote stop commands ShutdownMonitor.register(this); - + //Start a thread waiting to receive "stop" commands. ShutdownMonitor.getInstance().start(); // initialize @@ -357,7 +357,7 @@ public class Server extends HandlerWrapper implements Attributes { if (connector instanceof AbstractConnector) acceptors+=((AbstractConnector)connector).getAcceptors(); - + if (connector instanceof ServerConnector) selectors+=((ServerConnector)connector).getSelectorManager().getSelectorCount(); } @@ -366,7 +366,7 @@ public class Server extends HandlerWrapper implements Attributes int needed=1+selectors+acceptors; if (max>0 && needed>max) throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d + selectors=%d + request=1)",max,acceptors,selectors)); - + try { super.doStart(); @@ -380,7 +380,7 @@ public class Server extends HandlerWrapper implements Attributes for (Connector connector : _connectors) { try - { + { connector.start(); } catch(Throwable e) @@ -388,7 +388,7 @@ public class Server extends HandlerWrapper implements Attributes mex.add(e); } } - + if (isDumpAfterStart()) dumpStdErr(); @@ -422,7 +422,7 @@ public class Server extends HandlerWrapper implements Attributes futures.add(connector.shutdown()); // Then tell the contexts that we are shutting down - + Handler[] gracefuls = getChildHandlersByClass(Graceful.class); for (Handler graceful : gracefuls) futures.add(((Graceful)graceful).shutdown()); @@ -480,11 +480,11 @@ public class Server extends HandlerWrapper implements Attributes if (getStopAtShutdown()) ShutdownThread.deregister(this); - + //Unregister the Server with the handler thread for receiving //remote stop commands as we are stopped already ShutdownMonitor.deregister(this); - + mex.ifExceptionThrow(); } @@ -538,7 +538,7 @@ public class Server extends HandlerWrapper implements Attributes final Request baseRequest=connection.getRequest(); final String path=event.getPath(); - + if (path!=null) { // this is a dispatch with a path @@ -724,6 +724,6 @@ public class Server extends HandlerWrapper implements Attributes _seconds = seconds; _dateField = dateField; } - + } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java index 970bb45d49d..e511883286c 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java @@ -20,7 +20,7 @@ package org.eclipse.jetty.util; import java.nio.channels.ClosedChannelException; -import org.eclipse.jetty.util.thread.SpinLock; +import org.eclipse.jetty.util.thread.Locker; /** * This specialized callback implements a pattern that allows @@ -72,31 +72,31 @@ public abstract class IteratingCallback implements Callback * and set iterating to true. */ PROCESSING, - + /** * Waiting for a schedule callback */ PENDING, - + /** * Called by a schedule callback */ CALLED, - + /** - * The overall job has succeeded as indicated by a {@link Action#SUCCEEDED} return + * The overall job has succeeded as indicated by a {@link Action#SUCCEEDED} return * from {@link IteratingCallback#process()} */ SUCCEEDED, - + /** * The overall job has failed as indicated by a call to {@link IteratingCallback#failed(Throwable)} */ FAILED, - + /** * This callback has been closed and cannot be reset. - */ + */ CLOSED } @@ -118,28 +118,28 @@ public abstract class IteratingCallback implements Callback * may have not yet been invoked. */ SCHEDULED, - + /** * Indicates that {@link #process()} has completed the overall job. */ SUCCEEDED } - private SpinLock _lock = new SpinLock(); + private Locker _locker = new Locker(); private State _state; private boolean _iterate; - - + + protected IteratingCallback() { _state = State.IDLE; } - + protected IteratingCallback(boolean needReset) { _state = needReset ? State.SUCCEEDED : State.IDLE; } - + /** * Method called by {@link #iterate()} to process the sub task. *

@@ -153,8 +153,8 @@ public abstract class IteratingCallback implements Callback * has been started *

  • {@link Action#SUCCEEDED} when the overall job is completed
  • * - * - * @return the appropriate Action + * + * @return the appropriate Action * * @throws Exception if the sub task processing throws */ @@ -168,7 +168,7 @@ public abstract class IteratingCallback implements Callback protected void onCompleteSuccess() { } - + /** * Invoked when the overall task has completed with a failure. * @param cause the throwable to indicate cause of failure @@ -181,9 +181,9 @@ public abstract class IteratingCallback implements Callback /** * This method must be invoked by applications to start the processing - * of sub tasks. It can be called at any time by any thread, and it's + * of sub tasks. It can be called at any time by any thread, and it's * contract is that when called, then the {@link #process()} method will - * be called during or soon after, either by the calling thread or by + * be called during or soon after, either by the calling thread or by * another thread. */ public void iterate() @@ -192,7 +192,7 @@ public abstract class IteratingCallback implements Callback loop: while (true) { - try (SpinLock.Lock lock = _lock.lock()) + try (Locker.Lock lock = _locker.lock()) { switch (_state) { @@ -206,7 +206,7 @@ public abstract class IteratingCallback implements Callback process=true; break loop; - case PROCESSING: + case PROCESSING: _iterate=true; break loop; @@ -224,13 +224,13 @@ public abstract class IteratingCallback implements Callback processing(); } - private void processing() + private void processing() { // This should only ever be called when in processing state, however a failed or close call // may happen concurrently, so state is not assumed. - + boolean on_complete_success=false; - + // While we are processing processing: while (true) { @@ -247,7 +247,7 @@ public abstract class IteratingCallback implements Callback } // acted on the action we have just received - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { switch (_state) { @@ -272,7 +272,7 @@ public abstract class IteratingCallback implements Callback } case SCHEDULED: - { + { // we won the race against the callback, so the callback has to process and we can break processing _state=State.PENDING; break processing; @@ -280,7 +280,7 @@ public abstract class IteratingCallback implements Callback case SUCCEEDED: { - // we lost the race against the callback, + // we lost the race against the callback, _iterate=false; _state=State.SUCCEEDED; on_complete_success=true; @@ -288,7 +288,7 @@ public abstract class IteratingCallback implements Callback } default: - throw new IllegalStateException("state="+_state+" action="+action); + throw new IllegalStateException("state="+_state+" action="+action); } } @@ -297,14 +297,14 @@ public abstract class IteratingCallback implements Callback switch (action) { case SCHEDULED: - { + { // we lost the race, so we have to keep processing _state=State.PROCESSING; continue processing; } default: - throw new IllegalStateException("state="+_state+" action="+action); + throw new IllegalStateException("state="+_state+" action="+action); } } @@ -316,7 +316,7 @@ public abstract class IteratingCallback implements Callback case IDLE: case PENDING: default: - throw new IllegalStateException("state="+_state+" action="+action); + throw new IllegalStateException("state="+_state+" action="+action); } } } @@ -334,7 +334,7 @@ public abstract class IteratingCallback implements Callback public void succeeded() { boolean process=false; - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { switch (_state) { @@ -354,7 +354,7 @@ public abstract class IteratingCallback implements Callback { // Too late! break; - } + } default: { throw new IllegalStateException("state="+_state); @@ -374,7 +374,7 @@ public abstract class IteratingCallback implements Callback public void failed(Throwable x) { boolean failure=false; - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { switch (_state) { @@ -385,9 +385,9 @@ public abstract class IteratingCallback implements Callback case CALLED: // too late!. break; - - case PENDING: - case PROCESSING: + + case PENDING: + case PROCESSING: { _state=State.FAILED; failure=true; @@ -404,7 +404,7 @@ public abstract class IteratingCallback implements Callback public void close() { boolean failure=false; - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { switch (_state) { @@ -433,7 +433,7 @@ public abstract class IteratingCallback implements Callback */ boolean isIdle() { - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { return _state == State.IDLE; } @@ -441,18 +441,18 @@ public abstract class IteratingCallback implements Callback public boolean isClosed() { - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { return _state == State.CLOSED; } } - + /** * @return whether this callback has failed */ public boolean isFailed() { - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { return _state == State.FAILED; } @@ -463,7 +463,7 @@ public abstract class IteratingCallback implements Callback */ public boolean isSucceeded() { - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { return _state == State.SUCCEEDED; } @@ -480,7 +480,7 @@ public abstract class IteratingCallback implements Callback */ public boolean reset() { - try(SpinLock.Lock lock = _lock.lock()) + try(Locker.Lock lock = _locker.lock()) { switch(_state) { @@ -492,13 +492,13 @@ public abstract class IteratingCallback implements Callback _iterate=false; _state=State.IDLE; return true; - + default: return false; } } } - + @Override public String toString() { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SpinLock.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Locker.java similarity index 52% rename from jetty-util/src/main/java/org/eclipse/jetty/util/thread/SpinLock.java rename to jetty-util/src/main/java/org/eclipse/jetty/util/thread/Locker.java index fc76529b64b..a5f423cbdf6 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SpinLock.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Locker.java @@ -19,46 +19,79 @@ package org.eclipse.jetty.util.thread; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; /** *

    This is a lock designed to protect VERY short sections of - * critical code. Threads attempting to take the lock will spin - * forever until the lock is available, thus it is important that + * critical code. Threads attempting to take the lock will wait + * until the lock is available, thus it is important that * the code protected by this lock is extremely simple and non - * blocking. The reason for this lock is that it prevents a thread - * from giving up a CPU core when contending for the lock.

    + * blocking.

    *
    - * try(SpinLock.Lock lock = spinlock.lock())
    + * try(SpinLock.Lock lock = locker.lock())
      * {
      *   // something very quick and non blocking
      * }
      * 
    */ -public class SpinLock +public class Locker { - private final AtomicReference _lock = new AtomicReference<>(null); + private static final boolean SPIN = Boolean.getBoolean(Locker.class.getName() + ".spin"); + + private final boolean _spin; + private final ReentrantLock _lock = new ReentrantLock(); + private final AtomicReference _spinLockState = new AtomicReference<>(null); private final Lock _unlock = new Lock(); + public Locker() + { + this(SPIN); + } + + protected Locker(boolean spin) + { + this._spin = spin; + } + public Lock lock() + { + if (_spin) + spinLock(); + else + concLock(); + return _unlock; + } + + private void spinLock() { Thread current = Thread.currentThread(); while (true) { // Using test-and-test-and-set for better performance. - Thread locker = _lock.get(); - if (locker != null || !_lock.compareAndSet(null, current)) + Thread locker = _spinLockState.get(); + if (locker != null || !_spinLockState.compareAndSet(null, current)) { if (locker == current) - throw new IllegalStateException("SpinLock is not reentrant"); + throw new IllegalStateException("Locker is not reentrant"); continue; } - return _unlock; + return; } } + private void concLock() + { + if (_lock.isHeldByCurrentThread()) + throw new IllegalStateException("Locker is not reentrant"); + _lock.lock(); + } + public boolean isLocked() { - return _lock.get() != null; + if (_spin) + return _spinLockState.get() != null; + else + return _lock.isLocked(); } public class Lock implements AutoCloseable @@ -66,7 +99,10 @@ public class SpinLock @Override public void close() { - _lock.set(null); + if (_spin) + _spinLockState.set(null); + else + _lock.unlock(); } } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java index ea2e984e248..406e3d00a15 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java @@ -23,28 +23,28 @@ import java.util.concurrent.Executor; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; -import org.eclipse.jetty.util.thread.SpinLock; -import org.eclipse.jetty.util.thread.SpinLock.Lock; +import org.eclipse.jetty.util.thread.Locker; +import org.eclipse.jetty.util.thread.Locker.Lock; /** *

    A strategy where the thread calls produce will always run the resulting task * itself. The strategy may dispatches another thread to continue production. *

    - *

    The strategy is also known by the nickname 'eat what you kill', which comes from - * the hunting ethic that says a person should not kill anything he or she does not - * plan on eating. In this case, the phrase is used to mean that a thread should - * not produce a task that it does not intend to run. By making producers run the - * task that they have just produced avoids execution delays and avoids parallel slow - * down by running the task in the same core, with good chances of having a hot CPU - * cache. It also avoids the creation of a queue of produced tasks that the system - * does not yet have capacity to consume, which can save memory and exert back + *

    The strategy is also known by the nickname 'eat what you kill', which comes from + * the hunting ethic that says a person should not kill anything he or she does not + * plan on eating. In this case, the phrase is used to mean that a thread should + * not produce a task that it does not intend to run. By making producers run the + * task that they have just produced avoids execution delays and avoids parallel slow + * down by running the task in the same core, with good chances of having a hot CPU + * cache. It also avoids the creation of a queue of produced tasks that the system + * does not yet have capacity to consume, which can save memory and exert back * pressure on producers. *

    */ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable { private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class); - private final SpinLock _lock = new SpinLock(); + private final Locker _locker = new Locker(); private final Runnable _runExecute = new RunExecute(); private final Producer _producer; private final Executor _executor; @@ -65,14 +65,14 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable if (LOG.isDebugEnabled()) LOG.debug("{} execute",this); boolean produce=false; - try (Lock locked = _lock.lock()) + try (Lock locked = _locker.lock()) { // If we are idle and a thread is not producing if (_idle) { if (_producing) throw new IllegalStateException(); - + // Then this thread will do the producing produce=_producing=true; // and we are no longer idle @@ -96,7 +96,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable if (LOG.isDebugEnabled()) LOG.debug("{} spawning",this); boolean dispatch=false; - try (Lock locked = _lock.lock()) + try (Lock locked = _locker.lock()) { if (_idle) dispatch=true; @@ -113,7 +113,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable if (LOG.isDebugEnabled()) LOG.debug("{} run",this); boolean produce=false; - try (Lock locked = _lock.lock()) + try (Lock locked = _locker.lock()) { _pending=false; if (!_idle && !_producing) @@ -125,33 +125,33 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable if (produce) produceAndRun(); } - + private void produceAndRun() { if (LOG.isDebugEnabled()) LOG.debug("{} produce enter",this); - + while (true) { // If we got here, then we are the thread that is producing. if (LOG.isDebugEnabled()) LOG.debug("{} producing",this); - + Runnable task = _producer.produce(); - + if (LOG.isDebugEnabled()) LOG.debug("{} produced {}",this,task); - + boolean dispatch=false; - try (Lock locked = _lock.lock()) + try (Lock locked = _locker.lock()) { // Finished producing _producing=false; - + // Did we produced a task? if (task == null) { - // There is no task. + // There is no task. if (_execute) { _idle=false; @@ -164,7 +164,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable _idle=true; break; } - + // We have a task, which we will run ourselves, // so if we don't have another thread pending if (!_pending) @@ -172,10 +172,10 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable // dispatch one dispatch=_pending=true; } - + _execute=false; } - + // If we became pending if (dispatch) { @@ -191,9 +191,9 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable task.run(); if (LOG.isDebugEnabled()) LOG.debug("{} ran {}",this,task); - + // Once we have run the task, we can try producing again. - try (Lock locked = _lock.lock()) + try (Lock locked = _locker.lock()) { // Is another thread already producing or we are now idle? if (_producing || _idle) @@ -201,24 +201,24 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable _producing=true; } } - + if (LOG.isDebugEnabled()) LOG.debug("{} produce exit",this); } public Boolean isIdle() { - try (Lock locked = _lock.lock()) + try (Lock locked = _locker.lock()) { return _idle; } } - + public String toString() { StringBuilder builder = new StringBuilder(); builder.append("EPR "); - try (Lock locked = _lock.lock()) + try (Lock locked = _locker.lock()) { builder.append(_idle?"Idle/":""); builder.append(_producing?"Prod/":""); diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SpinLockTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/LockerTest.java similarity index 80% rename from jetty-util/src/test/java/org/eclipse/jetty/util/thread/SpinLockTest.java rename to jetty-util/src/test/java/org/eclipse/jetty/util/thread/LockerTest.java index 49021091675..b9a5c3d1b0b 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SpinLockTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/LockerTest.java @@ -18,23 +18,41 @@ package org.eclipse.jetty.util.thread; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -public class SpinLockTest +@RunWith(Parameterized.class) +public class LockerTest { + @Parameterized.Parameters + public static Collection parameters() + { + return Arrays.asList(new Object[]{true}, new Object[]{false}); + } + + private boolean spin; + + public LockerTest(boolean spin) + { + this.spin = spin; + } + @Test public void testLocked() { - SpinLock lock = new SpinLock(); + Locker lock = new Locker(spin); assertFalse(lock.isLocked()); - - try(SpinLock.Lock l = lock.lock()) + + try(Locker.Lock l = lock.lock()) { assertTrue(lock.isLocked()); } @@ -45,14 +63,14 @@ public class SpinLockTest assertFalse(lock.isLocked()); } - + @Test public void testLockedException() { - SpinLock lock = new SpinLock(); + Locker lock = new Locker(spin); assertFalse(lock.isLocked()); - - try(SpinLock.Lock l = lock.lock()) + + try(Locker.Lock l = lock.lock()) { assertTrue(lock.isLocked()); throw new Exception(); @@ -72,17 +90,17 @@ public class SpinLockTest @Test public void testContend() throws Exception { - final SpinLock lock = new SpinLock(); - + final Locker lock = new Locker(spin); + final CountDownLatch held0 = new CountDownLatch(1); final CountDownLatch hold0 = new CountDownLatch(1); - + Thread thread0 = new Thread() { @Override public void run() { - try(SpinLock.Lock l = lock.lock()) + try(Locker.Lock l = lock.lock()) { held0.countDown(); hold0.await(); @@ -97,8 +115,8 @@ public class SpinLockTest held0.await(); assertTrue(lock.isLocked()); - - + + final CountDownLatch held1 = new CountDownLatch(1); final CountDownLatch hold1 = new CountDownLatch(1); Thread thread1 = new Thread() @@ -106,7 +124,7 @@ public class SpinLockTest @Override public void run() { - try(SpinLock.Lock l = lock.lock()) + try(Locker.Lock l = lock.lock()) { held1.countDown(); hold1.await(); @@ -120,14 +138,14 @@ public class SpinLockTest thread1.start(); // thread1 will be spinning here assertFalse(held1.await(100, TimeUnit.MILLISECONDS)); - + // Let thread0 complete hold0.countDown(); thread0.join(); - + // thread1 can progress held1.await(); - + // let thread1 complete hold1.countDown(); thread1.join();