diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index 65f6454eceb..19ec8ba7bf8 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -20,10 +20,10 @@ package org.eclipse.jetty.client; import java.util.Collection; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Destination; +import org.eclipse.jetty.util.AtomicBiInteger; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.annotation.ManagedAttribute; @@ -39,7 +39,12 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class); private final AtomicBoolean closed = new AtomicBoolean(); - private final AtomicInteger connectionCount = new AtomicInteger(); + + /** + * The connectionCount encodes both the total connections plus the pending connection counts, so both can be atomically changed. + * The bottom 32 bits represent the total connections and the top 32 bits represent the pending connections. + */ + private final AtomicBiInteger connections = new AtomicBiInteger(); private final Destination destination; private final int maxConnections; private final Callback requester; @@ -60,13 +65,19 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable @ManagedAttribute(value = "The number of connections", readonly = true) public int getConnectionCount() { - return connectionCount.get(); + return connections.getLo(); + } + + @ManagedAttribute(value = "The number of pending connections", readonly = true) + public int getPendingCount() + { + return connections.getHi(); } @Override public boolean isEmpty() { - return connectionCount.get() == 0; + return connections.getLo() == 0; } @Override @@ -80,29 +91,34 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable { Connection connection = activate(); if (connection == null) - connection = tryCreate(); + { + tryCreate(-1); + connection = activate(); + } return connection; } - private Connection tryCreate() + protected void tryCreate(int maxPending) { while (true) { - int current = getConnectionCount(); - final int next = current + 1; + long encoded = connections.get(); + int pending = AtomicBiInteger.getHi(encoded); + int total = AtomicBiInteger.getLo(encoded); - if (next > maxConnections) + if (LOG.isDebugEnabled()) + LOG.debug("tryCreate {}/{} connections {}/{} pending",total,maxConnections,pending,maxPending); + + if (total >= maxConnections) + return; + + if (maxPending>=0 && pending>=maxPending) + return; + + if (connections.compareAndSet(encoded,pending+1,total+1)) { if (LOG.isDebugEnabled()) - LOG.debug("Max connections {}/{} reached", current, maxConnections); - // Try again the idle connections - return activate(); - } - - if (connectionCount.compareAndSet(current, next)) - { - if (LOG.isDebugEnabled()) - LOG.debug("Connection {}/{} creation", next, maxConnections); + LOG.debug("newConnection {}/{} connections {}/{} pending", total+1, maxConnections, pending+1, maxPending); destination.newConnection(new Promise() { @@ -110,7 +126,8 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable public void succeeded(Connection connection) { if (LOG.isDebugEnabled()) - LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection); + LOG.debug("Connection {}/{} creation succeeded {}", total+1, maxConnections, connection); + connections.update(-1,0); onCreated(connection); proceed(); } @@ -119,14 +136,13 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable public void failed(Throwable x) { if (LOG.isDebugEnabled()) - LOG.debug("Connection " + next + "/" + maxConnections + " creation failed", x); - connectionCount.decrementAndGet(); + LOG.debug("Connection " + (total+1) + "/" + maxConnections + " creation failed", x); + connections.update(-1,-1); requester.failed(x); } }); - // Try again the idle connections - return activate(); + return; } } } @@ -174,7 +190,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable protected void removed(Connection connection) { - int pooled = connectionCount.decrementAndGet(); + int pooled = connections.updateLo(-1); if (LOG.isDebugEnabled()) LOG.debug("Connection removed {} - pooled: {}", connection, pooled); } @@ -184,7 +200,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable { if (closed.compareAndSet(false, true)) { - connectionCount.set(0); + connections.set(0,0); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java index e26aacb8c37..6f2f73ad708 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java @@ -41,6 +41,7 @@ public class MultiplexConnectionPool extends AbstractConnectionPool implements S private static final Logger LOG = Log.getLogger(MultiplexConnectionPool.class); private final ReentrantLock lock = new ReentrantLock(); + private final HttpDestination destination; private final Deque idleConnections; private final Map muxedConnections; private final Map busyConnections; @@ -49,12 +50,26 @@ public class MultiplexConnectionPool extends AbstractConnectionPool implements S public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) { super(destination, maxConnections, requester); + this.destination = destination; this.idleConnections = new ArrayDeque<>(maxConnections); this.muxedConnections = new HashMap<>(maxConnections); this.busyConnections = new HashMap<>(maxConnections); this.maxMultiplex = maxMultiplex; } + @Override + public Connection acquire() + { + Connection connection = activate(); + if (connection == null) + { + int maxPending = 1 + destination.getQueuedRequestCount() / getMaxMultiplex(); + tryCreate(maxPending); + connection = activate(); + } + return connection; + } + protected void lock() { lock.lock(); diff --git a/jetty-http2/http2-http-client-transport/src/test/resources/jetty-logging.properties b/jetty-http2/http2-http-client-transport/src/test/resources/jetty-logging.properties index 287d28319e0..a7ef5da7827 100644 --- a/jetty-http2/http2-http-client-transport/src/test/resources/jetty-logging.properties +++ b/jetty-http2/http2-http-client-transport/src/test/resources/jetty-logging.properties @@ -1,5 +1,5 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog -#org.eclipse.jetty.client.LEVEL=DEBUG +org.eclipse.jetty.client.LEVEL=DEBUG org.eclipse.jetty.http2.hpack.LEVEL=INFO #org.eclipse.jetty.http2.LEVEL=DEBUG #org.eclipse.jetty.io.ssl.LEVEL=DEBUG diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/AtomicBiInteger.java b/jetty-util/src/main/java/org/eclipse/jetty/util/AtomicBiInteger.java new file mode 100644 index 00000000000..34f89170786 --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/AtomicBiInteger.java @@ -0,0 +1,290 @@ +// +// ======================================================================== +// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * An AtomicLong with additional methods to treat it has + * two hi/lo integers. + */ +public class AtomicBiInteger extends AtomicLong +{ + /** + * @return the hi integer value + */ + public int getHi() + { + return getHi(get()); + } + + /** + * @return the lo integer value + */ + public int getLo() + { + return getLo(get()); + } + + /** + * Atomically set the hi integer value without changing + * the lo value. + * @param hi the new hi value + */ + public int setHi(int hi) + { + while(true) + { + long encoded = get(); + long update = encodeHi(encoded,hi); + if (compareAndSet(encoded,update)) + return getHi(encoded); + } + } + + /** + * Atomically set the lo integer value without changing + * the hi value. + * @param lo the new lo value + */ + public int setLo(int lo) + { + while(true) + { + long encoded = get(); + long update = encodeLo(encoded,lo); + if (compareAndSet(encoded,update)) + return getLo(encoded); + } + } + + /** + * Set the hi and lo integer values. + * @param hi the new hi value + * @param lo the new lo value + */ + public void set(int hi, int lo) + { + set(encode(hi,lo)); + } + + /** + * Atomically sets the hi int value to the given updated value + * only if the current value {@code ==} the expected value. + * Concurrent changes to the lo value result in a retry. + * @param expect the expected value + * @param hi the new value + * @return {@code true} if successful. False return indicates that + * the actual value was not equal to the expected value. + */ + public boolean compareAndSetHi(int expect, int hi) + { + while(true) + { + long encoded = get(); + if (getHi(encoded)!=expect) + return false; + long update = encodeHi(encoded,hi); + if (compareAndSet(encoded,update)) + return true; + } + } + + /** + * Atomically sets the lo int value to the given updated value + * only if the current value {@code ==} the expected value. + * Concurrent changes to the hi value result in a retry. + * @param expect the expected value + * @param lo the new value + * @return {@code true} if successful. False return indicates that + * the actual value was not equal to the expected value. + */ + public boolean compareAndSetLo(int expect, int lo) + { + while(true) + { + long encoded = get(); + if (getLo(encoded)!=expect) + return false; + long update = encodeLo(encoded,lo); + if (compareAndSet(encoded,update)) + return true; + } + } + + /** + * Atomically sets the values to the given updated values + * only if the current encoded value {@code ==} the expected value. + * @param expect the expected encoded values + * @param hi the new hi value + * @param lo the new lo value + * @return {@code true} if successful. False return indicates that + * the actual value was not equal to the expected value. + */ + public boolean compareAndSet(long expect, int hi, int lo) + { + long encoded = get(); + long update = encode(hi,lo); + return compareAndSet(encoded,update); + } + + /** + * Atomically sets the values to the given updated values + * only if the current encoded value {@code ==} the expected value. + * @param expectHi the expected hi values + * @param hi the new hi value + * @param expectLo the expected lo values + * @param lo the new lo value + * @return {@code true} if successful. False return indicates that + * the actual value was not equal to the expected value. + */ + public boolean compareAndSet(int expectHi, int hi, int expectLo, int lo) + { + long encoded = encode(expectHi,expectLo); + long update = encode(hi,lo); + return compareAndSet(encoded,update); + } + + /** + * Atomically updates the current hi value with the results of + * applying the given delta, returning the updated value. + * + * @param delta the delta to apply + * @return the updated value + */ + public int updateHi(int delta) + { + while(true) + { + long encoded = get(); + int hi = getHi(encoded)+delta; + long update = encodeHi(encoded,hi); + if (compareAndSet(encoded,update)) + return hi; + } + } + + /** + * Atomically updates the current lo value with the results of + * applying the given delta, returning the updated value. + * + * @param delta the delta to apply + * @return the updated value + */ + public int updateLo(int delta) + { + while(true) + { + long encoded = get(); + int lo = getLo(encoded)+delta; + long update = encodeLo(encoded,lo); + if (compareAndSet(encoded,update)) + return lo; + } + } + + /** + * Atomically updates the current values with the results of + * applying the given deltas. + * + * @param deltaHi the delta to apply to the hi value + * @param deltaLo the delta to apply to the lo value + */ + public void update(int deltaHi, int deltaLo) + { + while(true) + { + long encoded = get(); + long update = encode(getHi(encoded)+deltaHi, getLo(encoded)+deltaLo); + if (compareAndSet(encoded,update)) + return; + } + } + + /** + * Get a hi int value from an encoded long + * @param encoded the encoded value + * @return the hi int value + */ + public static int getHi(long encoded) + { + return (int) ((encoded>>32)&0xFFFF_FFFFl); + } + + /** + * Get a lo int value from an encoded long + * @param encoded the encoded value + * @return the lo int value + */ + public static int getLo(long encoded) + { + return (int) (encoded&0xFFFF_FFFFl); + } + + /** + * Encode hi and lo int values into a long + * @param hi the hi int value + * @param lo the lo int value + * @return the encoded value + * + */ + public static long encode(int hi, int lo) + { + long h = ((long)hi)&0xFFFF_FFFFl; + long l = ((long)lo)&0xFFFF_FFFFl; + long encoded = (h<<32)+l; + return encoded; + } + + + /** + * Encode hi int values into an already encoded long + * @param encoded the encoded value + * @param hi the hi int value + * @return the encoded value + * + */ + public static long encodeHi(long encoded, int hi) + { + long h = ((long)hi)&0xFFFF_FFFFl; + long l = encoded&0xFFFF_FFFFl; + encoded = (h<<32)+l; + return encoded; + } + + /** + * Encode lo int values into an already encoded long + * @param encoded the encoded value + * @param lo the lo int value + * @return the encoded value + * + */ + public static long encodeLo(long encoded, int lo) + { + long h = (encoded>>32)&0xFFFF_FFFFl; + long l = ((long)lo)&0xFFFF_FFFFl; + encoded = (h<<32)+l; + return encoded; + } + + + + + +} diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/AtomicBiIntegerTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/AtomicBiIntegerTest.java new file mode 100644 index 00000000000..edda43df9e4 --- /dev/null +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/AtomicBiIntegerTest.java @@ -0,0 +1,105 @@ +// +// ======================================================================== +// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.*; + +import org.junit.Test; + +public class AtomicBiIntegerTest +{ + + @Test + public void testBitOperations() + { + long encoded; + + encoded = AtomicBiInteger.encode(0,0); + assertThat(AtomicBiInteger.getHi(encoded),is(0)); + assertThat(AtomicBiInteger.getLo(encoded),is(0)); + + encoded = AtomicBiInteger.encode(1,2); + assertThat(AtomicBiInteger.getHi(encoded),is(1)); + assertThat(AtomicBiInteger.getLo(encoded),is(2)); + + encoded = AtomicBiInteger.encode(Integer.MAX_VALUE,-1); + assertThat(AtomicBiInteger.getHi(encoded),is(Integer.MAX_VALUE)); + assertThat(AtomicBiInteger.getLo(encoded),is(-1)); + encoded = AtomicBiInteger.encodeLo(encoded,42); + assertThat(AtomicBiInteger.getHi(encoded),is(Integer.MAX_VALUE)); + assertThat(AtomicBiInteger.getLo(encoded),is(42)); + + encoded = AtomicBiInteger.encode(-1,Integer.MAX_VALUE); + assertThat(AtomicBiInteger.getHi(encoded),is(-1)); + assertThat(AtomicBiInteger.getLo(encoded),is(Integer.MAX_VALUE)); + encoded = AtomicBiInteger.encodeHi(encoded,42); + assertThat(AtomicBiInteger.getHi(encoded),is(42)); + assertThat(AtomicBiInteger.getLo(encoded),is(Integer.MAX_VALUE)); + + encoded = AtomicBiInteger.encode(Integer.MIN_VALUE,1); + assertThat(AtomicBiInteger.getHi(encoded),is(Integer.MIN_VALUE)); + assertThat(AtomicBiInteger.getLo(encoded),is(1)); + encoded = AtomicBiInteger.encodeLo(encoded,Integer.MAX_VALUE); + assertThat(AtomicBiInteger.getHi(encoded),is(Integer.MIN_VALUE)); + assertThat(AtomicBiInteger.getLo(encoded),is(Integer.MAX_VALUE)); + + encoded = AtomicBiInteger.encode(1,Integer.MIN_VALUE); + assertThat(AtomicBiInteger.getHi(encoded),is(1)); + assertThat(AtomicBiInteger.getLo(encoded),is(Integer.MIN_VALUE)); + encoded = AtomicBiInteger.encodeHi(encoded,Integer.MAX_VALUE); + assertThat(AtomicBiInteger.getHi(encoded),is(Integer.MAX_VALUE)); + assertThat(AtomicBiInteger.getLo(encoded),is(Integer.MIN_VALUE)); + } + + @Test + public void testSet() + { + AtomicBiInteger abi = new AtomicBiInteger(); + assertThat(abi.getHi(),is(0)); + assertThat(abi.getLo(),is(0)); + + abi.setHi(Integer.MAX_VALUE); + assertThat(abi.getHi(),is(Integer.MAX_VALUE)); + assertThat(abi.getLo(),is(0)); + + abi.setLo(Integer.MIN_VALUE); + assertThat(abi.getHi(),is(Integer.MAX_VALUE)); + assertThat(abi.getLo(),is(Integer.MIN_VALUE)); + } + + @Test + public void testCompareAndSet() + { + AtomicBiInteger abi = new AtomicBiInteger(); + assertThat(abi.getHi(),is(0)); + assertThat(abi.getLo(),is(0)); + + assertFalse(abi.compareAndSetHi(1,42)); + assertTrue(abi.compareAndSetHi(0,42)); + assertThat(abi.getHi(),is(42)); + assertThat(abi.getLo(),is(0)); + + assertFalse(abi.compareAndSetLo(1,-42)); + assertTrue(abi.compareAndSetLo(0,-42)); + assertThat(abi.getHi(),is(42)); + assertThat(abi.getLo(),is(-42)); + } + +}