diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index 1a9949e807..c43f55e820 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -512,6 +512,10 @@ public class Topic extends BaseDestination implements Task { waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); } result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage()); + + if (isReduceMemoryFootprint()) { + message.clearMarshalledState(); + } } message.incrementReferenceCount(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java index aaaaf6916f..69bd2c8af9 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.DeliveryMode; @@ -40,6 +41,7 @@ import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; @@ -48,6 +50,7 @@ import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.junit.After; +import org.junit.Assume; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -67,17 +70,22 @@ public abstract class AbstractVmConcurrentDispatchTest { private final MessageType messageType; private final boolean reduceMemoryFootPrint; + protected final boolean useTopic; protected static enum MessageType {TEXT, MAP, OBJECT} protected final static boolean[] booleanVals = {true, false}; protected static boolean[] reduceMemoryFootPrintVals = booleanVals; + protected static boolean[] useTopicVals = booleanVals; + private String testTopicName = "mytopic"; @Rule public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); - public AbstractVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint) { + public AbstractVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint, + boolean useTopic) { this.messageType = messageType; this.reduceMemoryFootPrint = reduceMemoryFootPrint; + this.useTopic = useTopic; } private BrokerService broker; @@ -92,7 +100,7 @@ public abstract class AbstractVmConcurrentDispatchTest { private final int NUM_PRODUCERS = 1; private final int NUM_TASKS = NUM_CONSUMERS + NUM_PRODUCERS; - private int i = 0; + private final AtomicInteger count = new AtomicInteger(); private String MessageId = null; private int MessageCount = 0; @@ -127,23 +135,28 @@ public abstract class AbstractVmConcurrentDispatchTest { @Test(timeout=180000) public void testMessagesAreValid() throws Exception { + if (this.useTopic) { + Assume.assumeTrue(reduceMemoryFootPrint); + } ExecutorService tasks = Executors.newFixedThreadPool(NUM_TASKS); for (int i = 0; i < NUM_CONSUMERS; i++) { LOG.info("Created Consumer: {}", i + 1); - tasks.execute(new HelloWorldConsumer()); + tasks.execute(new HelloWorldConsumer(useTopic)); } for (int i = 0; i < NUM_PRODUCERS; i++) { LOG.info("Created Producer: {}", i + 1); - tasks.execute(new HelloWorldProducer()); + tasks.execute(new HelloWorldProducer(useTopic)); } assertTrue(ready.await(20, TimeUnit.SECONDS)); try { tasks.shutdown(); - tasks.awaitTermination(20, TimeUnit.SECONDS); + //run for 10 seconds as that seems to be enough time to cause an error + //if there is going to be one + tasks.awaitTermination(10, TimeUnit.SECONDS); } catch (Exception e) { //should get exception with no errors } @@ -161,6 +174,12 @@ public abstract class AbstractVmConcurrentDispatchTest { public class HelloWorldProducer implements Runnable { + final boolean useTopic; + + public HelloWorldProducer(boolean useTopic) { + this.useTopic = useTopic; + } + @Override public void run() { try { @@ -172,7 +191,10 @@ public abstract class AbstractVmConcurrentDispatchTest { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createTopic("VirtualTopic.AMQ6218Test"); + //If using topics, just test a generic topic name + //If useTopic is false then we are testing virtual topics/queue consumes + Destination destination = useTopic ? session.createTopic(testTopicName) : + session.createTopic("VirtualTopic.AMQ6218Test"); MessageProducer producer = session.createProducer(destination); @@ -213,25 +235,30 @@ public abstract class AbstractVmConcurrentDispatchTest { } public class HelloWorldConsumer implements Runnable, ExceptionListener { - String queueName; + final boolean useTopic; + + public HelloWorldConsumer(boolean useTopic) { + this.useTopic = useTopic; + } @Override public void run() { try { + int i = count.incrementAndGet(); + String destName = !useTopic ? "Consumer.Q" + i + ".VirtualTopic.AMQ6218Test" : testTopicName; + LOG.info(destName); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(getBrokerURI()); Connection connection = connectionFactory.createConnection(); + connection.setClientID("clientId" + i); connection.start(); - Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); - synchronized (this) { - queueName = "Consumer.Q" + i + ".VirtualTopic.AMQ6218Test"; - i++; - LOG.info(queueName); - } - Destination destination = session.createQueue(queueName); - MessageConsumer consumer = session.createConsumer(destination); + Destination destination = useTopic ? session.createTopic(destName) : session.createQueue(destName); + MessageConsumer consumer = useTopic ? + session.createDurableSubscriber((Topic) destination, "sub" + i) : + session.createConsumer(destination); ready.countDown(); @@ -266,14 +293,14 @@ public abstract class AbstractVmConcurrentDispatchTest { MapMessage mapMessage = (MapMessage) message; text = mapMessage.getString("text"); } else { - LOG.info(queueName + " Message is not a instanceof " + messageType + " message id: " + message.getJMSMessageID() + message); + LOG.info(destName + " Message is not a instanceof " + messageType + " message id: " + message.getJMSMessageID() + message); } if (text == null) { - LOG.warn(queueName + " text received as a null " + message); + LOG.warn(destName + " text received as a null " + message); failure.set(true); } else { - LOG.info(queueName + " text " + text + " message id: " + message.getJMSMessageID()); + LOG.info(destName + " text " + text + " message id: " + message.getJMSMessageID()); } message.acknowledge(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java index 217a7c7429..36d5a81c77 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java @@ -33,14 +33,16 @@ public class KahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatch private final boolean concurrentDispatch; private static boolean[] concurrentDispatchVals = booleanVals; - @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}") + @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}; UseTopic:{3}") public static Collection data() { List values = new ArrayList<>(); for (MessageType mt : MessageType.values()) { for (boolean rmfVal : reduceMemoryFootPrintVals) { for (boolean cdVal : concurrentDispatchVals) { - values.add(new Object[] {mt, rmfVal, cdVal}); + for (boolean tpVal : useTopicVals) { + values.add(new Object[] {mt, rmfVal, cdVal, tpVal}); + } } } } @@ -54,15 +56,19 @@ public class KahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatch * @param concurrentDispatch */ public KahaDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint, - boolean concurrentDispatch) { - super(messageType, reduceMemoryFootPrint); + boolean concurrentDispatch, boolean useTopic) { + super(messageType, reduceMemoryFootPrint, useTopic); this.concurrentDispatch = concurrentDispatch; } @Override protected void configurePersistenceAdapter(BrokerService broker) throws IOException { KahaDBPersistenceAdapter ad = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); - ad.setConcurrentStoreAndDispatchQueues(concurrentDispatch); + if (useTopic) { + ad.setConcurrentStoreAndDispatchTopics(concurrentDispatch); + } else { + ad.setConcurrentStoreAndDispatchQueues(concurrentDispatch); + } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java index 3d16ce7d56..efe8688edc 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java @@ -33,14 +33,16 @@ public class MultiKahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDis private final boolean concurrentDispatch; private static boolean[] concurrentDispatchVals = booleanVals; - @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}") + @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}; UseTopic:{3}") public static Collection data() { List values = new ArrayList<>(); for (MessageType mt : MessageType.values()) { for (boolean rmfVal : reduceMemoryFootPrintVals) { for (boolean cdVal : concurrentDispatchVals) { - values.add(new Object[] {mt, rmfVal, cdVal}); + for (boolean tpVal : useTopicVals) { + values.add(new Object[] {mt, rmfVal, cdVal, tpVal}); + } } } } @@ -54,8 +56,8 @@ public class MultiKahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDis * @param concurrentDispatch */ public MultiKahaDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint, - boolean concurrentDispatch) { - super(messageType, reduceMemoryFootPrint); + boolean concurrentDispatch, boolean useTopic) { + super(messageType, reduceMemoryFootPrint, useTopic); this.concurrentDispatch = concurrentDispatch; } @@ -66,7 +68,11 @@ public class MultiKahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDis persistenceAdapter.setDirectory(dataFileDir.getRoot()); KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter(); - kahaStore.setConcurrentStoreAndDispatchQueues(concurrentDispatch); + if (useTopic) { + kahaStore.setConcurrentStoreAndDispatchTopics(concurrentDispatch); + } else { + kahaStore.setConcurrentStoreAndDispatchQueues(concurrentDispatch); + } FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter(); filtered.setPersistenceAdapter(kahaStore); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java index d1b7e43d81..06e09becea 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java @@ -31,13 +31,15 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class LevelDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatchTest { - @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}") + @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; UseTopic:{2}") public static Collection data() { List values = new ArrayList<>(); for (MessageType mt : MessageType.values()) { for (boolean rmfVal : reduceMemoryFootPrintVals) { - values.add(new Object[] {mt, rmfVal}); + for (boolean tpVal : useTopicVals) { + values.add(new Object[] {mt, rmfVal, tpVal}); + } } } @@ -49,8 +51,9 @@ public class LevelDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatc * @param reduceMemoryFootPrint * @param concurrentDispatch */ - public LevelDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint) { - super(messageType, reduceMemoryFootPrint); + public LevelDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint, + boolean useTopic) { + super(messageType, reduceMemoryFootPrint, useTopic); } @Override