Merged branch 'jetty-9.2.x' into 'master'.

This commit is contained in:
Simone Bordet 2015-06-11 15:44:06 +02:00
commit aaf0d8889d
3 changed files with 94 additions and 31 deletions

View File

@ -26,6 +26,7 @@ import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
@ -36,7 +37,6 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Sweeper;
public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
@ -44,7 +44,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
protected static final Logger LOG = Log.getLogger(ConnectionPool.class);
private final AtomicInteger connectionCount = new AtomicInteger();
private final Locker locker = new Locker();
private final ReentrantLock lock = new ReentrantLock();
private final Destination destination;
private final int maxConnections;
private final Callback requester;
@ -137,11 +137,18 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
protected void idleCreated(Connection connection)
{
boolean idle;
try (Locker.Lock lock = this.locker.lock())
final ReentrantLock lock = this.lock;
lock.lock();
try
{
// Use "cold" new connections as last.
idle = idleConnections.offerLast(connection);
}
finally
{
lock.unlock();
}
idle(connection, idle);
}
@ -149,13 +156,19 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
{
boolean acquired;
Connection connection;
try (Locker.Lock lock = this.locker.lock())
final ReentrantLock lock = this.lock;
lock.lock();
try
{
connection = idleConnections.pollFirst();
if (connection == null)
return null;
acquired = activeConnections.offer(connection);
}
finally
{
lock.unlock();
}
if (acquired)
{
@ -180,13 +193,20 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
public boolean release(Connection connection)
{
boolean idle;
try (Locker.Lock lock = this.locker.lock())
final ReentrantLock lock = this.lock;
lock.lock();
try
{
if (!activeConnections.remove(connection))
return false;
// Make sure we use "hot" connections first.
idle = idleConnections.offerFirst(connection);
}
finally
{
lock.unlock();
}
released(connection);
return idle(connection, idle);
}
@ -216,11 +236,18 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
{
boolean activeRemoved;
boolean idleRemoved;
try (Locker.Lock lock = this.locker.lock())
final ReentrantLock lock = this.lock;
lock.lock();
try
{
activeRemoved = activeConnections.remove(connection);
idleRemoved = idleConnections.remove(connection);
}
finally
{
lock.unlock();
}
if (activeRemoved)
released(connection);
boolean removed = activeRemoved || idleRemoved;
@ -235,18 +262,30 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
public boolean isActive(Connection connection)
{
try (Locker.Lock lock = this.locker.lock())
final ReentrantLock lock = this.lock;
lock.lock();
try
{
return activeConnections.contains(connection);
}
finally
{
lock.unlock();
}
}
public boolean isIdle(Connection connection)
{
try (Locker.Lock lock = this.locker.lock())
final ReentrantLock lock = this.lock;
lock.lock();
try
{
return idleConnections.contains(connection);
}
finally
{
lock.unlock();
}
}
public boolean isEmpty()
@ -258,13 +297,20 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
{
List<Connection> idles = new ArrayList<>();
List<Connection> actives = new ArrayList<>();
try (Locker.Lock lock = this.locker.lock())
final ReentrantLock lock = this.lock;
lock.lock();
try
{
idles.addAll(idleConnections);
idleConnections.clear();
actives.addAll(activeConnections);
activeConnections.clear();
}
finally
{
lock.unlock();
}
connectionCount.set(0);
for (Connection connection : idles)
@ -286,11 +332,18 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
{
List<Connection> actives = new ArrayList<>();
List<Connection> idles = new ArrayList<>();
try (Locker.Lock lock = this.locker.lock())
final ReentrantLock lock = this.lock;
lock.lock();
try
{
actives.addAll(activeConnections);
idles.addAll(idleConnections);
}
finally
{
lock.unlock();
}
ContainerLifeCycle.dumpObject(out, this);
ContainerLifeCycle.dump(out, indent, actives, idles);
}
@ -299,7 +352,9 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
public boolean sweep()
{
List<Sweeper.Sweepable> toSweep = new ArrayList<>();
try (Locker.Lock lock = this.locker.lock())
final ReentrantLock lock = this.lock;
lock.lock();
try
{
for (Connection connection : getActiveConnections())
{
@ -307,6 +362,10 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
toSweep.add(((Sweeper.Sweepable)connection));
}
}
finally
{
lock.unlock();
}
for (Sweeper.Sweepable candidate : toSweep)
{
@ -330,11 +389,18 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
{
int activeSize;
int idleSize;
try (Locker.Lock lock = this.locker.lock())
final ReentrantLock lock = this.lock;
lock.lock();
try
{
activeSize = activeConnections.size();
idleSize = idleConnections.size();
}
finally
{
lock.unlock();
}
return String.format("%s[c=%d/%d,a=%d,i=%d]",
getClass().getSimpleName(),
connectionCount.get(),

View File

@ -21,13 +21,11 @@ package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;
public abstract class HttpChannel
{
protected static final Logger LOG = Log.getLogger(HttpChannel.class);
private final Locker _locker = new Locker();
private final HttpDestination _destination;
private HttpExchange _exchange;
@ -53,7 +51,7 @@ public abstract class HttpChannel
{
boolean result = false;
boolean abort = true;
try (Locker.Lock lock = _locker.lock())
synchronized (this)
{
if (_exchange == null)
{
@ -76,7 +74,7 @@ public abstract class HttpChannel
public boolean disassociate(HttpExchange exchange)
{
boolean result = false;
try (Locker.Lock lock = _locker.lock())
synchronized (this)
{
HttpExchange existing = _exchange;
_exchange = null;
@ -86,6 +84,7 @@ public abstract class HttpChannel
result = true;
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} disassociated {} from {}", exchange, result, this);
return result;
@ -93,7 +92,7 @@ public abstract class HttpChannel
public HttpExchange getHttpExchange()
{
try (Locker.Lock lock = _locker.lock())
synchronized (this)
{
return _exchange;
}

View File

@ -24,7 +24,6 @@ import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;
public class HttpExchange
{
@ -34,7 +33,6 @@ public class HttpExchange
private final HttpRequest request;
private final List<Response.ResponseListener> listeners;
private final HttpResponse response;
private final Locker _locker = new Locker();
private State requestState = State.PENDING;
private State responseState = State.PENDING;
private HttpChannel _channel;
@ -64,7 +62,7 @@ public class HttpExchange
public Throwable getRequestFailure()
{
try (Locker.Lock lock = _locker.lock())
synchronized (this)
{
return requestFailure;
}
@ -82,7 +80,7 @@ public class HttpExchange
public Throwable getResponseFailure()
{
try (Locker.Lock lock = _locker.lock())
synchronized (this)
{
return responseFailure;
}
@ -99,7 +97,7 @@ public class HttpExchange
{
boolean result = false;
boolean abort = false;
try (Locker.Lock lock = _locker.lock())
synchronized (this)
{
// Only associate if the exchange state is initial,
// as the exchange could be already failed.
@ -123,7 +121,7 @@ public class HttpExchange
void disassociate(HttpChannel channel)
{
boolean abort = false;
try (Locker.Lock lock = _locker.lock())
synchronized (this)
{
if (_channel != channel || requestState != State.TERMINATED || responseState != State.TERMINATED)
abort = true;
@ -136,7 +134,7 @@ public class HttpExchange
private HttpChannel getHttpChannel()
{
try (Locker.Lock lock = _locker.lock())
synchronized (this)
{
return _channel;
}
@ -144,7 +142,7 @@ public class HttpExchange
public boolean requestComplete(Throwable failure)
{
try (Locker.Lock lock = _locker.lock())
synchronized (this)
{
return completeRequest(failure);
}
@ -163,7 +161,7 @@ public class HttpExchange
public boolean responseComplete(Throwable failure)
{
try (Locker.Lock lock = _locker.lock())
synchronized (this)
{
return completeResponse(failure);
}
@ -183,7 +181,7 @@ public class HttpExchange
public Result terminateRequest()
{
Result result = null;
try (Locker.Lock lock = _locker.lock())
synchronized (this)
{
if (requestState == State.COMPLETED)
requestState = State.TERMINATED;
@ -200,7 +198,7 @@ public class HttpExchange
public Result terminateResponse()
{
Result result = null;
try (Locker.Lock lock = _locker.lock())
synchronized (this)
{
if (responseState == State.COMPLETED)
responseState = State.TERMINATED;
@ -220,7 +218,7 @@ public class HttpExchange
// This will avoid that this exchange can be associated to a channel.
boolean abortRequest;
boolean abortResponse;
try (Locker.Lock lock = _locker.lock())
synchronized (this)
{
abortRequest = completeRequest(failure);
abortResponse = completeResponse(failure);
@ -273,7 +271,7 @@ public class HttpExchange
public void resetResponse()
{
try (Locker.Lock lock = _locker.lock())
synchronized (this)
{
responseState = State.PENDING;
responseFailure = null;
@ -290,7 +288,7 @@ public class HttpExchange
@Override
public String toString()
{
try (Locker.Lock lock = _locker.lock())
synchronized (this)
{
return String.format("%s@%x req=%s/%s@%h res=%s/%s@%h",
HttpExchange.class.getSimpleName(),