Jetty 9.4.x 2293 pending multiplexed (#2294)

Issue #2293 Pending Multiplexed Connections

Added a AtomicBiInteger to allow both total connections and pending connections to be encoded in the
same atomic int.

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2018-03-07 19:31:22 +11:00 committed by GitHub
parent 5216d07124
commit 74055b9837
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 452 additions and 26 deletions

View File

@ -20,10 +20,10 @@ package org.eclipse.jetty.client;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
@ -39,7 +39,12 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class);
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicInteger connectionCount = new AtomicInteger();
/**
* The connectionCount encodes both the total connections plus the pending connection counts, so both can be atomically changed.
* The bottom 32 bits represent the total connections and the top 32 bits represent the pending connections.
*/
private final AtomicBiInteger connections = new AtomicBiInteger();
private final Destination destination;
private final int maxConnections;
private final Callback requester;
@ -60,13 +65,19 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
@ManagedAttribute(value = "The number of connections", readonly = true)
public int getConnectionCount()
{
return connectionCount.get();
return connections.getLo();
}
@ManagedAttribute(value = "The number of pending connections", readonly = true)
public int getPendingCount()
{
return connections.getHi();
}
@Override
public boolean isEmpty()
{
return connectionCount.get() == 0;
return connections.getLo() == 0;
}
@Override
@ -80,29 +91,34 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
{
Connection connection = activate();
if (connection == null)
connection = tryCreate();
{
tryCreate(-1);
connection = activate();
}
return connection;
}
private Connection tryCreate()
protected void tryCreate(int maxPending)
{
while (true)
{
int current = getConnectionCount();
final int next = current + 1;
long encoded = connections.get();
int pending = AtomicBiInteger.getHi(encoded);
int total = AtomicBiInteger.getLo(encoded);
if (next > maxConnections)
if (LOG.isDebugEnabled())
LOG.debug("tryCreate {}/{} connections {}/{} pending",total,maxConnections,pending,maxPending);
if (total >= maxConnections)
return;
if (maxPending>=0 && pending>=maxPending)
return;
if (connections.compareAndSet(encoded,pending+1,total+1))
{
if (LOG.isDebugEnabled())
LOG.debug("Max connections {}/{} reached", current, maxConnections);
// Try again the idle connections
return activate();
}
if (connectionCount.compareAndSet(current, next))
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation", next, maxConnections);
LOG.debug("newConnection {}/{} connections {}/{} pending", total+1, maxConnections, pending+1, maxPending);
destination.newConnection(new Promise<Connection>()
{
@ -110,7 +126,8 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
public void succeeded(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection);
LOG.debug("Connection {}/{} creation succeeded {}", total+1, maxConnections, connection);
connections.update(-1,0);
onCreated(connection);
proceed();
}
@ -119,14 +136,13 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection " + next + "/" + maxConnections + " creation failed", x);
connectionCount.decrementAndGet();
LOG.debug("Connection " + (total+1) + "/" + maxConnections + " creation failed", x);
connections.update(-1,-1);
requester.failed(x);
}
});
// Try again the idle connections
return activate();
return;
}
}
}
@ -174,7 +190,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
protected void removed(Connection connection)
{
int pooled = connectionCount.decrementAndGet();
int pooled = connections.updateLo(-1);
if (LOG.isDebugEnabled())
LOG.debug("Connection removed {} - pooled: {}", connection, pooled);
}
@ -184,7 +200,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
{
if (closed.compareAndSet(false, true))
{
connectionCount.set(0);
connections.set(0,0);
}
}

View File

