Updating MemoryTopicMessageStore to decrement store statistics on cache
eviction.  Updating KahaDBMessageStoreSizeStatTest to account for the
fact that a LRU cache is used so the last 100 messages are kept in
memroy.
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-08-08 17:55:41 +00:00
parent a3c8bee1f0
commit a49d46e3ca
6 changed files with 242 additions and 53 deletions

View File

@ -57,8 +57,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
synchronized (messageTable) {
messageTable.put(message.getMessageId(), message);
getMessageStoreStatistics().getMessageCount().increment();
getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
incMessageStoreStatistics(message);
}
message.incrementReferenceCount();
message.getMessageId().setFutureOrSequenceLong(sequenceId++);
@ -94,8 +93,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
Message removed = messageTable.remove(msgId);
if( removed !=null ) {
removed.decrementReferenceCount();
getMessageStoreStatistics().getMessageCount().decrement();
getMessageStoreStatistics().getMessageSize().addSize(-removed.getSize());
decMessageStoreStatistics(removed);
}
if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
lastBatchId = null;
@ -200,4 +198,14 @@ public class MemoryMessageStore extends AbstractMessageStore {
}
}
protected final void incMessageStoreStatistics(Message message) {
getMessageStoreStatistics().getMessageCount().increment();
getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
}
protected final void decMessageStoreStatistics(Message message) {
getMessageStoreStatistics().getMessageCount().decrement();
getMessageStoreStatistics().getMessageSize().addSize(-message.getSize());
}
}

View File

@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@ -29,6 +30,7 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.SubscriptionKey;
@ -40,15 +42,24 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase;
private Map<SubscriptionKey, MemoryTopicSub> topicSubMap;
private final Map<MessageId, Message> originalMessageTable;
public MemoryTopicMessageStore(ActiveMQDestination destination) {
this(destination, new LRUCache<MessageId, Message>(100, 100, 0.75f, false), makeSubscriptionInfoMap());
this(destination, new MemoryTopicMessageStoreLRUCache(100, 100, 0.75f, false), makeSubscriptionInfoMap());
//Set the messageStoreStatistics after the super class is initialized so that the stats can be
//properly updated on cache eviction
MemoryTopicMessageStoreLRUCache cache = (MemoryTopicMessageStoreLRUCache) originalMessageTable;
cache.setMessageStoreStatistics(messageStoreStatistics);
}
public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable, Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
super(destination, messageTable);
this.subscriberDatabase = subscriberDatabase;
this.topicSubMap = makeSubMap();
//this is only necessary so that messageStoreStatistics can be set if necessary
//We need the original reference since messageTable is wrapped in a synchronized map in the parent class
this.originalMessageTable = messageTable;
}
protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() {
@ -59,6 +70,7 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
return Collections.synchronizedMap(new HashMap<SubscriptionKey, MemoryTopicSub>());
}
@Override
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
super.addMessage(context, message);
for (Iterator<MemoryTopicSub> i = topicSubMap.values().iterator(); i.hasNext();) {
@ -67,6 +79,7 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
}
}
@Override
public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack) throws IOException {
SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
@ -76,10 +89,12 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
}
}
@Override
public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
return subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
}
@Override
public synchronized void addSubscription(SubscriptionInfo info, boolean retroactive) throws IOException {
SubscriptionKey key = new SubscriptionKey(info);
MemoryTopicSub sub = new MemoryTopicSub();
@ -93,12 +108,14 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
subscriberDatabase.put(key, info);
}
@Override
public synchronized void deleteSubscription(String clientId, String subscriptionName) {
org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
subscriberDatabase.remove(key);
topicSubMap.remove(key);
}
@Override
public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
if (sub != null) {
@ -106,16 +123,19 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
}
}
@Override
public synchronized void delete() {
super.delete();
subscriberDatabase.clear();
topicSubMap.clear();
}
@Override
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
}
@Override
public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
int result = 0;
MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
@ -125,6 +145,7 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
return result;
}
@Override
public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
if (sub != null) {
@ -132,10 +153,39 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
}
}
@Override
public void resetBatching(String clientId, String subscriptionName) {
MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
if (sub != null) {
sub.resetBatching();
}
}
/**
* Since we initialize the store with a LRUCache in some cases, we need to account for cache evictions
* when computing the message store statistics.
*
*/
private static class MemoryTopicMessageStoreLRUCache extends LRUCache<MessageId, Message> {
private static final long serialVersionUID = -342098639681884413L;
private MessageStoreStatistics messageStoreStatistics;
public MemoryTopicMessageStoreLRUCache(int initialCapacity, int maximumCacheSize,
float loadFactor, boolean accessOrder) {
super(initialCapacity, maximumCacheSize, loadFactor, accessOrder);
}
public void setMessageStoreStatistics(
MessageStoreStatistics messageStoreStatistics) {
this.messageStoreStatistics = messageStoreStatistics;
}
@Override
protected void onCacheEviction(Map.Entry<MessageId, Message> eldest) {
if (messageStoreStatistics != null) {
messageStoreStatistics.getMessageCount().decrement();
messageStoreStatistics.getMessageSize().addSize(-eldest.getValue().getSize());
}
}
}
}

