469936 - Remove usages of SpinLock.
Causes high CPU usage when contended, and the JVM can do better with its own spin lock and biased locking.
This commit is contained in:
parent
9306477f5b
commit
2c26e82fea
|
@ -26,6 +26,7 @@ import java.util.concurrent.BlockingDeque;
|
|||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.Destination;
|
||||
|
@ -35,7 +36,6 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
|||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.SpinLock;
|
||||
import org.eclipse.jetty.util.thread.Sweeper;
|
||||
|
||||
public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
|
||||
|
@ -43,7 +43,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
|
|||
protected static final Logger LOG = Log.getLogger(ConnectionPool.class);
|
||||
|
||||
private final AtomicInteger connectionCount = new AtomicInteger();
|
||||
private final SpinLock lock = new SpinLock();
|
||||
private final ReentrantLock lock = new ReentrantLock();
|
||||
private final Destination destination;
|
||||
private final int maxConnections;
|
||||
private final Promise<Connection> requester;
|
||||
|
@ -136,11 +136,18 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
|
|||
protected void idleCreated(Connection connection)
|
||||
{
|
||||
boolean idle;
|
||||
try (SpinLock.Lock lock = this.lock.lock())
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try
|
||||
{
|
||||
// Use "cold" new connections as last.
|
||||
idle = idleConnections.offerLast(connection);
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
idle(connection, idle);
|
||||
}
|
||||
|
||||
|
@ -148,13 +155,19 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
|
|||
{
|
||||
boolean acquired;
|
||||
Connection connection;
|
||||
try (SpinLock.Lock lock = this.lock.lock())
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try
|
||||
{
|
||||
connection = idleConnections.pollFirst();
|
||||
if (connection == null)
|
||||
return null;
|
||||
acquired = activeConnections.offer(connection);
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
if (acquired)
|
||||
{
|
||||
|
@ -179,13 +192,20 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
|
|||
public boolean release(Connection connection)
|
||||
{
|
||||
boolean idle;
|
||||
try (SpinLock.Lock lock = this.lock.lock())
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try
|
||||
{
|
||||
if (!activeConnections.remove(connection))
|
||||
return false;
|
||||
// Make sure we use "hot" connections first.
|
||||
idle = idleConnections.offerFirst(connection);
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
released(connection);
|
||||
return idle(connection, idle);
|
||||
}
|
||||
|
@ -215,11 +235,18 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
|
|||
{
|
||||
boolean activeRemoved;
|
||||
boolean idleRemoved;
|
||||
try (SpinLock.Lock lock = this.lock.lock())
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try
|
||||
{
|
||||
activeRemoved = activeConnections.remove(connection);
|
||||
idleRemoved = idleConnections.remove(connection);
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
if (activeRemoved)
|
||||
released(connection);
|
||||
boolean removed = activeRemoved || idleRemoved;
|
||||
|
@ -234,18 +261,30 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
|
|||
|
||||
public boolean isActive(Connection connection)
|
||||
{
|
||||
try (SpinLock.Lock lock = this.lock.lock())
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try
|
||||
{
|
||||
return activeConnections.contains(connection);
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isIdle(Connection connection)
|
||||
{
|
||||
try (SpinLock.Lock lock = this.lock.lock())
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try
|
||||
{
|
||||
return idleConnections.contains(connection);
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isEmpty()
|
||||
|
@ -257,13 +296,20 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
|
|||
{
|
||||
List<Connection> idles = new ArrayList<>();
|
||||
List<Connection> actives = new ArrayList<>();
|
||||
try (SpinLock.Lock lock = this.lock.lock())
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try
|
||||
{
|
||||
idles.addAll(idleConnections);
|
||||
idleConnections.clear();
|
||||
actives.addAll(activeConnections);
|
||||
activeConnections.clear();
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
connectionCount.set(0);
|
||||
|
||||
for (Connection connection : idles)
|
||||
|
@ -285,11 +331,18 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
|
|||
{
|
||||
List<Connection> actives = new ArrayList<>();
|
||||
List<Connection> idles = new ArrayList<>();
|
||||
try (SpinLock.Lock lock = this.lock.lock())
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try
|
||||
{
|
||||
actives.addAll(activeConnections);
|
||||
idles.addAll(idleConnections);
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
ContainerLifeCycle.dumpObject(out, this);
|
||||
ContainerLifeCycle.dump(out, indent, actives, idles);
|
||||
}
|
||||
|
@ -298,7 +351,9 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
|
|||
public boolean sweep()
|
||||
{
|
||||
List<Sweeper.Sweepable> toSweep = new ArrayList<>();
|
||||
try (SpinLock.Lock lock = this.lock.lock())
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try
|
||||
{
|
||||
for (Connection connection : getActiveConnections())
|
||||
{
|
||||
|
@ -306,6 +361,10 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
|
|||
toSweep.add(((Sweeper.Sweepable)connection));
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
for (Sweeper.Sweepable candidate : toSweep)
|
||||
{
|
||||
|
@ -329,11 +388,18 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
|
|||
{
|
||||
int activeSize;
|
||||
int idleSize;
|
||||
try (SpinLock.Lock lock = this.lock.lock())
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try
|
||||
{
|
||||
activeSize = activeConnections.size();
|
||||
idleSize = idleConnections.size();
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
return String.format("%s[c=%d/%d,a=%d,i=%d]",
|
||||
getClass().getSimpleName(),
|
||||
connectionCount.get(),
|
||||
|
|
|
@ -21,13 +21,11 @@ package org.eclipse.jetty.client;
|
|||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.SpinLock;
|
||||
|
||||
public abstract class HttpChannel
|
||||
{
|
||||
protected static final Logger LOG = Log.getLogger(HttpChannel.class);
|
||||
|
||||
private final SpinLock _lock = new SpinLock();
|
||||
private final HttpDestination _destination;
|
||||
private HttpExchange _exchange;
|
||||
|
||||
|
@ -53,7 +51,7 @@ public abstract class HttpChannel
|
|||
{
|
||||
boolean result = false;
|
||||
boolean abort = true;
|
||||
try (SpinLock.Lock lock = _lock.lock())
|
||||
synchronized (this)
|
||||
{
|
||||
if (_exchange == null)
|
||||
{
|
||||
|
@ -76,7 +74,7 @@ public abstract class HttpChannel
|
|||
public boolean disassociate(HttpExchange exchange)
|
||||
{
|
||||
boolean result = false;
|
||||
try (SpinLock.Lock lock = _lock.lock())
|
||||
synchronized (this)
|
||||
{
|
||||
HttpExchange existing = _exchange;
|
||||
_exchange = null;
|
||||
|
@ -86,6 +84,7 @@ public abstract class HttpChannel
|
|||
result = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} disassociated {} from {}", exchange, result, this);
|
||||
return result;
|
||||
|
@ -93,7 +92,7 @@ public abstract class HttpChannel
|
|||
|
||||
public HttpExchange getHttpExchange()
|
||||
{
|
||||
try (SpinLock.Lock lock = _lock.lock())
|
||||
synchronized (this)
|
||||
{
|
||||
return _exchange;
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.eclipse.jetty.client.api.Response;
|
|||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.SpinLock;
|
||||
|
||||
public class HttpExchange
|
||||
{
|
||||
|
@ -34,7 +33,6 @@ public class HttpExchange
|
|||
private final HttpRequest request;
|
||||
private final List<Response.ResponseListener> listeners;
|
||||
private final HttpResponse response;
|
||||
private final SpinLock _lock = new SpinLock();
|
||||
private State requestState = State.PENDING;
|
||||
private State responseState = State.PENDING;
|
||||
private HttpChannel _channel;
|
||||
|
@ -64,7 +62,7 @@ public class HttpExchange
|
|||
|
||||
public Throwable getRequestFailure()
|
||||
{
|
||||
try (SpinLock.Lock lock = _lock.lock())
|
||||
synchronized (this)
|
||||
{
|
||||
return requestFailure;
|
||||
}
|
||||
|
@ -82,7 +80,7 @@ public class HttpExchange
|
|||
|
||||
public Throwable getResponseFailure()
|
||||
{
|
||||
try (SpinLock.Lock lock = _lock.lock())
|
||||
synchronized (this)
|
||||
{
|
||||
return responseFailure;
|
||||
}
|
||||
|
@ -99,7 +97,7 @@ public class HttpExchange
|
|||
{
|
||||
boolean result = false;
|
||||
boolean abort = false;
|
||||
try (SpinLock.Lock lock = _lock.lock())
|
||||
synchronized (this)
|
||||
{
|
||||
// Only associate if the exchange state is initial,
|
||||
// as the exchange could be already failed.
|
||||
|
@ -123,7 +121,7 @@ public class HttpExchange
|
|||
void disassociate(HttpChannel channel)
|
||||
{
|
||||
boolean abort = false;
|
||||
try (SpinLock.Lock lock = _lock.lock())
|
||||
synchronized (this)
|
||||
{
|
||||
if (_channel != channel || requestState != State.TERMINATED || responseState != State.TERMINATED)
|
||||
abort = true;
|
||||
|
@ -136,7 +134,7 @@ public class HttpExchange
|
|||
|
||||
private HttpChannel getHttpChannel()
|
||||
{
|
||||
try (SpinLock.Lock lock = _lock.lock())
|
||||
synchronized (this)
|
||||
{
|
||||
return _channel;
|
||||
}
|
||||
|
@ -144,7 +142,7 @@ public class HttpExchange
|
|||
|
||||
public boolean requestComplete(Throwable failure)
|
||||
{
|
||||
try (SpinLock.Lock lock = _lock.lock())
|
||||
synchronized (this)
|
||||
{
|
||||
return completeRequest(failure);
|
||||
}
|
||||
|
@ -163,7 +161,7 @@ public class HttpExchange
|
|||
|
||||
public boolean responseComplete(Throwable failure)
|
||||
{
|
||||
try (SpinLock.Lock lock = _lock.lock())
|
||||
synchronized (this)
|
||||
{
|
||||
return completeResponse(failure);
|
||||
}
|
||||
|
@ -183,7 +181,7 @@ public class HttpExchange
|
|||
public Result terminateRequest()
|
||||
{
|
||||
Result result = null;
|
||||
try (SpinLock.Lock lock = _lock.lock())
|
||||
synchronized (this)
|
||||
{
|
||||
if (requestState == State.COMPLETED)
|
||||
requestState = State.TERMINATED;
|
||||
|
@ -200,7 +198,7 @@ public class HttpExchange
|
|||
public Result terminateResponse()
|
||||
{
|
||||
Result result = null;
|
||||
try (SpinLock.Lock lock = _lock.lock())
|
||||
synchronized (this)
|
||||
{
|
||||
if (responseState == State.COMPLETED)
|
||||
responseState = State.TERMINATED;
|
||||
|
@ -220,7 +218,7 @@ public class HttpExchange
|
|||
// This will avoid that this exchange can be associated to a channel.
|
||||
boolean abortRequest;
|
||||
boolean abortResponse;
|
||||
try (SpinLock.Lock lock = _lock.lock())
|
||||
synchronized (this)
|
||||
{
|
||||
abortRequest = completeRequest(failure);
|
||||
abortResponse = completeResponse(failure);
|
||||
|
@ -273,7 +271,7 @@ public class HttpExchange
|
|||
|
||||
public void resetResponse()
|
||||
{
|
||||
try (SpinLock.Lock lock = _lock.lock())
|
||||
synchronized (this)
|
||||
{
|
||||
responseState = State.PENDING;
|
||||
responseFailure = null;
|
||||
|
@ -290,7 +288,7 @@ public class HttpExchange
|
|||
@Override
|
||||
public String toString()
|
||||
{
|
||||
try (SpinLock.Lock lock = _lock.lock())
|
||||
synchronized (this)
|
||||
{
|
||||
return String.format("%s@%x req=%s/%s@%h res=%s/%s@%h",
|
||||
HttpExchange.class.getSimpleName(),
|
||||
|
|
|
@ -1,78 +0,0 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.util.thread;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/** Spin Lock
|
||||
* <p>This is a lock designed to protect VERY short sections of
|
||||
* critical code. Threads attempting to take the lock will spin
|
||||
* forever until the lock is available, thus it is important that
|
||||
* the code protected by this lock is extremely simple and non
|
||||
* blocking. The reason for this lock is that it prevents a thread
|
||||
* from giving up a CPU core when contending for the lock.</p>
|
||||
* <pre>
|
||||
* try(SpinLock.Lock lock = spinlock.lock())
|
||||
* {
|
||||
* // something very quick and non blocking
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
public class SpinLock
|
||||
{
|
||||
private final AtomicReference<Thread> _lock = new AtomicReference<>(null);
|
||||
private final Lock _unlock = new Lock();
|
||||
|
||||
public Lock lock()
|
||||
{
|
||||
Thread thread = Thread.currentThread();
|
||||
while(true)
|
||||
{
|
||||
if (!_lock.compareAndSet(null,thread))
|
||||
{
|
||||
if (_lock.get()==thread)
|
||||
throw new IllegalStateException("SpinLock is not reentrant");
|
||||
continue;
|
||||
}
|
||||
return _unlock;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isLocked()
|
||||
{
|
||||
return _lock.get()!=null;
|
||||
}
|
||||
|
||||
public boolean isLockedThread()
|
||||
{
|
||||
return _lock.get()==Thread.currentThread();
|
||||
}
|
||||
|
||||
public class Lock implements AutoCloseable
|
||||
{
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
_lock.set(null);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,142 +0,0 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.util.thread;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class SpinLockTest
|
||||
{
|
||||
|
||||
@Test
|
||||
public void testLocked()
|
||||
{
|
||||
SpinLock lock = new SpinLock();
|
||||
assertFalse(lock.isLocked());
|
||||
|
||||
try(SpinLock.Lock l = lock.lock())
|
||||
{
|
||||
assertTrue(lock.isLocked());
|
||||
}
|
||||
finally
|
||||
{
|
||||
assertFalse(lock.isLocked());
|
||||
}
|
||||
|
||||
assertFalse(lock.isLocked());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLockedException()
|
||||
{
|
||||
SpinLock lock = new SpinLock();
|
||||
assertFalse(lock.isLocked());
|
||||
|
||||
try(SpinLock.Lock l = lock.lock())
|
||||
{
|
||||
assertTrue(lock.isLocked());
|
||||
throw new Exception();
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
assertFalse(lock.isLocked());
|
||||
}
|
||||
finally
|
||||
{
|
||||
assertFalse(lock.isLocked());
|
||||
}
|
||||
|
||||
assertFalse(lock.isLocked());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testContend() throws Exception
|
||||
{
|
||||
final SpinLock lock = new SpinLock();
|
||||
|
||||
final CountDownLatch held0 = new CountDownLatch(1);
|
||||
final CountDownLatch hold0 = new CountDownLatch(1);
|
||||
|
||||
Thread thread0 = new Thread()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try(SpinLock.Lock l = lock.lock())
|
||||
{
|
||||
held0.countDown();
|
||||
hold0.await();
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
thread0.start();
|
||||
held0.await();
|
||||
|
||||
assertTrue(lock.isLocked());
|
||||
|
||||
|
||||
final CountDownLatch held1 = new CountDownLatch(1);
|
||||
final CountDownLatch hold1 = new CountDownLatch(1);
|
||||
Thread thread1 = new Thread()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try(SpinLock.Lock l = lock.lock())
|
||||
{
|
||||
held1.countDown();
|
||||
hold1.await();
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
thread1.start();
|
||||
// thread1 will be spinning here
|
||||
assertFalse(held1.await(100,TimeUnit.MILLISECONDS));
|
||||
|
||||
// Let thread0 complete
|
||||
hold0.countDown();
|
||||
thread0.join();
|
||||
|
||||
// thread1 can progress
|
||||
held1.await();
|
||||
|
||||
// let thread1 complete
|
||||
hold1.countDown();
|
||||
thread1.join();
|
||||
|
||||
assertFalse(lock.isLocked());
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
Loading…
Reference in New Issue