From dc607bbf3565bd955b7f5237047fdf3bec4cb7bd Mon Sep 17 00:00:00 2001 From: Christian Posta Date: Thu, 27 Feb 2014 08:39:38 -0700 Subject: [PATCH] Enhancement for https://issues.apache.org/jira/browse/AMQ-5076 -- pooled session creation blocks --- .../activemq/jms/pool/ConnectionPool.java | 28 ++++ .../jms/pool/PooledConnectionFactory.java | 34 +++- ...oledSessionExhaustionBlockTimeoutTest.java | 156 ++++++++++++++++++ .../jms/pool/PooledSessionExhaustionTest.java | 47 +++++- 4 files changed, 261 insertions(+), 4 deletions(-) create mode 100644 activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionExhaustionBlockTimeoutTest.java diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java index 08d20380e1..eced58821f 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java @@ -296,6 +296,34 @@ public class ConnectionPool { return this.sessionPool.getWhenExhaustedAction() == GenericObjectPool.WHEN_EXHAUSTED_BLOCK; } + /** + * Returns the timeout to use for blocking creating new sessions + * + * @return true if the pooled Connection createSession method will block when the limit is hit. + * @see #setBlockIfSessionPoolIsFull(boolean) + */ + public long getBlockIfSessionPoolIsFullTimeout() { + return this.sessionPool.getMaxWait(); + } + + /** + * Controls the behavior of the internal session pool. By default the call to + * Connection.getSession() will block if the session pool is full. This setting + * will affect how long it blocks and throws an exception after the timeout. + * + * The size of the session pool is controlled by the @see #maximumActive + * property. + * + * Whether or not the call to create session blocks is controlled by the @see #blockIfSessionPoolIsFull + * property + * + * @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFullTimeout is true, + * then use this setting to configure how long to block before retry + */ + public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) { + this.sessionPool.setMaxWait(blockIfSessionPoolIsFullTimeout); + } + @Override public String toString() { return "ConnectionPool[" + connection + "]"; diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java index 9ac853d90c..86f5972de6 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java @@ -68,6 +68,7 @@ public class PooledConnectionFactory implements ConnectionFactory { private int maximumActiveSessionPerConnection = 500; private int idleTimeout = 30 * 1000; private boolean blockIfSessionPoolIsFull = true; + private long blockIfSessionPoolIsFullTimeout = -1L; private long expiryTimeout = 0l; private boolean createConnectionOnStartup = true; private boolean useAnonymousProducers = true; @@ -102,6 +103,9 @@ public class PooledConnectionFactory implements ConnectionFactory { connection.setExpiryTimeout(getExpiryTimeout()); connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection()); connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull()); + if (isBlockIfSessionPoolIsFull() && getBlockIfSessionPoolIsFullTimeout() > 0) { + connection.setBlockIfSessionPoolIsFullTimeout(getBlockIfSessionPoolIsFullTimeout()); + } connection.setUseAnonymousProducers(isUseAnonymousProducers()); if (LOG.isTraceEnabled()) { @@ -337,7 +341,7 @@ public class PooledConnectionFactory implements ConnectionFactory { * once the maximum number of sessions has been borrowed from the the Session Pool. * * @return true if the pooled Connection createSession method will block when the limit is hit. - * @see setBlockIfSessionPoolIsFull + * @see #setBlockIfSessionPoolIsFull(boolean) */ public boolean isBlockIfSessionPoolIsFull() { return this.blockIfSessionPoolIsFull; @@ -504,4 +508,32 @@ public class PooledConnectionFactory implements ConnectionFactory { protected ConnectionPool createConnectionPool(Connection connection) { return new ConnectionPool(connection); } + + /** + * Returns the timeout to use for blocking creating new sessions + * + * @return true if the pooled Connection createSession method will block when the limit is hit. + * @see #setBlockIfSessionPoolIsFull(boolean) + */ + public long getBlockIfSessionPoolIsFullTimeout() { + return blockIfSessionPoolIsFullTimeout; + } + + /** + * Controls the behavior of the internal session pool. By default the call to + * Connection.getSession() will block if the session pool is full. This setting + * will affect how long it blocks and throws an exception after the timeout. + * + * The size of the session pool is controlled by the @see #maximumActive + * property. + * + * Whether or not the call to create session blocks is controlled by the @see #blockIfSessionPoolIsFull + * property + * + * @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFullTimeout is true, + * then use this setting to configure how long to block before retry + */ + public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) { + this.blockIfSessionPoolIsFullTimeout = blockIfSessionPoolIsFullTimeout; + } } diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionExhaustionBlockTimeoutTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionExhaustionBlockTimeoutTest.java new file mode 100644 index 0000000000..ab857ef8f9 --- /dev/null +++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionExhaustionBlockTimeoutTest.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.jms.pool; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.log4j.Logger; + +import javax.jms.*; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + +public class PooledSessionExhaustionBlockTimeoutTest extends TestCase { + private static final String QUEUE = "FOO"; + private static final int NUM_MESSAGES = 500; + + private Logger logger = Logger.getLogger(getClass()); + + private BrokerService broker; + private ActiveMQConnectionFactory factory; + private PooledConnectionFactory pooledFactory; + private String connectionUri; + private int numReceived = 0; + private final List exceptionList = new ArrayList(); + + + @Override + protected void setUp() throws Exception { + broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + TransportConnector connector = broker.addConnector("tcp://localhost:0"); + broker.start(); + connectionUri = connector.getPublishableConnectString(); + factory = new ActiveMQConnectionFactory(connectionUri); + pooledFactory = new PooledConnectionFactory(); + pooledFactory.setConnectionFactory(factory); + pooledFactory.setMaxConnections(1); + pooledFactory.setBlockIfSessionPoolIsFull(true); + pooledFactory.setBlockIfSessionPoolIsFullTimeout(500); + pooledFactory.setMaximumActiveSessionPerConnection(1); + } + + @Override + protected void tearDown() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } + + class TestRunner implements Runnable { + + CyclicBarrier barrier; + CountDownLatch latch; + TestRunner(CyclicBarrier barrier, CountDownLatch latch) { + this.barrier = barrier; + this.latch = latch; + } + + @Override + public void run() { + try { + barrier.await(); + sendMessages(pooledFactory); + this.latch.countDown(); + } catch (Exception e) { + exceptionList.add(e); + throw new RuntimeException(e); + } + } + } + + public void sendMessages(ConnectionFactory connectionFactory) throws Exception { + for (int i = 0; i < NUM_MESSAGES; i++) { + Connection connection = connectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(QUEUE); + MessageProducer producer = session.createProducer(destination); + + String msgTo = "hello"; + TextMessage message = session.createTextMessage(msgTo); + producer.send(message); + connection.close(); + logger.info("sent " + i + " messages using " + connectionFactory.getClass()); + } + } + + public void testCanExhaustSessions() throws Exception { + final int totalMessagesExpected = NUM_MESSAGES * 2; + final CountDownLatch latch = new CountDownLatch(2); + Thread thread = new Thread(new Runnable() { + public void run() { + try { + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(QUEUE); + MessageConsumer consumer = session.createConsumer(destination); + for (int i = 0; i < totalMessagesExpected; ++i) { + Message msg = consumer.receive(5000); + if (msg == null) { + return; + } + numReceived++; + if (numReceived % 20 == 0) { + logger.debug("received " + numReceived + " messages "); + System.runFinalization(); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + thread.start(); + + ExecutorService threads = Executors.newFixedThreadPool(2); + final CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() { + + @Override + public void run() { + System.out.println("Starting threads to send messages!"); + } + }); + + threads.execute(new TestRunner(barrier, latch)); + threads.execute(new TestRunner(barrier, latch)); + + latch.await(2, TimeUnit.SECONDS); + thread.join(); + + assertEquals(totalMessagesExpected, numReceived); + + } +} diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionExhaustionTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionExhaustionTest.java index 7ad15ca482..119d4a9f1b 100644 --- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionExhaustionTest.java +++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionExhaustionTest.java @@ -32,9 +32,15 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.log4j.Logger; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class PooledSessionExhaustionTest extends TestCase { private static final String QUEUE = "FOO"; - private static final int NUM_MESSAGES = 700; + private static final int NUM_MESSAGES = 500; private Logger logger = Logger.getLogger(getClass()); @@ -43,6 +49,7 @@ public class PooledSessionExhaustionTest extends TestCase { private PooledConnectionFactory pooledFactory; private String connectionUri; private int numReceived = 0; + private final List exceptionList = new ArrayList(); @Override protected void setUp() throws Exception { @@ -57,6 +64,7 @@ public class PooledSessionExhaustionTest extends TestCase { pooledFactory.setConnectionFactory(factory); pooledFactory.setMaxConnections(1); pooledFactory.setBlockIfSessionPoolIsFull(false); + pooledFactory.setMaximumActiveSessionPerConnection(1); } @Override @@ -66,6 +74,25 @@ public class PooledSessionExhaustionTest extends TestCase { broker = null; } + class TestRunner implements Runnable { + + CyclicBarrier barrier; + TestRunner(CyclicBarrier barrier) { + this.barrier = barrier; + } + + @Override + public void run() { + try { + barrier.await(); + sendMessages(pooledFactory); + } catch (Exception e) { + exceptionList.add(e); + throw new RuntimeException(e); + } + } + } + public void sendMessages(ConnectionFactory connectionFactory) throws Exception { for (int i = 0; i < NUM_MESSAGES; i++) { Connection connection = connectionFactory.createConnection(); @@ -79,7 +106,7 @@ public class PooledSessionExhaustionTest extends TestCase { TextMessage message = session.createTextMessage(msgTo); producer.send(message); connection.close(); - logger.debug("sent " + i + " messages using " + connectionFactory.getClass()); + logger.info("sent " + i + " messages using " + connectionFactory.getClass()); } } @@ -112,9 +139,23 @@ public class PooledSessionExhaustionTest extends TestCase { }); thread.start(); - sendMessages(pooledFactory); + ExecutorService threads = Executors.newFixedThreadPool(2); + final CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() { + + @Override + public void run() { + System.out.println("Starting threads to send messages!"); + } + }); + + threads.execute(new TestRunner(barrier)); + threads.execute(new TestRunner(barrier)); + thread.join(); + // we should expect that one of the threads will die because it cannot acquire a session, + // will throw an exception assertEquals(NUM_MESSAGES, numReceived); + assertEquals(exceptionList.size(), 1); } }