diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index 6eda5561d2..bc2612bea8 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -224,6 +224,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon // Configure a single threaded executor who's core thread can timeout if // idle executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { + @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport); //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796 @@ -318,6 +319,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @see Session#DUPS_OK_ACKNOWLEDGE * @since 1.1 */ + @Override public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { checkClosedOrFailed(); ensureConnectionInfoSent(); @@ -352,6 +354,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @throws JMSException if the JMS provider fails to return the client ID * for this connection due to some internal error. */ + @Override public String getClientID() throws JMSException { checkClosedOrFailed(); return this.info.getClientId(); @@ -395,6 +398,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * a connection's client ID at the wrong time or when it has * been administratively configured. */ + @Override public void setClientID(String newClientID) throws JMSException { checkClosedOrFailed(); @@ -428,6 +432,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * metadata for this connection. * @see javax.jms.ConnectionMetaData */ + @Override public ConnectionMetaData getMetaData() throws JMSException { checkClosedOrFailed(); return ActiveMQConnectionMetaData.INSTANCE; @@ -445,6 +450,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * ExceptionListener for this connection. * @see javax.jms.Connection#setExceptionListener(ExceptionListener) */ + @Override public ExceptionListener getExceptionListener() throws JMSException { checkClosedOrFailed(); return this.exceptionListener; @@ -473,6 +479,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @throws JMSException if the JMS provider fails to set the exception * listener for this connection. */ + @Override public void setExceptionListener(ExceptionListener listener) throws JMSException { checkClosedOrFailed(); this.exceptionListener = listener; @@ -511,6 +518,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * due to some internal error. * @see javax.jms.Connection#stop() */ + @Override public void start() throws JMSException { checkClosedOrFailed(); ensureConnectionInfoSent(); @@ -553,6 +561,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * due to some internal error. * @see javax.jms.Connection#start() */ + @Override public void stop() throws JMSException { checkClosedOrFailed(); if (started.compareAndSet(true, false)) { @@ -608,6 +617,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * release resources or to close a socket connection can * cause this exception to be thrown. */ + @Override public void close() throws JMSException { // Store the interrupted state and clear so that cleanup happens without // leaking connection resources. Reset in finally to preserve state. @@ -744,6 +754,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @see javax.jms.ConnectionConsumer * @since 1.1 */ + @Override public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false); @@ -1031,6 +1042,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @return a lazily created destination source * @throws JMSException */ + @Override public DestinationSource getDestinationSource() throws JMSException { if (destinationSource == null) { destinationSource = new DestinationSource(this); @@ -1105,6 +1117,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @see Session#CLIENT_ACKNOWLEDGE * @see Session#DUPS_OK_ACKNOWLEDGE */ + @Override public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode)); } @@ -1133,6 +1146,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * invalid. * @see javax.jms.ConnectionConsumer */ + @Override public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false); } @@ -1161,6 +1175,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * invalid. * @see javax.jms.ConnectionConsumer */ + @Override public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false); } @@ -1190,6 +1205,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @see javax.jms.ConnectionConsumer * @since 1.1 */ + @Override public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false); } @@ -1250,6 +1266,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @see Session#CLIENT_ACKNOWLEDGE * @see Session#DUPS_OK_ACKNOWLEDGE */ + @Override public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode)); } @@ -1432,6 +1449,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon /** * @return statistics for this Connection */ + @Override public StatsImpl getStats() { return stats; } @@ -1590,6 +1608,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon started.set(false); } + @Override public void finalize() throws Throwable{ Scheduler s = this.scheduler; if (s != null){ @@ -1811,6 +1830,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon /** * @param o - the command to consume */ + @Override public void onCommand(final Object o) { final Command command = (Command)o; if (!closed.get() && command != null) { @@ -1832,6 +1852,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon msg.setReadOnlyProperties(true); msg.setRedeliveryCounter(md.getRedeliveryCounter()); msg.setConnection(ActiveMQConnection.this); + msg.setMemoryUsage(null); md.setMessage(msg); } dispatcher.dispatch(md); @@ -1862,6 +1883,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon @Override public Response processConnectionError(final ConnectionError error) throws Exception { executor.execute(new Runnable() { + @Override public void run() { onAsyncException(error.getException()); } @@ -1922,6 +1944,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon if ( !closed.get() && !closing.get() ) { if ( this.clientInternalExceptionListener != null ) { executor.execute(new Runnable() { + @Override public void run() { ActiveMQConnection.this.clientInternalExceptionListener.onException(error); } @@ -1948,6 +1971,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon final JMSException e = (JMSException)error; executor.execute(new Runnable() { + @Override public void run() { ActiveMQConnection.this.exceptionListener.onException(e); } @@ -1959,10 +1983,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } } + @Override public void onException(final IOException error) { onAsyncException(error); if (!closing.get() && !closed.get()) { executor.execute(new Runnable() { + @Override public void run() { transportFailed(error); ServiceSupport.dispose(ActiveMQConnection.this.transport); @@ -1981,6 +2007,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } } + @Override public void transportInterupted() { this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0)); if (LOG.isDebugEnabled()) { @@ -2003,6 +2030,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } } + @Override public void transportResumed() { for (Iterator iter = transportListeners.iterator(); iter.hasNext();) { TransportListener listener = iter.next(); @@ -2141,34 +2169,42 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon this.objectMessageSerializationDefered = objectMessageSerializationDefered; } + @Override public InputStream createInputStream(Destination dest) throws JMSException { return createInputStream(dest, null); } + @Override public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException { return createInputStream(dest, messageSelector, false); } + @Override public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException { return createInputStream(dest, messageSelector, noLocal, -1); } + @Override public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException { return doCreateInputStream(dest, messageSelector, noLocal, null, timeout); } + @Override public InputStream createDurableInputStream(Topic dest, String name) throws JMSException { return createInputStream(dest, null, false); } + @Override public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException { return createDurableInputStream(dest, name, messageSelector, false); } + @Override public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException { return createDurableInputStream(dest, name, messageSelector, noLocal, -1); } + @Override public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException { return doCreateInputStream(dest, messageSelector, noLocal, name, timeout); } @@ -2183,6 +2219,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * Creates a persistent output stream; individual messages will be written * to disk/database by the broker */ + @Override public OutputStream createOutputStream(Destination dest) throws JMSException { return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); } @@ -2207,6 +2244,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * {@link javax.jms.Message#setObjectProperty(String, Object)} * method */ + @Override public OutputStream createOutputStream(Destination dest, Map streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException { checkClosedOrFailed(); ensureConnectionInfoSent(); @@ -2232,6 +2270,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * specified. * @since 1.1 */ + @Override public void unsubscribe(String name) throws InvalidDestinationException, JMSException { checkClosedOrFailed(); RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java new file mode 100644 index 0000000000..ffd69f1750 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.Assert; + +public class AMQ4116Test extends EmbeddedBrokerTestSupport { + + private final String tcpAddr = "tcp://localhost:0"; + private String connectionUri; + + /** + * In this test, a message is produced and consumed from the test queue. + * Memory usage on the test queue should be reset to 0. The memory that was + * consumed is then sent to a second queue. Memory usage on the original + * test queue should remain 0, but actually increased when the second + * enqueue occurs. + */ + public void testVMTransport() throws Exception { + runTest(connectionFactory); + } + + /** + * This is an analog to the previous test, but occurs over TCP and passes. + */ + public void testTCPTransport() throws Exception { + runTest(new ActiveMQConnectionFactory(connectionUri)); + } + + private void runTest(ConnectionFactory connFactory) throws Exception { + // Verify that test queue is empty and not using any memory. + Destination physicalDestination = broker.getDestination(destination); + Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage()); + + // Enqueue a single message and verify that the test queue is using + // memory. + Connection conn = connFactory.createConnection(); + conn.start(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(destination); + + producer.send(new ActiveMQMessage()); + + // Commit, which ensures message is in queue and memory usage updated. + session.commit(); + Assert.assertTrue(physicalDestination.getMemoryUsage().getUsage() > 0); + + // Consume the message and verify that the test queue is no longer using + // any memory. + MessageConsumer consumer = session.createConsumer(destination); + Message received = consumer.receive(); + Assert.assertNotNull(received); + + // Commit, which ensures message is removed from queue and memory usage + // updated. + session.commit(); + Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage()); + + // Resend the message to a different queue and verify that the original + // test queue is still not using any memory. + ActiveMQQueue secondDestination = new ActiveMQQueue(AMQ4116Test.class + ".second"); + MessageProducer secondPproducer = session.createProducer(secondDestination); + + secondPproducer.send(received); + + // Commit, which ensures message is in queue and memory usage updated. + // NOTE: This assertion fails due to bug. + session.commit(); + Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage()); + + conn.stop(); + } + + /** + * Create an embedded broker that has both TCP and VM connectors. + */ + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + connectionUri = broker.addConnector(tcpAddr).getPublishableConnectString(); + return broker; + } +}