View File

@ -35,6 +35,7 @@ import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
@ -47,6 +48,8 @@ import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -67,6 +70,7 @@ public abstract class AbstractMessageStoreSizeStatTest {
protected BrokerService broker;
protected URI brokerConnectURI;
protected String defaultQueueName = "test.queue";
protected String defaultTopicName = "test.topic";
protected static int messageSize = 1000;
@Before
@ -100,34 +104,67 @@ public abstract class AbstractMessageStoreSizeStatTest {
@Test
public void testMessageSize() throws Exception {
Destination dest = publishTestMessages(200);
Destination dest = publishTestQueueMessages(200);
verifyStats(dest, 200, 200 * messageSize);
}
@Test
public void testMessageSizeAfterConsumption() throws Exception {
Destination dest = publishTestMessages(200);
Destination dest = publishTestQueueMessages(200);
verifyStats(dest, 200, 200 * messageSize);
consumeTestMessages();
Thread.sleep(3000);
consumeTestQueueMessages();
verifyStats(dest, 0, 0);
}
@Test
public void testMessageSizeDurable() throws Exception {
public void testMessageSizeOneDurable() throws Exception {
Destination dest = publishTestMessagesDurable();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200);
//verify the count and size
verifyStats(dest, 200, 200 * messageSize);
//consume all messages
consumeDurableTestMessages(connection, "sub1", 200);
//All messages should now be gone
verifyStats(dest, 0, 0);
connection.close();
}
@Test(timeout=10000)
public void testMessageSizeTwoDurables() throws Exception {
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200);
//verify the count and size
verifyStats(dest, 200, 200 * messageSize);
//consume messages just for sub1
consumeDurableTestMessages(connection, "sub1", 200);
//There is still a durable that hasn't consumed so the messages should exist
verifyStats(dest, 200, 200 * messageSize);
connection.stop();
}
@Test
public void testMessageSizeAfterDestinationDeletion() throws Exception {
Destination dest = publishTestMessages(200);
Destination dest = publishTestQueueMessages(200);
verifyStats(dest, 200, 200 * messageSize);
//check that the size is 0 after deletion
@ -135,18 +172,34 @@ public abstract class AbstractMessageStoreSizeStatTest {
verifyStats(dest, 0, 0);
}
protected void verifyStats(Destination dest, int count, long minimumSize) throws Exception {
MessageStore messageStore = dest.getMessageStore();
MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics();
assertEquals(messageStore.getMessageCount(), count);
assertEquals(messageStore.getMessageCount(),
storeStats.getMessageCount().getCount());
assertEquals(messageStore.getMessageSize(),
protected void verifyStats(Destination dest, final int count, final long minimumSize) throws Exception {
final MessageStore messageStore = dest.getMessageStore();
final MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics();
Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return (count == messageStore.getMessageCount()) && (messageStore.getMessageCount() ==
storeStats.getMessageCount().getCount()) && (messageStore.getMessageSize() ==
messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize());
}
});
if (count > 0) {
assertTrue(storeStats.getMessageSize().getTotalSize() > minimumSize);
Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return storeStats.getMessageSize().getTotalSize() > minimumSize;
}
});
} else {
assertEquals(storeStats.getMessageSize().getTotalSize(), 0);
Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return storeStats.getMessageSize().getTotalSize() == 0;
}
});
}
}
@ -166,11 +219,11 @@ public abstract class AbstractMessageStoreSizeStatTest {
}
protected Destination publishTestMessages(int count) throws Exception {
return publishTestMessages(count, defaultQueueName);
protected Destination publishTestQueueMessages(int count) throws Exception {
return publishTestQueueMessages(count, defaultQueueName);
}
protected Destination publishTestMessages(int count, String queueName) throws Exception {
protected Destination publishTestQueueMessages(int count, String queueName) throws Exception {
// create a new queue
final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
queueName);
@ -196,17 +249,21 @@ public abstract class AbstractMessageStoreSizeStatTest {
}
} finally {
connection.stop();
connection.close();
}
return dest;
}
protected Destination consumeTestMessages() throws Exception {
return consumeTestMessages(defaultQueueName);
protected Destination consumeTestQueueMessages() throws Exception {
return consumeTestQueueMessages(defaultQueueName);
}
protected Destination consumeTestMessages(String queueName) throws Exception {
protected Destination consumeDurableTestMessages(Connection connection, String sub, int size) throws Exception {
return consumeDurableTestMessages(connection, sub, size, defaultTopicName);
}
protected Destination consumeTestQueueMessages(String queueName) throws Exception {
// create a new queue
final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
queueName);
@ -235,22 +292,45 @@ public abstract class AbstractMessageStoreSizeStatTest {
return dest;
}
protected Destination publishTestMessagesDurable() throws Exception {
protected Destination consumeDurableTestMessages(Connection connection, String sub, int size, String topicName) throws Exception {
// create a new queue
final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
"test.topic");
topicName);
Destination dest = broker.getDestination(activeMqTopic);
Session session = connection.createSession(false,
QueueSession.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
try {
TopicSubscriber consumer = session.createDurableSubscriber(topic, sub);
for (int i = 0; i < size; i++) {
consumer.receive();
}
} finally {
session.close();
}
return dest;
}
protected Destination publishTestMessagesDurable(Connection connection, String[] subNames, int publishSize, int expectedSize) throws Exception {
// create a new queue
final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
defaultTopicName);
Destination dest = broker.getDestination(activeMqTopic);
// Start the connection
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
.createConnection();
connection.setClientID("clientId");
connection.start();
Session session = connection.createSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test.topic");
session.createDurableSubscriber(topic, "sub1");
Topic topic = session.createTopic(defaultTopicName);
for (String subName : subNames) {
session.createDurableSubscriber(topic, subName);
}
// browse the durable sub - this test is to verify that browsing (which calls createTopicMessageStore)
//in KahaDBStore will not create a brand new store (ie uses the cache) If the cache is not used,
@ -263,21 +343,21 @@ public abstract class AbstractMessageStoreSizeStatTest {
// store
MessageProducer prod = session.createProducer(topic);
prod.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < 200; i++) {
for (int i = 0; i < publishSize; i++) {
prod.send(createMessage(session));
}
//verify the view has 200 messages
assertEquals(1, subs.length);
//verify the view has expected messages
assertEquals(subNames.length, subs.length);
ObjectName subName = subs[0];
DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
CompositeData[] data = sub.browse();
assertNotNull(data);
assertEquals(200, data.length);
assertEquals(expectedSize, data.length);
} finally {
connection.stop();
session.close();
}
return dest;

View File

@ -64,7 +64,7 @@ public class KahaDBMessageStoreSizeStatTest extends
@Test
public void testMessageSizeAfterRestartAndPublish() throws Exception {
Destination dest = publishTestMessages(200);
Destination dest = publishTestQueueMessages(200);
// verify the count and size
verifyStats(dest, 200, 200 * messageSize);
@ -72,7 +72,7 @@ public class KahaDBMessageStoreSizeStatTest extends
// stop, restart broker and publish more messages
stopBroker();
this.setUpBroker(false);
dest = publishTestMessages(200);
dest = publishTestQueueMessages(200);
// verify the count and size
verifyStats(dest, 400, 400 * messageSize);

View File

@ -84,7 +84,7 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
@Test
public void testMessageSizeAfterRestartAndPublish() throws Exception {
Destination dest = publishTestMessages(200);
Destination dest = publishTestQueueMessages(200);
// verify the count and size
verifyStats(dest, 200, 200 * messageSize);
@ -92,7 +92,7 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
// stop, restart broker and publish more messages
stopBroker();
this.setUpBroker(false);
dest = publishTestMessages(200);
dest = publishTestQueueMessages(200);
// verify the count and size
verifyStats(dest, 400, 400 * messageSize);
@ -102,13 +102,13 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
@Test
public void testMessageSizeAfterRestartAndPublishMultiQueue() throws Exception {
Destination dest = publishTestMessages(200);
Destination dest = publishTestQueueMessages(200);
// verify the count and size
verifyStats(dest, 200, 200 * messageSize);
assertTrue(broker.getPersistenceAdapter().size() > 200 * messageSize);
Destination dest2 = publishTestMessages(200, "test.queue2");
Destination dest2 = publishTestQueueMessages(200, "test.queue2");
// verify the count and size
verifyStats(dest2, 200, 200 * messageSize);
@ -117,8 +117,8 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
// stop, restart broker and publish more messages
stopBroker();
this.setUpBroker(false);
dest = publishTestMessages(200);
dest2 = publishTestMessages(200, "test.queue2");
dest = publishTestQueueMessages(200);
dest2 = publishTestQueueMessages(200, "test.queue2");
// verify the count and size after publishing messages
verifyStats(dest, 400, 400 * messageSize);

View File

@ -18,8 +18,13 @@ package org.apache.activemq.store.memory;
import java.io.IOException;
import javax.jms.Connection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,6 +44,52 @@ public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStat
broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
}
@Override
@Test(timeout=10000)
public void testMessageSizeOneDurable() throws Exception {
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
//The expected value is only 100 because for durables a LRUCache is being used
//with a max size of 100
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 100);
//verify the count and size, should be 100 because of the LRUCache
verifyStats(dest, 100, 100 * messageSize);
consumeDurableTestMessages(connection, "sub1", 100);
//Since an LRU cache is used and messages are kept in memory, this should be 100 still
verifyStats(dest, 100, 100 * messageSize);
connection.stop();
}
@Override
@Test(timeout=10000)
public void testMessageSizeTwoDurables() throws Exception {
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
//The expected value is only 100 because for durables a LRUCache is being used
//with a max size of 100, so only 100 messages are kept
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 100);
//verify the count and size
verifyStats(dest, 100, 100 * messageSize);
//consume for sub1
consumeDurableTestMessages(connection, "sub1", 100);
//Should be 100 messages still
verifyStats(dest, 100, 100 * messageSize);
connection.stop();
}