Applying reduceMemoryFootprint for persistent Topic messages
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-06-14 14:36:37 +00:00
parent 9f9b0fb26a
commit a953f11d0e
5 changed files with 78 additions and 32 deletions

View File

@ -512,6 +512,10 @@ public class Topic extends BaseDestination implements Task {
waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
if (isReduceMemoryFootprint()) {
message.clearMarshalledState();
}
}
message.incrementReferenceCount();

View File

@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@ -40,6 +41,7 @@ import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
@ -48,6 +50,7 @@ import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -67,17 +70,22 @@ public abstract class AbstractVmConcurrentDispatchTest {
private final MessageType messageType;
private final boolean reduceMemoryFootPrint;
protected final boolean useTopic;
protected static enum MessageType {TEXT, MAP, OBJECT}
protected final static boolean[] booleanVals = {true, false};
protected static boolean[] reduceMemoryFootPrintVals = booleanVals;
protected static boolean[] useTopicVals = booleanVals;
private String testTopicName = "mytopic";
@Rule
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
public AbstractVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint) {
public AbstractVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint,
boolean useTopic) {
this.messageType = messageType;
this.reduceMemoryFootPrint = reduceMemoryFootPrint;
this.useTopic = useTopic;
}
private BrokerService broker;
@ -92,7 +100,7 @@ public abstract class AbstractVmConcurrentDispatchTest {
private final int NUM_PRODUCERS = 1;
private final int NUM_TASKS = NUM_CONSUMERS + NUM_PRODUCERS;
private int i = 0;
private final AtomicInteger count = new AtomicInteger();
private String MessageId = null;
private int MessageCount = 0;
@ -127,23 +135,28 @@ public abstract class AbstractVmConcurrentDispatchTest {
@Test(timeout=180000)
public void testMessagesAreValid() throws Exception {
if (this.useTopic) {
Assume.assumeTrue(reduceMemoryFootPrint);
}
ExecutorService tasks = Executors.newFixedThreadPool(NUM_TASKS);
for (int i = 0; i < NUM_CONSUMERS; i++) {
LOG.info("Created Consumer: {}", i + 1);
tasks.execute(new HelloWorldConsumer());
tasks.execute(new HelloWorldConsumer(useTopic));
}
for (int i = 0; i < NUM_PRODUCERS; i++) {
LOG.info("Created Producer: {}", i + 1);
tasks.execute(new HelloWorldProducer());
tasks.execute(new HelloWorldProducer(useTopic));
}
assertTrue(ready.await(20, TimeUnit.SECONDS));
try {
tasks.shutdown();
tasks.awaitTermination(20, TimeUnit.SECONDS);
//run for 10 seconds as that seems to be enough time to cause an error
//if there is going to be one
tasks.awaitTermination(10, TimeUnit.SECONDS);
} catch (Exception e) {
//should get exception with no errors
}
@ -161,6 +174,12 @@ public abstract class AbstractVmConcurrentDispatchTest {
public class HelloWorldProducer implements Runnable {
final boolean useTopic;
public HelloWorldProducer(boolean useTopic) {
this.useTopic = useTopic;
}
@Override
public void run() {
try {
@ -172,7 +191,10 @@ public abstract class AbstractVmConcurrentDispatchTest {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("VirtualTopic.AMQ6218Test");
//If using topics, just test a generic topic name
//If useTopic is false then we are testing virtual topics/queue consumes
Destination destination = useTopic ? session.createTopic(testTopicName) :
session.createTopic("VirtualTopic.AMQ6218Test");
MessageProducer producer = session.createProducer(destination);
@ -213,25 +235,30 @@ public abstract class AbstractVmConcurrentDispatchTest {
}
public class HelloWorldConsumer implements Runnable, ExceptionListener {
String queueName;
final boolean useTopic;
public HelloWorldConsumer(boolean useTopic) {
this.useTopic = useTopic;
}
@Override
public void run() {
try {
int i = count.incrementAndGet();
String destName = !useTopic ? "Consumer.Q" + i + ".VirtualTopic.AMQ6218Test" : testTopicName;
LOG.info(destName);
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(getBrokerURI());
Connection connection = connectionFactory.createConnection();
connection.setClientID("clientId" + i);
connection.start();
Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
synchronized (this) {
queueName = "Consumer.Q" + i + ".VirtualTopic.AMQ6218Test";
i++;
LOG.info(queueName);
}
Destination destination = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(destination);
Destination destination = useTopic ? session.createTopic(destName) : session.createQueue(destName);
MessageConsumer consumer = useTopic ?
session.createDurableSubscriber((Topic) destination, "sub" + i) :
session.createConsumer(destination);
ready.countDown();
@ -266,14 +293,14 @@ public abstract class AbstractVmConcurrentDispatchTest {
MapMessage mapMessage = (MapMessage) message;
text = mapMessage.getString("text");
} else {
LOG.info(queueName + " Message is not a instanceof " + messageType + " message id: " + message.getJMSMessageID() + message);
LOG.info(destName + " Message is not a instanceof " + messageType + " message id: " + message.getJMSMessageID() + message);
}
if (text == null) {
LOG.warn(queueName + " text received as a null " + message);
LOG.warn(destName + " text received as a null " + message);
failure.set(true);
} else {
LOG.info(queueName + " text " + text + " message id: " + message.getJMSMessageID());
LOG.info(destName + " text " + text + " message id: " + message.getJMSMessageID());
}
message.acknowledge();

View File

@ -33,14 +33,16 @@ public class KahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatch
private final boolean concurrentDispatch;
private static boolean[] concurrentDispatchVals = booleanVals;
@Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}")
@Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}; UseTopic:{3}")
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<>();
for (MessageType mt : MessageType.values()) {
for (boolean rmfVal : reduceMemoryFootPrintVals) {
for (boolean cdVal : concurrentDispatchVals) {
values.add(new Object[] {mt, rmfVal, cdVal});
for (boolean tpVal : useTopicVals) {
values.add(new Object[] {mt, rmfVal, cdVal, tpVal});
}
}
}
}
@ -54,15 +56,19 @@ public class KahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatch
* @param concurrentDispatch
*/
public KahaDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint,
boolean concurrentDispatch) {
super(messageType, reduceMemoryFootPrint);
boolean concurrentDispatch, boolean useTopic) {
super(messageType, reduceMemoryFootPrint, useTopic);
this.concurrentDispatch = concurrentDispatch;
}
@Override
protected void configurePersistenceAdapter(BrokerService broker) throws IOException {
KahaDBPersistenceAdapter ad = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
ad.setConcurrentStoreAndDispatchQueues(concurrentDispatch);
if (useTopic) {
ad.setConcurrentStoreAndDispatchTopics(concurrentDispatch);
} else {
ad.setConcurrentStoreAndDispatchQueues(concurrentDispatch);
}
}
}

View File

@ -33,14 +33,16 @@ public class MultiKahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDis
private final boolean concurrentDispatch;
private static boolean[] concurrentDispatchVals = booleanVals;
@Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}")
@Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}; UseTopic:{3}")
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<>();
for (MessageType mt : MessageType.values()) {
for (boolean rmfVal : reduceMemoryFootPrintVals) {
for (boolean cdVal : concurrentDispatchVals) {
values.add(new Object[] {mt, rmfVal, cdVal});
for (boolean tpVal : useTopicVals) {
values.add(new Object[] {mt, rmfVal, cdVal, tpVal});
}
}
}
}
@ -54,8 +56,8 @@ public class MultiKahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDis
* @param concurrentDispatch
*/
public MultiKahaDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint,
boolean concurrentDispatch) {
super(messageType, reduceMemoryFootPrint);
boolean concurrentDispatch, boolean useTopic) {
super(messageType, reduceMemoryFootPrint, useTopic);
this.concurrentDispatch = concurrentDispatch;
}
@ -66,7 +68,11 @@ public class MultiKahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDis
persistenceAdapter.setDirectory(dataFileDir.getRoot());
KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
kahaStore.setConcurrentStoreAndDispatchQueues(concurrentDispatch);
if (useTopic) {
kahaStore.setConcurrentStoreAndDispatchTopics(concurrentDispatch);
} else {
kahaStore.setConcurrentStoreAndDispatchQueues(concurrentDispatch);
}
FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
filtered.setPersistenceAdapter(kahaStore);

View File

@ -31,13 +31,15 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class LevelDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatchTest {
@Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}")
@Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; UseTopic:{2}")
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<>();
for (MessageType mt : MessageType.values()) {
for (boolean rmfVal : reduceMemoryFootPrintVals) {
values.add(new Object[] {mt, rmfVal});
for (boolean tpVal : useTopicVals) {
values.add(new Object[] {mt, rmfVal, tpVal});
}
}
}
@ -49,8 +51,9 @@ public class LevelDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatc
* @param reduceMemoryFootPrint
* @param concurrentDispatch
*/
public LevelDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint) {
super(messageType, reduceMemoryFootPrint);
public LevelDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint,
boolean useTopic) {
super(messageType, reduceMemoryFootPrint, useTopic);
}
@Override