mirror of https://github.com/apache/activemq.git
Enhancement for https://issues.apache.org/jira/browse/AMQ-5076 -- pooled session creation blocks
This commit is contained in:
parent
c3d8ca7160
commit
dc607bbf35
|
@ -296,6 +296,34 @@ public class ConnectionPool {
|
||||||
return this.sessionPool.getWhenExhaustedAction() == GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
|
return this.sessionPool.getWhenExhaustedAction() == GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the timeout to use for blocking creating new sessions
|
||||||
|
*
|
||||||
|
* @return true if the pooled Connection createSession method will block when the limit is hit.
|
||||||
|
* @see #setBlockIfSessionPoolIsFull(boolean)
|
||||||
|
*/
|
||||||
|
public long getBlockIfSessionPoolIsFullTimeout() {
|
||||||
|
return this.sessionPool.getMaxWait();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Controls the behavior of the internal session pool. By default the call to
|
||||||
|
* Connection.getSession() will block if the session pool is full. This setting
|
||||||
|
* will affect how long it blocks and throws an exception after the timeout.
|
||||||
|
*
|
||||||
|
* The size of the session pool is controlled by the @see #maximumActive
|
||||||
|
* property.
|
||||||
|
*
|
||||||
|
* Whether or not the call to create session blocks is controlled by the @see #blockIfSessionPoolIsFull
|
||||||
|
* property
|
||||||
|
*
|
||||||
|
* @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFullTimeout is true,
|
||||||
|
* then use this setting to configure how long to block before retry
|
||||||
|
*/
|
||||||
|
public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) {
|
||||||
|
this.sessionPool.setMaxWait(blockIfSessionPoolIsFullTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "ConnectionPool[" + connection + "]";
|
return "ConnectionPool[" + connection + "]";
|
||||||
|
|
|
@ -68,6 +68,7 @@ public class PooledConnectionFactory implements ConnectionFactory {
|
||||||
private int maximumActiveSessionPerConnection = 500;
|
private int maximumActiveSessionPerConnection = 500;
|
||||||
private int idleTimeout = 30 * 1000;
|
private int idleTimeout = 30 * 1000;
|
||||||
private boolean blockIfSessionPoolIsFull = true;
|
private boolean blockIfSessionPoolIsFull = true;
|
||||||
|
private long blockIfSessionPoolIsFullTimeout = -1L;
|
||||||
private long expiryTimeout = 0l;
|
private long expiryTimeout = 0l;
|
||||||
private boolean createConnectionOnStartup = true;
|
private boolean createConnectionOnStartup = true;
|
||||||
private boolean useAnonymousProducers = true;
|
private boolean useAnonymousProducers = true;
|
||||||
|
@ -102,6 +103,9 @@ public class PooledConnectionFactory implements ConnectionFactory {
|
||||||
connection.setExpiryTimeout(getExpiryTimeout());
|
connection.setExpiryTimeout(getExpiryTimeout());
|
||||||
connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection());
|
connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection());
|
||||||
connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull());
|
connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull());
|
||||||
|
if (isBlockIfSessionPoolIsFull() && getBlockIfSessionPoolIsFullTimeout() > 0) {
|
||||||
|
connection.setBlockIfSessionPoolIsFullTimeout(getBlockIfSessionPoolIsFullTimeout());
|
||||||
|
}
|
||||||
connection.setUseAnonymousProducers(isUseAnonymousProducers());
|
connection.setUseAnonymousProducers(isUseAnonymousProducers());
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
|
@ -337,7 +341,7 @@ public class PooledConnectionFactory implements ConnectionFactory {
|
||||||
* once the maximum number of sessions has been borrowed from the the Session Pool.
|
* once the maximum number of sessions has been borrowed from the the Session Pool.
|
||||||
*
|
*
|
||||||
* @return true if the pooled Connection createSession method will block when the limit is hit.
|
* @return true if the pooled Connection createSession method will block when the limit is hit.
|
||||||
* @see setBlockIfSessionPoolIsFull
|
* @see #setBlockIfSessionPoolIsFull(boolean)
|
||||||
*/
|
*/
|
||||||
public boolean isBlockIfSessionPoolIsFull() {
|
public boolean isBlockIfSessionPoolIsFull() {
|
||||||
return this.blockIfSessionPoolIsFull;
|
return this.blockIfSessionPoolIsFull;
|
||||||
|
@ -504,4 +508,32 @@ public class PooledConnectionFactory implements ConnectionFactory {
|
||||||
protected ConnectionPool createConnectionPool(Connection connection) {
|
protected ConnectionPool createConnectionPool(Connection connection) {
|
||||||
return new ConnectionPool(connection);
|
return new ConnectionPool(connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the timeout to use for blocking creating new sessions
|
||||||
|
*
|
||||||
|
* @return true if the pooled Connection createSession method will block when the limit is hit.
|
||||||
|
* @see #setBlockIfSessionPoolIsFull(boolean)
|
||||||
|
*/
|
||||||
|
public long getBlockIfSessionPoolIsFullTimeout() {
|
||||||
|
return blockIfSessionPoolIsFullTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Controls the behavior of the internal session pool. By default the call to
|
||||||
|
* Connection.getSession() will block if the session pool is full. This setting
|
||||||
|
* will affect how long it blocks and throws an exception after the timeout.
|
||||||
|
*
|
||||||
|
* The size of the session pool is controlled by the @see #maximumActive
|
||||||
|
* property.
|
||||||
|
*
|
||||||
|
* Whether or not the call to create session blocks is controlled by the @see #blockIfSessionPoolIsFull
|
||||||
|
* property
|
||||||
|
*
|
||||||
|
* @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFullTimeout is true,
|
||||||
|
* then use this setting to configure how long to block before retry
|
||||||
|
*/
|
||||||
|
public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) {
|
||||||
|
this.blockIfSessionPoolIsFullTimeout = blockIfSessionPoolIsFullTimeout;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,156 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.jms.pool;
|
||||||
|
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
|
import javax.jms.*;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.*;
|
||||||
|
|
||||||
|
public class PooledSessionExhaustionBlockTimeoutTest extends TestCase {
|
||||||
|
private static final String QUEUE = "FOO";
|
||||||
|
private static final int NUM_MESSAGES = 500;
|
||||||
|
|
||||||
|
private Logger logger = Logger.getLogger(getClass());
|
||||||
|
|
||||||
|
private BrokerService broker;
|
||||||
|
private ActiveMQConnectionFactory factory;
|
||||||
|
private PooledConnectionFactory pooledFactory;
|
||||||
|
private String connectionUri;
|
||||||
|
private int numReceived = 0;
|
||||||
|
private final List<Exception> exceptionList = new ArrayList<Exception>();
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setUp() throws Exception {
|
||||||
|
broker = new BrokerService();
|
||||||
|
broker.setPersistent(false);
|
||||||
|
broker.setUseJmx(false);
|
||||||
|
TransportConnector connector = broker.addConnector("tcp://localhost:0");
|
||||||
|
broker.start();
|
||||||
|
connectionUri = connector.getPublishableConnectString();
|
||||||
|
factory = new ActiveMQConnectionFactory(connectionUri);
|
||||||
|
pooledFactory = new PooledConnectionFactory();
|
||||||
|
pooledFactory.setConnectionFactory(factory);
|
||||||
|
pooledFactory.setMaxConnections(1);
|
||||||
|
pooledFactory.setBlockIfSessionPoolIsFull(true);
|
||||||
|
pooledFactory.setBlockIfSessionPoolIsFullTimeout(500);
|
||||||
|
pooledFactory.setMaximumActiveSessionPerConnection(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void tearDown() throws Exception {
|
||||||
|
broker.stop();
|
||||||
|
broker.waitUntilStopped();
|
||||||
|
broker = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
class TestRunner implements Runnable {
|
||||||
|
|
||||||
|
CyclicBarrier barrier;
|
||||||
|
CountDownLatch latch;
|
||||||
|
TestRunner(CyclicBarrier barrier, CountDownLatch latch) {
|
||||||
|
this.barrier = barrier;
|
||||||
|
this.latch = latch;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
barrier.await();
|
||||||
|
sendMessages(pooledFactory);
|
||||||
|
this.latch.countDown();
|
||||||
|
} catch (Exception e) {
|
||||||
|
exceptionList.add(e);
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sendMessages(ConnectionFactory connectionFactory) throws Exception {
|
||||||
|
for (int i = 0; i < NUM_MESSAGES; i++) {
|
||||||
|
Connection connection = connectionFactory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination destination = session.createQueue(QUEUE);
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
|
||||||
|
String msgTo = "hello";
|
||||||
|
TextMessage message = session.createTextMessage(msgTo);
|
||||||
|
producer.send(message);
|
||||||
|
connection.close();
|
||||||
|
logger.info("sent " + i + " messages using " + connectionFactory.getClass());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testCanExhaustSessions() throws Exception {
|
||||||
|
final int totalMessagesExpected = NUM_MESSAGES * 2;
|
||||||
|
final CountDownLatch latch = new CountDownLatch(2);
|
||||||
|
Thread thread = new Thread(new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
|
||||||
|
Connection connection = connectionFactory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination destination = session.createQueue(QUEUE);
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
for (int i = 0; i < totalMessagesExpected; ++i) {
|
||||||
|
Message msg = consumer.receive(5000);
|
||||||
|
if (msg == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
numReceived++;
|
||||||
|
if (numReceived % 20 == 0) {
|
||||||
|
logger.debug("received " + numReceived + " messages ");
|
||||||
|
System.runFinalization();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
thread.start();
|
||||||
|
|
||||||
|
ExecutorService threads = Executors.newFixedThreadPool(2);
|
||||||
|
final CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
System.out.println("Starting threads to send messages!");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
threads.execute(new TestRunner(barrier, latch));
|
||||||
|
threads.execute(new TestRunner(barrier, latch));
|
||||||
|
|
||||||
|
latch.await(2, TimeUnit.SECONDS);
|
||||||
|
thread.join();
|
||||||
|
|
||||||
|
assertEquals(totalMessagesExpected, numReceived);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -32,9 +32,15 @@ import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.TransportConnector;
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CyclicBarrier;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
public class PooledSessionExhaustionTest extends TestCase {
|
public class PooledSessionExhaustionTest extends TestCase {
|
||||||
private static final String QUEUE = "FOO";
|
private static final String QUEUE = "FOO";
|
||||||
private static final int NUM_MESSAGES = 700;
|
private static final int NUM_MESSAGES = 500;
|
||||||
|
|
||||||
private Logger logger = Logger.getLogger(getClass());
|
private Logger logger = Logger.getLogger(getClass());
|
||||||
|
|
||||||
|
@ -43,6 +49,7 @@ public class PooledSessionExhaustionTest extends TestCase {
|
||||||
private PooledConnectionFactory pooledFactory;
|
private PooledConnectionFactory pooledFactory;
|
||||||
private String connectionUri;
|
private String connectionUri;
|
||||||
private int numReceived = 0;
|
private int numReceived = 0;
|
||||||
|
private final List<Exception> exceptionList = new ArrayList<Exception>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void setUp() throws Exception {
|
protected void setUp() throws Exception {
|
||||||
|
@ -57,6 +64,7 @@ public class PooledSessionExhaustionTest extends TestCase {
|
||||||
pooledFactory.setConnectionFactory(factory);
|
pooledFactory.setConnectionFactory(factory);
|
||||||
pooledFactory.setMaxConnections(1);
|
pooledFactory.setMaxConnections(1);
|
||||||
pooledFactory.setBlockIfSessionPoolIsFull(false);
|
pooledFactory.setBlockIfSessionPoolIsFull(false);
|
||||||
|
pooledFactory.setMaximumActiveSessionPerConnection(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -66,6 +74,25 @@ public class PooledSessionExhaustionTest extends TestCase {
|
||||||
broker = null;
|
broker = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class TestRunner implements Runnable {
|
||||||
|
|
||||||
|
CyclicBarrier barrier;
|
||||||
|
TestRunner(CyclicBarrier barrier) {
|
||||||
|
this.barrier = barrier;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
barrier.await();
|
||||||
|
sendMessages(pooledFactory);
|
||||||
|
} catch (Exception e) {
|
||||||
|
exceptionList.add(e);
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void sendMessages(ConnectionFactory connectionFactory) throws Exception {
|
public void sendMessages(ConnectionFactory connectionFactory) throws Exception {
|
||||||
for (int i = 0; i < NUM_MESSAGES; i++) {
|
for (int i = 0; i < NUM_MESSAGES; i++) {
|
||||||
Connection connection = connectionFactory.createConnection();
|
Connection connection = connectionFactory.createConnection();
|
||||||
|
@ -79,7 +106,7 @@ public class PooledSessionExhaustionTest extends TestCase {
|
||||||
TextMessage message = session.createTextMessage(msgTo);
|
TextMessage message = session.createTextMessage(msgTo);
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
connection.close();
|
connection.close();
|
||||||
logger.debug("sent " + i + " messages using " + connectionFactory.getClass());
|
logger.info("sent " + i + " messages using " + connectionFactory.getClass());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,9 +139,23 @@ public class PooledSessionExhaustionTest extends TestCase {
|
||||||
});
|
});
|
||||||
thread.start();
|
thread.start();
|
||||||
|
|
||||||
sendMessages(pooledFactory);
|
ExecutorService threads = Executors.newFixedThreadPool(2);
|
||||||
|
final CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
System.out.println("Starting threads to send messages!");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
threads.execute(new TestRunner(barrier));
|
||||||
|
threads.execute(new TestRunner(barrier));
|
||||||
|
|
||||||
thread.join();
|
thread.join();
|
||||||
|
|
||||||
|
// we should expect that one of the threads will die because it cannot acquire a session,
|
||||||
|
// will throw an exception
|
||||||
assertEquals(NUM_MESSAGES, numReceived);
|
assertEquals(NUM_MESSAGES, numReceived);
|
||||||
|
assertEquals(exceptionList.size(), 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue