From 2a7c3499766c961c6996429af82c3ee374d24a33 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Mon, 13 Jan 2014 14:05:04 -0500 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4968 Add non-caching mode for Session producers. --- .../activemq/jms/pool/ConnectionPool.java | 13 +- .../jms/pool/PooledConnectionFactory.java | 28 ++++ .../activemq/jms/pool/PooledProducer.java | 24 +++- .../activemq/jms/pool/PooledSession.java | 136 ++++++++++++------ .../PooledSessionNoPublisherCachingTest.java | 129 +++++++++++++++++ .../activemq/jms/pool/PooledSessionTest.java | 71 +++++++++ 6 files changed, 358 insertions(+), 43 deletions(-) create mode 100644 activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionNoPublisherCachingTest.java diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java index df2da170f7..08d20380e1 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java @@ -22,9 +22,9 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; +import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Session; -import javax.jms.IllegalStateException; import org.apache.commons.pool.KeyedPoolableObjectFactory; import org.apache.commons.pool.impl.GenericKeyedObjectPool; @@ -51,6 +51,7 @@ public class ConnectionPool { private boolean hasExpired; private int idleTimeout = 30 * 1000; private long expiryTimeout = 0l; + private boolean useAnonymousProducers = true; private final AtomicBoolean started = new AtomicBoolean(false); private final GenericKeyedObjectPool sessionPool; @@ -78,7 +79,7 @@ public class ConnectionPool { @Override public PooledSession makeObject(SessionKey key) throws Exception { Session session = makeSession(key); - return new PooledSession(key, session, sessionPool, key.isTransacted()); + return new PooledSession(key, session, sessionPool, key.isTransacted(), useAnonymousProducers); } @Override @@ -248,6 +249,14 @@ public class ConnectionPool { this.sessionPool.setMaxActive(maximumActiveSessionPerConnection); } + public boolean isUseAnonymousProducers() { + return this.useAnonymousProducers; + } + + public void setUseAnonymousProducers(boolean value) { + this.useAnonymousProducers = value; + } + /** * @return the total number of Pooled session including idle sessions that are not * currently loaned out to any client. diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java index 64eaad2d6f..9ac853d90c 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java @@ -70,6 +70,7 @@ public class PooledConnectionFactory implements ConnectionFactory { private boolean blockIfSessionPoolIsFull = true; private long expiryTimeout = 0l; private boolean createConnectionOnStartup = true; + private boolean useAnonymousProducers = true; public void initConnectionsPool() { if (this.connectionsPool == null) { @@ -101,6 +102,7 @@ public class PooledConnectionFactory implements ConnectionFactory { connection.setExpiryTimeout(getExpiryTimeout()); connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection()); connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull()); + connection.setUseAnonymousProducers(isUseAnonymousProducers()); if (LOG.isTraceEnabled()) { LOG.trace("Created new connection: {}", connection); @@ -426,6 +428,32 @@ public class PooledConnectionFactory implements ConnectionFactory { this.createConnectionOnStartup = createConnectionOnStartup; } + /** + * Should Sessions use one anonymous producer for all producer requests or should a new + * MessageProducer be created for each request to create a producer object, default is true. + * + * When enabled the session only needs to allocate one MessageProducer for all requests and + * the MessageProducer#send(destination, message) method can be used. Normally this is the + * right thing to do however it does result in the Broker not showing the producers per + * destination. + * + * @return true if a PooledSession will use only a single anonymous message producer instance. + */ + public boolean isUseAnonymousProducers() { + return this.useAnonymousProducers; + } + + /** + * Sets whether a PooledSession uses only one anonymous MessageProducer instance or creates + * a new MessageProducer for each call the create a MessageProducer. + * + * @param value + * Boolean value that configures whether anonymous producers are used. + */ + public void setUseAnonymousProducers(boolean value) { + this.useAnonymousProducers = value; + } + /** * Gets the Pool of ConnectionPool instances which are keyed by different ConnectionKeys. * diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledProducer.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledProducer.java index 817a1f1c92..7f54b99173 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledProducer.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledProducer.java @@ -17,6 +17,7 @@ package org.apache.activemq.jms.pool; import javax.jms.Destination; +import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; @@ -34,10 +35,12 @@ public class PooledProducer implements MessageProducer { private boolean disableMessageTimestamp; private int priority; private long timeToLive; + private boolean anonymous = true; public PooledProducer(MessageProducer messageProducer, Destination destination) throws JMSException { this.messageProducer = messageProducer; this.destination = destination; + this.anonymous = messageProducer.getDestination() == null; this.deliveryMode = messageProducer.getDeliveryMode(); this.disableMessageID = messageProducer.getDisableMessageID(); @@ -48,6 +51,9 @@ public class PooledProducer implements MessageProducer { @Override public void close() throws JMSException { + if (!anonymous) { + this.messageProducer.close(); + } } @Override @@ -67,13 +73,25 @@ public class PooledProducer implements MessageProducer { @Override public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + if (destination == null) { - destination = this.destination; + if (messageProducer.getDestination() == null) { + throw new UnsupportedOperationException("A destination must be specified."); + } + throw new InvalidDestinationException("Don't understand null destinations"); } + MessageProducer messageProducer = getMessageProducer(); // just in case let only one thread send at once synchronized (messageProducer) { + + if (anonymous && !this.destination.equals(destination)) { + throw new UnsupportedOperationException("This producer can only send messages to: " + this.destination); + } + + // Producer will do it's own Destination validation so always use the destination + // based send method otherwise we might violate a JMS rule. messageProducer.send(destination, message, deliveryMode, priority, timeToLive); } } @@ -139,6 +157,10 @@ public class PooledProducer implements MessageProducer { return messageProducer; } + protected boolean isAnonymous() { + return anonymous; + } + @Override public String toString() { return "PooledProducer { " + messageProducer + " }"; diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java index d0e4a09275..1d3fc2f6c1 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java @@ -57,22 +57,24 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes private final KeyedObjectPool sessionPool; private final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList(); private final CopyOnWriteArrayList browsers = new CopyOnWriteArrayList(); - private final CopyOnWriteArrayList sessionEventListeners = - new CopyOnWriteArrayList(); + private final CopyOnWriteArrayList sessionEventListeners = new CopyOnWriteArrayList(); + + private MessageProducer producer; + private TopicPublisher publisher; + private QueueSender sender; private Session session; - private MessageProducer messageProducer; - private QueueSender queueSender; - private TopicPublisher topicPublisher; private boolean transactional = true; private boolean ignoreClose; private boolean isXa; + private boolean useAnonymousProducers = true; - public PooledSession(SessionKey key, Session session, KeyedObjectPool sessionPool, boolean transactional) { + public PooledSession(SessionKey key, Session session, KeyedObjectPool sessionPool, boolean transactional, boolean anonymous) { this.key = key; this.session = session; this.sessionPool = sessionPool; this.transactional = transactional; + this.useAnonymousProducers = anonymous; } public void addSessionEventListener(PooledSessionEventListener listener) { @@ -268,7 +270,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes @Override public XAResource getXAResource() { if (session instanceof XASession) { - return ((XASession)session).getXAResource(); + return ((XASession) session).getXAResource(); } return null; } @@ -334,53 +336,39 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes @Override public TopicSubscriber createSubscriber(Topic topic) throws JMSException { - return addTopicSubscriber(((TopicSession)getInternalSession()).createSubscriber(topic)); + return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic)); } @Override public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException { - return addTopicSubscriber(((TopicSession)getInternalSession()).createSubscriber(topic, selector, local)); + return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic, selector, local)); } @Override public QueueReceiver createReceiver(Queue queue) throws JMSException { - return addQueueReceiver(((QueueSession)getInternalSession()).createReceiver(queue)); + return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue)); } @Override public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException { - return addQueueReceiver(((QueueSession)getInternalSession()).createReceiver(queue, selector)); + return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue, selector)); } // Producer related methods // ------------------------------------------------------------------------- @Override public MessageProducer createProducer(Destination destination) throws JMSException { - return new PooledProducer(getMessageProducer(), destination); + return new PooledProducer(getMessageProducer(destination), destination); } @Override public QueueSender createSender(Queue queue) throws JMSException { - return new PooledQueueSender(getQueueSender(), queue); + return new PooledQueueSender(getQueueSender(queue), queue); } @Override public TopicPublisher createPublisher(Topic topic) throws JMSException { - return new PooledTopicPublisher(getTopicPublisher(), topic); - } - - /** - * Callback invoked when the consumer is closed. - *

- * This is used to keep track of an explicit closed consumer created by this - * session, by which we know do not need to keep track of the consumer, as - * its already closed. - * - * @param consumer - * the consumer which is being closed - */ - protected void onConsumerClose(MessageConsumer consumer) { - consumers.remove(consumer); + return new PooledTopicPublisher(getTopicPublisher(topic), topic); } public Session getInternalSession() throws IllegalStateException { @@ -391,24 +379,78 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes } public MessageProducer getMessageProducer() throws JMSException { - if (messageProducer == null) { - messageProducer = getInternalSession().createProducer(null); + return getMessageProducer(null); + } + + public MessageProducer getMessageProducer(Destination destination) throws JMSException { + MessageProducer result = null; + + if (useAnonymousProducers) { + if (producer == null) { + // Don't allow for duplicate anonymous producers. + synchronized (this) { + if (producer == null) { + producer = getInternalSession().createProducer(null); + } + } + } + + result = producer; + } else { + result = getInternalSession().createProducer(destination); } - return messageProducer; + + return result; } public QueueSender getQueueSender() throws JMSException { - if (queueSender == null) { - queueSender = ((QueueSession)getInternalSession()).createSender(null); + return getQueueSender(null); + } + + public QueueSender getQueueSender(Queue destination) throws JMSException { + QueueSender result = null; + + if (useAnonymousProducers) { + if (sender == null) { + // Don't allow for duplicate anonymous producers. + synchronized (this) { + if (sender == null) { + sender = ((QueueSession) getInternalSession()).createSender(null); + } + } + } + + result = sender; + } else { + result = ((QueueSession) getInternalSession()).createSender(destination); } - return queueSender; + + return result; } public TopicPublisher getTopicPublisher() throws JMSException { - if (topicPublisher == null) { - topicPublisher = ((TopicSession)getInternalSession()).createPublisher(null); + return getTopicPublisher(null); + } + + public TopicPublisher getTopicPublisher(Topic destination) throws JMSException { + TopicPublisher result = null; + + if (useAnonymousProducers) { + if (publisher == null) { + // Don't allow for duplicate anonymous producers. + synchronized (this) { + if (publisher == null) { + publisher = ((TopicSession) getInternalSession()).createPublisher(null); + } + } + } + + result = publisher; + } else { + result = ((TopicSession) getInternalSession()).createPublisher(destination); } - return topicPublisher; + + return result; } private QueueBrowser addQueueBrowser(QueueBrowser browser) { @@ -418,9 +460,9 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes private MessageConsumer addConsumer(MessageConsumer consumer) { consumers.add(consumer); - // must wrap in PooledMessageConsumer to ensure the onConsumerClose method is - // invoked when the returned consumer is closed, to avoid memory leak in this - // session class in case many consumers is created + // must wrap in PooledMessageConsumer to ensure the onConsumerClose + // method is invoked when the returned consumer is closed, to avoid memory + // leak in this session class in case many consumers is created return new PooledMessageConsumer(this, consumer); } @@ -442,4 +484,18 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes public String toString() { return "PooledSession { " + session + " }"; } + + /** + * Callback invoked when the consumer is closed. + *

+ * This is used to keep track of an explicit closed consumer created by this + * session, by which we know do not need to keep track of the consumer, as + * its already closed. + * + * @param consumer + * the consumer which is being closed + */ + protected void onConsumerClose(MessageConsumer consumer) { + consumers.remove(consumer); + } } diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionNoPublisherCachingTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionNoPublisherCachingTest.java new file mode 100644 index 0000000000..66713764db --- /dev/null +++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionNoPublisherCachingTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.jms.pool; + +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.fail; + +import javax.jms.Queue; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSession; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class PooledSessionNoPublisherCachingTest { + + private BrokerService broker; + private ActiveMQConnectionFactory factory; + private PooledConnectionFactory pooledFactory; + private String connectionUri; + + @Before + public void setUp() throws Exception { + broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + TransportConnector connector = broker.addConnector("tcp://localhost:0"); + broker.start(); + connectionUri = connector.getPublishableConnectString(); + factory = new ActiveMQConnectionFactory(connectionUri); + pooledFactory = new PooledConnectionFactory(); + pooledFactory.setConnectionFactory(factory); + pooledFactory.setMaxConnections(1); + pooledFactory.setBlockIfSessionPoolIsFull(false); + pooledFactory.setUseAnonymousProducers(false); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } + + @Test + public void testMessageProducersAreUnique() throws Exception { + PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue1 = session.createTemporaryQueue(); + Queue queue2 = session.createTemporaryQueue(); + + PooledProducer producer1 = (PooledProducer) session.createProducer(queue1); + PooledProducer producer2 = (PooledProducer) session.createProducer(queue2); + + assertNotSame(producer1.getMessageProducer(), producer2.getMessageProducer()); + } + + @Test + public void testThrowsWhenDestinationGiven() throws Exception { + PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue1 = session.createTemporaryQueue(); + Queue queue2 = session.createTemporaryQueue(); + + PooledProducer producer = (PooledProducer) session.createProducer(queue1); + + try { + producer.send(queue2, session.createTextMessage()); + fail("Should only be able to send to queue 1"); + } catch (Exception ex) { + } + + try { + producer.send(null, session.createTextMessage()); + fail("Should only be able to send to queue 1"); + } catch (Exception ex) { + } + } + + @Test + public void testCreateTopicPublisher() throws Exception { + PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); + TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic topic1 = session.createTopic("Topic-1"); + Topic topic2 = session.createTopic("Topic-2"); + + PooledTopicPublisher publisher1 = (PooledTopicPublisher) session.createPublisher(topic1); + PooledTopicPublisher publisher2 = (PooledTopicPublisher) session.createPublisher(topic2); + + assertNotSame(publisher1.getMessageProducer(), publisher2.getMessageProducer()); + } + + @Test + public void testQueueSender() throws Exception { + PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); + QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue1 = session.createTemporaryQueue(); + Queue queue2 = session.createTemporaryQueue(); + + PooledQueueSender sender1 = (PooledQueueSender) session.createSender(queue1); + PooledQueueSender sender2 = (PooledQueueSender) session.createSender(queue2); + + assertNotSame(sender1.getMessageProducer(), sender2.getMessageProducer()); + } +} diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java index a60d053e7a..7483e6bbd3 100644 --- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java +++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java @@ -17,8 +17,14 @@ package org.apache.activemq.jms.pool; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; +import javax.jms.Queue; +import javax.jms.QueueSession; import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; @@ -68,4 +74,69 @@ public class PooledSessionTest { assertEquals(1, connection.getNumtIdleSessions()); assertEquals(1, connection.getNumSessions()); } + + @Test + public void testMessageProducersAreAllTheSame() throws Exception { + PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue1 = session.createTemporaryQueue(); + Queue queue2 = session.createTemporaryQueue(); + + PooledProducer producer1 = (PooledProducer) session.createProducer(queue1); + PooledProducer producer2 = (PooledProducer) session.createProducer(queue2); + + assertSame(producer1.getMessageProducer(), producer2.getMessageProducer()); + } + + @Test + public void testThrowsWhenDifferentDestinationGiven() throws Exception { + PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue1 = session.createTemporaryQueue(); + Queue queue2 = session.createTemporaryQueue(); + + PooledProducer producer = (PooledProducer) session.createProducer(queue1); + + try { + producer.send(queue2, session.createTextMessage()); + fail("Should only be able to send to queue 1"); + } catch (Exception ex) { + } + + try { + producer.send(null, session.createTextMessage()); + fail("Should only be able to send to queue 1"); + } catch (Exception ex) { + } + } + + @Test + public void testCreateTopicPublisher() throws Exception { + PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); + TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic topic1 = session.createTopic("Topic-1"); + Topic topic2 = session.createTopic("Topic-2"); + + PooledTopicPublisher publisher1 = (PooledTopicPublisher) session.createPublisher(topic1); + PooledTopicPublisher publisher2 = (PooledTopicPublisher) session.createPublisher(topic2); + + assertSame(publisher1.getMessageProducer(), publisher2.getMessageProducer()); + } + + @Test + public void testQueueSender() throws Exception { + PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); + QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue1 = session.createTemporaryQueue(); + Queue queue2 = session.createTemporaryQueue(); + + PooledQueueSender sender1 = (PooledQueueSender) session.createSender(queue1); + PooledQueueSender sender2 = (PooledQueueSender) session.createSender(queue2); + + assertSame(sender1.getMessageProducer(), sender2.getMessageProducer()); + } }