@ -41,6 +41,7 @@ public class MultiplexConnectionPool extends AbstractConnectionPool implements S
private static final Logger LOG = Log.getLogger(MultiplexConnectionPool.class);
private final ReentrantLock lock = new ReentrantLock();
private final HttpDestination destination;
private final Deque<Holder> idleConnections;
private final Map<Connection, Holder> muxedConnections;
private final Map<Connection, Holder> busyConnections;
@ -49,12 +50,26 @@ public class MultiplexConnectionPool extends AbstractConnectionPool implements S
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, requester);
this.destination = destination;
this.idleConnections = new ArrayDeque<>(maxConnections);
this.muxedConnections = new HashMap<>(maxConnections);
this.busyConnections = new HashMap<>(maxConnections);
this.maxMultiplex = maxMultiplex;
}
@Override
public Connection acquire()
{
Connection connection = activate();
if (connection == null)
{
int maxPending = 1 + destination.getQueuedRequestCount() / getMaxMultiplex();
tryCreate(maxPending);
connection = activate();
}
return connection;
}
protected void lock()
{
lock.lock();

View File

@ -1,5 +1,5 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.client.LEVEL=DEBUG
org.eclipse.jetty.client.LEVEL=DEBUG
org.eclipse.jetty.http2.hpack.LEVEL=INFO
#org.eclipse.jetty.http2.LEVEL=DEBUG
#org.eclipse.jetty.io.ssl.LEVEL=DEBUG

View File

@ -0,0 +1,290 @@
//
// ========================================================================
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.util.concurrent.atomic.AtomicLong;
/**
* An AtomicLong with additional methods to treat it has
* two hi/lo integers.
*/
public class AtomicBiInteger extends AtomicLong
{
/**
* @return the hi integer value
*/
public int getHi()
{
return getHi(get());
}
/**
* @return the lo integer value
*/
public int getLo()
{
return getLo(get());
}
/**
* Atomically set the hi integer value without changing
* the lo value.
* @param hi the new hi value
*/
public int setHi(int hi)
{
while(true)
{
long encoded = get();
long update = encodeHi(encoded,hi);
if (compareAndSet(encoded,update))
return getHi(encoded);
}
}
/**
* Atomically set the lo integer value without changing
* the hi value.
* @param lo the new lo value
*/
public int setLo(int lo)
{
while(true)
{
long encoded = get();
long update = encodeLo(encoded,lo);
if (compareAndSet(encoded,update))
return getLo(encoded);
}
}
/**
* Set the hi and lo integer values.
* @param hi the new hi value
* @param lo the new lo value
*/
public void set(int hi, int lo)
{
set(encode(hi,lo));
}
/**
* Atomically sets the hi int value to the given updated value
* only if the current value {@code ==} the expected value.
* Concurrent changes to the lo value result in a retry.
* @param expect the expected value
* @param hi the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public boolean compareAndSetHi(int expect, int hi)
{
while(true)
{
long encoded = get();
if (getHi(encoded)!=expect)
return false;
long update = encodeHi(encoded,hi);
if (compareAndSet(encoded,update))
return true;
}
}
/**
* Atomically sets the lo int value to the given updated value
* only if the current value {@code ==} the expected value.
* Concurrent changes to the hi value result in a retry.
* @param expect the expected value
* @param lo the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public boolean compareAndSetLo(int expect, int lo)
{
while(true)
{
long encoded = get();
if (getLo(encoded)!=expect)
return false;
long update = encodeLo(encoded,lo);
if (compareAndSet(encoded,update))
return true;
}
}
/**
* Atomically sets the values to the given updated values
* only if the current encoded value {@code ==} the expected value.
* @param expect the expected encoded values
* @param hi the new hi value
* @param lo the new lo value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public boolean compareAndSet(long expect, int hi, int lo)
{
long encoded = get();
long update = encode(hi,lo);
return compareAndSet(encoded,update);
}
/**
* Atomically sets the values to the given updated values
* only if the current encoded value {@code ==} the expected value.
* @param expectHi the expected hi values
* @param hi the new hi value
* @param expectLo the expected lo values
* @param lo the new lo value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public boolean compareAndSet(int expectHi, int hi, int expectLo, int lo)
{
long encoded = encode(expectHi,expectLo);
long update = encode(hi,lo);
return compareAndSet(encoded,update);
}
/**
* Atomically updates the current hi value with the results of
* applying the given delta, returning the updated value.
*
* @param delta the delta to apply
* @return the updated value
*/
public int updateHi(int delta)
{
while(true)
{
long encoded = get();
int hi = getHi(encoded)+delta;
long update = encodeHi(encoded,hi);
if (compareAndSet(encoded,update))
return hi;
}
}
/**
* Atomically updates the current lo value with the results of
* applying the given delta, returning the updated value.
*
* @param delta the delta to apply
* @return the updated value
*/
public int updateLo(int delta)
{
while(true)
{
long encoded = get();
int lo = getLo(encoded)+delta;
long update = encodeLo(encoded,lo);
if (compareAndSet(encoded,update))
return lo;
}
}
/**
* Atomically updates the current values with the results of
* applying the given deltas.
*
* @param deltaHi the delta to apply to the hi value
* @param deltaLo the delta to apply to the lo value
*/
public void update(int deltaHi, int deltaLo)
{
while(true)
{
long encoded = get();
long update = encode(getHi(encoded)+deltaHi, getLo(encoded)+deltaLo);
if (compareAndSet(encoded,update))
return;
}
}
/**
* Get a hi int value from an encoded long
* @param encoded the encoded value
* @return the hi int value
*/
public static int getHi(long encoded)
{
return (int) ((encoded>>32)&0xFFFF_FFFFl);
}
/**
* Get a lo int value from an encoded long
* @param encoded the encoded value
* @return the lo int value
*/
public static int getLo(long encoded)
{
return (int) (encoded&0xFFFF_FFFFl);
}
/**
* Encode hi and lo int values into a long
* @param hi the hi int value
* @param lo the lo int value
* @return the encoded value
*
*/
public static long encode(int hi, int lo)
{
long h = ((long)hi)&0xFFFF_FFFFl;
long l = ((long)lo)&0xFFFF_FFFFl;
long encoded = (h<<32)+l;
return encoded;
}
/**
* Encode hi int values into an already encoded long
* @param encoded the encoded value
* @param hi the hi int value
* @return the encoded value
*
*/
public static long encodeHi(long encoded, int hi)
{
long h = ((long)hi)&0xFFFF_FFFFl;
long l = encoded&0xFFFF_FFFFl;
encoded = (h<<32)+l;
return encoded;
}
/**
* Encode lo int values into an already encoded long
* @param encoded the encoded value
* @param lo the lo int value
* @return the encoded value
*
*/
public static long encodeLo(long encoded, int lo)
{
long h = (encoded>>32)&0xFFFF_FFFFl;
long l = ((long)lo)&0xFFFF_FFFFl;
encoded = (h<<32)+l;
return encoded;
}
}

View File

@ -0,0 +1,105 @@
//
// ========================================================================
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.*;
import org.junit.Test;
public class AtomicBiIntegerTest
{
@Test
public void testBitOperations()
{
long encoded;
encoded = AtomicBiInteger.encode(0,0);
assertThat(AtomicBiInteger.getHi(encoded),is(0));
assertThat(AtomicBiInteger.getLo(encoded),is(0));
encoded = AtomicBiInteger.encode(1,2);
assertThat(AtomicBiInteger.getHi(encoded),is(1));
assertThat(AtomicBiInteger.getLo(encoded),is(2));
encoded = AtomicBiInteger.encode(Integer.MAX_VALUE,-1);
assertThat(AtomicBiInteger.getHi(encoded),is(Integer.MAX_VALUE));
assertThat(AtomicBiInteger.getLo(encoded),is(-1));
encoded = AtomicBiInteger.encodeLo(encoded,42);
assertThat(AtomicBiInteger.getHi(encoded),is(Integer.MAX_VALUE));
assertThat(AtomicBiInteger.getLo(encoded),is(42));
encoded = AtomicBiInteger.encode(-1,Integer.MAX_VALUE);
assertThat(AtomicBiInteger.getHi(encoded),is(-1));
assertThat(AtomicBiInteger.getLo(encoded),is(Integer.MAX_VALUE));
encoded = AtomicBiInteger.encodeHi(encoded,42);
assertThat(AtomicBiInteger.getHi(encoded),is(42));
assertThat(AtomicBiInteger.getLo(encoded),is(Integer.MAX_VALUE));
encoded = AtomicBiInteger.encode(Integer.MIN_VALUE,1);
assertThat(AtomicBiInteger.getHi(encoded),is(Integer.MIN_VALUE));
assertThat(AtomicBiInteger.getLo(encoded),is(1));
encoded = AtomicBiInteger.encodeLo(encoded,Integer.MAX_VALUE);
assertThat(AtomicBiInteger.getHi(encoded),is(Integer.MIN_VALUE));
assertThat(AtomicBiInteger.getLo(encoded),is(Integer.MAX_VALUE));
encoded = AtomicBiInteger.encode(1,Integer.MIN_VALUE);
assertThat(AtomicBiInteger.getHi(encoded),is(1));
assertThat(AtomicBiInteger.getLo(encoded),is(Integer.MIN_VALUE));
encoded = AtomicBiInteger.encodeHi(encoded,Integer.MAX_VALUE);
assertThat(AtomicBiInteger.getHi(encoded),is(Integer.MAX_VALUE));
assertThat(AtomicBiInteger.getLo(encoded),is(Integer.MIN_VALUE));
}
@Test
public void testSet()
{
AtomicBiInteger abi = new AtomicBiInteger();
assertThat(abi.getHi(),is(0));
assertThat(abi.getLo(),is(0));
abi.setHi(Integer.MAX_VALUE);
assertThat(abi.getHi(),is(Integer.MAX_VALUE));
assertThat(abi.getLo(),is(0));
abi.setLo(Integer.MIN_VALUE);
assertThat(abi.getHi(),is(Integer.MAX_VALUE));
assertThat(abi.getLo(),is(Integer.MIN_VALUE));
}
@Test
public void testCompareAndSet()
{
AtomicBiInteger abi = new AtomicBiInteger();
assertThat(abi.getHi(),is(0));
assertThat(abi.getLo(),is(0));
assertFalse(abi.compareAndSetHi(1,42));
assertTrue(abi.compareAndSetHi(0,42));
assertThat(abi.getHi(),is(42));
assertThat(abi.getLo(),is(0));
assertFalse(abi.compareAndSetLo(1,-42));
assertTrue(abi.compareAndSetLo(0,-42));
assertThat(abi.getHi(),is(42));
assertThat(abi.getLo(),is(-42));
}
}