diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java index ea5af02848..fca1949c0f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java @@ -55,6 +55,7 @@ public class MirroredQueue implements DestinationInterceptor, BrokerServiceAware message = message.copy(); } message.setDestination(destination.getActiveMQDestination()); + message.setMemoryUsage(null); // set this to null so that it will use the queue memoryUsage instance instead of the topic. super.send(context, message); } }; diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java new file mode 100644 index 0000000000..4c19c7b254 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.virtual.MirroredQueue; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.usage.MemoryUsage; +import org.apache.activemq.usage.StoreUsage; +import org.apache.activemq.usage.SystemUsage; +import org.apache.activemq.usage.TempUsage; +import org.apache.activemq.util.IOHelper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.Assert; + +/** + * This test will determine that the producer flow control does not kick in. + * The original MirroredQueue implementation was causing the queue to update + * the topic memory usage instead of the queue memory usage. + * The reason is that the message memory usage instance will not be updated + * unless it is null. This was the case when the message was initially sent + * to the topic but then it was non-null when it was being sent to the queue. + * When the region destination was set, the associated memory usage was not + * updated to the passed queue destination and thus the memory usage of the + * topic was being updated instead. + * + * @author Claudio Corsi + */ +public class MirroredQueueCorrectMemoryUsageTest extends EmbeddedBrokerTestSupport { + + private static final Logger logger = LoggerFactory.getLogger(MirroredQueueCorrectMemoryUsageTest.class); + + private static final long ONE_MB = 0x0100000; + private static final long TEN_MB = ONE_MB * 10; + private static final long TWENTY_MB = TEN_MB * 2; + + private static final String CREATED_STATIC_FOR_PERSISTENT = "created.static.for.persistent"; + + @Override + protected boolean isPersistent() { + return true; + } + + @Override + protected BrokerService createBroker() throws Exception { + // Create the broker service instance.... + BrokerService broker = super.createBroker(); + // Create and add the mirrored queue destination interceptor .... + DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[1]; + MirroredQueue mq = new MirroredQueue(); + mq.setCopyMessage(true); + mq.setPrefix(""); + mq.setPostfix(".qmirror"); + destinationInterceptors[0] = mq; + broker.setDestinationInterceptors(destinationInterceptors); + // Create the destination policy for the topics and queues + PolicyMap policyMap = new PolicyMap(); + List entries = new LinkedList(); + // Create Topic policy entry + PolicyEntry policyEntry = new PolicyEntry(); + super.useTopic = true; + ActiveMQDestination destination = super.createDestination(">"); + Assert.isTrue(destination.isTopic(), "Created destination was not a topic"); + policyEntry.setDestination(destination); + policyEntry.setProducerFlowControl(true); + policyEntry.setMemoryLimit(ONE_MB); // x10 + entries.add(policyEntry); + // Create Queue policy entry + policyEntry = new PolicyEntry(); + super.useTopic = false; + destination = super.createDestination(CREATED_STATIC_FOR_PERSISTENT); + Assert.isTrue(destination.isQueue(), "Created destination was not a queue"); + policyEntry.setDestination(destination); + policyEntry.setProducerFlowControl(true); + policyEntry.setMemoryLimit(TEN_MB); + entries.add(policyEntry); + policyMap.setPolicyEntries(entries); + broker.setDestinationPolicy(policyMap); + // Set destinations + broker.setDestinations(new ActiveMQDestination[] { destination }); + // Set system usage + SystemUsage memoryManager = new SystemUsage(); + MemoryUsage memoryUsage = new MemoryUsage(); + memoryUsage.setLimit(TEN_MB); + memoryManager.setMemoryUsage(memoryUsage); + StoreUsage storeUsage = new StoreUsage(); + storeUsage.setLimit(TWENTY_MB); + memoryManager.setStoreUsage(storeUsage); + TempUsage tempDiskUsage = new TempUsage(); + tempDiskUsage.setLimit(TEN_MB); + memoryManager.setTempUsage(tempDiskUsage); + broker.setSystemUsage(memoryManager); + // Set the persistent adapter + KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); + persistenceAdapter.setJournalMaxFileLength((int)TEN_MB); + // Delete all current messages... + IOHelper.deleteFile(persistenceAdapter.getDirectory()); + broker.setPersistenceAdapter(persistenceAdapter); + return broker; + } + + @Before + protected void setUp() throws Exception { + super.setUp(); + } + + @After + protected void tearDown() throws Exception { + super.tearDown(); + } + + @Test(timeout=40000) + public void testNoMemoryUsageIncreaseForTopic() throws Exception { + Connection connection = super.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Destination destination = session.createQueue(CREATED_STATIC_FOR_PERSISTENT); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + char[] m = new char[1024]; + Arrays.fill(m, 'x'); + // create some messages that have 1k each + for (int i = 1; i < 12000; i++) { + producer.send(session.createTextMessage(new String(m))); + logger.debug("Sent message: " + i); + } + producer.close(); + session.close(); + connection.stop(); + connection.close(); + } +}