diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java index 3f94d6ca6a..6e34bb4b90 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region; +import java.io.IOException; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; @@ -23,6 +24,8 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTempDestination; import org.apache.activemq.store.MessageStore; import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * The Queue is a List of MessageEntry objects that are dispatched to matching @@ -31,6 +34,7 @@ import org.apache.activemq.thread.TaskRunnerFactory; * @version $Revision: 1.28 $ */ public class TempQueue extends Queue{ + private static final Log LOG = LogFactory.getLog(TempQueue.class); private final ActiveMQTempDestination tempDest; @@ -50,6 +54,7 @@ public class TempQueue extends Queue{ this.tempDest = (ActiveMQTempDestination) destination; } + @Override public void initialize() throws Exception { this.messages=new VMPendingMessageCursor(); this.messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); @@ -58,6 +63,7 @@ public class TempQueue extends Queue{ this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue: " + destination.getPhysicalName()); } + @Override public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { // Only consumers on the same connection can consume from // the temporary destination @@ -74,4 +80,14 @@ public class TempQueue extends Queue{ } super.addSubscription(context, sub); } + + @Override + public void dispose(ConnectionContext context) throws IOException { + try { + purge(); + } catch (Exception e) { + LOG.warn("Caught an exception purging Queue: " + destination); + } + super.dispose(context); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 7f9bff7302..6492e29b8a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -86,7 +86,12 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i clearIterator(true); recovered = true; } else { - LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message); + /* + * we should expect to get these - as the message is recorded as it before it goes into + * the cache. If subsequently, we pull out that message from the store (before its deleted) + * it will be a duplicate - but should be ignored + */ + //LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message); storeHasMessages = true; } return recovered; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java index 7103e045a9..fb128f7bd3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; - import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; @@ -33,38 +32,39 @@ import org.apache.activemq.broker.region.QueueMessageReference; * @version $Revision$ */ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { - private LinkedList list = new LinkedList(); + private final LinkedList list = new LinkedList(); private Iterator iter; - public VMPendingMessageCursor(){ - this.useCache=false; + public VMPendingMessageCursor() { + this.useCache = false; } - @Override - public synchronized List remove(ConnectionContext context, Destination destination) throws Exception { - List rc = new ArrayList(); + public synchronized List remove(ConnectionContext context, Destination destination) + throws Exception { + List rc = new ArrayList(); for (Iterator iterator = list.iterator(); iterator.hasNext();) { MessageReference r = iterator.next(); - if( r.getRegionDestination()==destination ) { + if (r.getRegionDestination() == destination) { r.decrementReferenceCount(); rc.add(r); iterator.remove(); } } - return rc ; + return rc; } - + /** * @return true if there are no pending messages */ + @Override public synchronized boolean isEmpty() { if (list.isEmpty()) { return true; } else { for (Iterator iterator = list.iterator(); iterator.hasNext();) { MessageReference node = iterator.next(); - if (node== QueueMessageReference.NULL_MESSAGE){ - continue; + if (node == QueueMessageReference.NULL_MESSAGE) { + continue; } if (!node.isDropped()) { return false; @@ -79,6 +79,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { /** * reset the cursor */ + @Override public synchronized void reset() { iter = list.listIterator(); last = null; @@ -89,6 +90,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { * * @param node */ + @Override public synchronized void addMessageLast(MessageReference node) { node.incrementReferenceCount(); list.addLast(node); @@ -100,6 +102,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { * @param position * @param node */ + @Override public synchronized void addMessageFirst(MessageReference node) { node.incrementReferenceCount(); list.addFirst(node); @@ -108,6 +111,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { /** * @return true if there pending messages to dispatch */ + @Override public synchronized boolean hasNext() { return iter.hasNext(); } @@ -115,8 +119,9 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { /** * @return the next pending message */ + @Override public synchronized MessageReference next() { - last = (MessageReference)iter.next(); + last = iter.next(); if (last != null) { last.incrementReferenceCount(); } @@ -126,6 +131,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { /** * remove the message at the cursor position */ + @Override public synchronized void remove() { if (last != null) { last.decrementReferenceCount(); @@ -136,6 +142,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { /** * @return the number of pending messages */ + @Override public synchronized int size() { return list.size(); } @@ -143,10 +150,16 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { /** * clear all pending messages */ + @Override public synchronized void clear() { + for (Iterator i = list.iterator(); i.hasNext();) { + MessageReference ref = i.next(); + ref.decrementReferenceCount(); + } list.clear(); } + @Override public synchronized void remove(MessageReference node) { for (Iterator i = list.iterator(); i.hasNext();) { MessageReference ref = i.next(); @@ -164,11 +177,19 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { * @param maxItems * @return a list of paged in messages */ + @Override public LinkedList pageInList(int maxItems) { return list; } - + + @Override public boolean isTransient() { return true; } + + @Override + public void destroy() throws Exception { + super.destroy(); + clear(); + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java new file mode 100644 index 0000000000..3919c1a9a6 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.io.File; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.IOHelper; + +public class AMQ2616Test extends TestCase { + private static final int NUMBER = 2000; + private BrokerService brokerService; + private final ArrayList threads = new ArrayList(); + String ACTIVEMQ_BROKER_BIND = "tcp://0.0.0.0:61616"; + AtomicBoolean shutdown = new AtomicBoolean(); + + public void testQueueResourcesReleased() throws Exception{ + ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_BIND); + Connection tempConnection = fac.createConnection(); + tempConnection.start(); + Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue tempQueue = tempSession.createTemporaryQueue(); + final MessageConsumer tempConsumer = tempSession.createConsumer(tempQueue); + + Connection testConnection = fac.createConnection(); + long startUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage(); + Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer testProducer = testSession.createProducer(tempQueue); + byte[] payload = new byte[1024*4]; + for (int i = 0; i < NUMBER; i++ ) { + BytesMessage msg = testSession.createBytesMessage(); + msg.writeBytes(payload); + testProducer.send(msg); + } + long endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage(); + assertFalse(startUsage==endUsage); + tempConnection.close(); + Thread.sleep(1000); + endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage(); + assertEquals(startUsage,endUsage); + } + + public void testTopicResourcesReleased() throws Exception{ + ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_BIND); + Connection tempConnection = fac.createConnection(); + tempConnection.start(); + Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic tempTopic = tempSession.createTemporaryTopic(); + final MessageConsumer tempConsumer = tempSession.createConsumer(tempTopic); + + Connection testConnection = fac.createConnection(); + long startUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage(); + Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer testProducer = testSession.createProducer(tempTopic); + byte[] payload = new byte[1024*4]; + for (int i = 0; i < NUMBER; i++ ) { + BytesMessage msg = testSession.createBytesMessage(); + msg.writeBytes(payload); + testProducer.send(msg); + } + long endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage(); + assertFalse(startUsage==endUsage); + tempConnection.close(); + Thread.sleep(1000); + endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage(); + assertEquals(startUsage,endUsage); + } + + + @Override + protected void setUp() throws Exception { + // Start an embedded broker up. + brokerService = new BrokerService(); + + KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter(); + adaptor.setEnableJournalDiskSyncs(false); + File file = new File("target/AMQ2616Test"); + IOHelper.mkdirs(file); + IOHelper.deleteChildren(file); + adaptor.setDirectory(file); + brokerService.setPersistenceAdapter(adaptor); + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry pe = new PolicyEntry(); + pe.setMemoryLimit(10 * 1024 * 1024); + pe.setOptimizedDispatch(true); + pe.setProducerFlowControl(false); + pe.setExpireMessagesPeriod(1000); + pe.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy()); + policyMap.put(new ActiveMQQueue(">"), pe); + brokerService.setDestinationPolicy(policyMap); + brokerService.getSystemUsage().getMemoryUsage().setLimit(20 * 1024 * 1024); + brokerService.getSystemUsage().getTempUsage().setLimit(200 * 1024 * 1024); + brokerService.addConnector(ACTIVEMQ_BROKER_BIND); + brokerService.start(); + new ActiveMQQueue(getName()); + } + + @Override + protected void tearDown() throws Exception { + // Stop any running threads. + shutdown.set(true); + for (Thread t : threads) { + t.interrupt(); + t.join(); + } + brokerService.stop(); + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java b/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java new file mode 100644 index 0000000000..f6fd1754bb --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs.amq1974; + +import java.io.File; +import java.net.URISyntaxException; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; +import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent; + +public class TryJmsClient +{ + private final BrokerService broker = new BrokerService(); + + public static void main(String[] args) throws Exception { + new TryJmsClient().start(); + } + + private void start() throws Exception { + + broker.setUseJmx(false); + broker.setPersistent(true); + broker.setBrokerName("TestBroker"); + broker.getSystemUsage().setSendFailIfNoSpace(true); + + broker.getSystemUsage().getMemoryUsage().setLimit(10 * 1024 * 1024); + + KahaPersistenceAdapter persist = new KahaPersistenceAdapter(); + persist.setDirectory(new File("/tmp/broker2")); + persist.setMaxDataFileLength(20 * 1024 * 1024); + broker.setPersistenceAdapter(persist); + + String brokerUrl = "tcp://localhost:4501"; + broker.addConnector(brokerUrl); + + broker.start(); + + addNetworkBroker(); + + startUsageMonitor(broker); + + startMessageSend(); + + synchronized(this) { + this.wait(); + } + } + + private void startUsageMonitor(final BrokerService brokerService) { + new Thread(new Runnable() { + public void run() { + while (true) { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + System.out.println("ActiveMQ memeory " + brokerService.getSystemUsage().getMemoryUsage().getPercentUsage() + + " " + brokerService.getSystemUsage().getMemoryUsage().getUsage()); + System.out.println("ActiveMQ message store " + brokerService.getSystemUsage().getStoreUsage().getPercentUsage()); + System.out.println("ActiveMQ temp space " + brokerService.getSystemUsage().getTempUsage().getPercentUsage()); + } + } + }).start(); + } + + private void addNetworkBroker() throws Exception { + + DiscoveryNetworkConnector dnc = new DiscoveryNetworkConnector(); + dnc.setNetworkTTL(1); + dnc.setBrokerName("TestBroker"); + dnc.setName("Broker1Connector"); + dnc.setDynamicOnly(true); + + SimpleDiscoveryAgent discoveryAgent = new SimpleDiscoveryAgent(); + String remoteUrl = "tcp://localhost:4500"; + discoveryAgent.setServices(remoteUrl); + + dnc.setDiscoveryAgent(discoveryAgent); + + broker.addNetworkConnector(dnc); + dnc.start(); + } + + private void startMessageSend() { + new Thread(new MessageSend()).start(); + } + + private class MessageSend implements Runnable { + public void run() { + try { + String url = "vm://TestBroker"; + ActiveMQConnection connection = ActiveMQConnection.makeConnection(url); + connection.setDispatchAsync(true); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination dest = session.createTopic("TestDestination"); + + MessageProducer producer = session.createProducer(dest); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + for(int i = 0; i < 99999999; i++) { + TextMessage message = session.createTextMessage("test" + i); + + /* + try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + */ + + try { + producer.send(message); + } catch (Exception e ) { + e.printStackTrace(); + System.out.println("TOTAL number of messages sent " + i); + break; + } + + if (i % 1000 == 0) { + System.out.println("sent message " + message.getJMSMessageID()); + } + } + } catch (JMSException e) { + e.printStackTrace(); + } catch (URISyntaxException e) { + e.printStackTrace(); + } + } + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java b/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java new file mode 100644 index 0000000000..059400cda4 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs.amq1974; +import java.io.File; +import java.net.URISyntaxException; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; +import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent; + +public class TryJmsManager { + + private final BrokerService broker = new BrokerService(); + + public static void main(String[] args) throws Exception { + new TryJmsManager().start(); + } + + private void start() throws Exception { + + broker.setUseJmx(false); + broker.setPersistent(true); + broker.setBrokerName("TestBroker"); + broker.getSystemUsage().setSendFailIfNoSpace(true); + + broker.getSystemUsage().getMemoryUsage().setLimit(10 * 1024 * 1024); + + KahaPersistenceAdapter persist = new KahaPersistenceAdapter(); + persist.setDirectory(new File("/tmp/broker1")); + persist.setMaxDataFileLength(20 * 1024 * 1024); + broker.setPersistenceAdapter(persist); + + String brokerUrl = "tcp://localhost:4500"; + broker.addConnector(brokerUrl); + + broker.start(); + + addNetworkBroker(); + + startUsageMonitor(broker); + + startMessageConsumer(); + + synchronized(this) { + this.wait(); + } + } + + private void startUsageMonitor(final BrokerService brokerService) { + new Thread(new Runnable() { + public void run() { + while (true) { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + System.out.println("ActiveMQ memeory " + brokerService.getSystemUsage().getMemoryUsage().getPercentUsage() + + " " + brokerService.getSystemUsage().getMemoryUsage().getUsage()); + System.out.println("ActiveMQ message store " + brokerService.getSystemUsage().getStoreUsage().getPercentUsage()); + System.out.println("ActiveMQ temp space " + brokerService.getSystemUsage().getTempUsage().getPercentUsage()); + } + } + }).start(); + } + + private void addNetworkBroker() throws Exception { + DiscoveryNetworkConnector dnc = new DiscoveryNetworkConnector(); + dnc.setNetworkTTL(1); + dnc.setBrokerName("TestBroker"); + dnc.setName("Broker1Connector"); + dnc.setDynamicOnly(true); + + SimpleDiscoveryAgent discoveryAgent = new SimpleDiscoveryAgent(); + String remoteUrl = "tcp://localhost:4501"; + discoveryAgent.setServices(remoteUrl); + + dnc.setDiscoveryAgent(discoveryAgent); + + broker.addNetworkConnector(dnc); + dnc.start(); + } + + private void startMessageConsumer() throws JMSException, URISyntaxException { + String url = "vm://TestBroker"; + ActiveMQConnection connection = ActiveMQConnection.makeConnection(url); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination dest = session.createTopic("TestDestination"); + + MessageConsumer consumer = session.createConsumer(dest); + consumer.setMessageListener(new MessageListener() { + + public void onMessage(Message message) { + try { + System.out.println("got message " + message.getJMSMessageID()); + } catch (JMSException e) { + e.printStackTrace(); + } + } + } + ); + + connection.start(); + } +}