AMQ-5748 - Fixing MessageStore cache

This fixes KahaDBStore to properly check for an existing MessageStore
in the cache before creating a new one.  This will prevent potential
issues with metrics.
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-08-07 17:13:37 +00:00
parent 41ee3ec8de
commit de24980a62
2 changed files with 41 additions and 5 deletions

View File

@ -1004,16 +1004,32 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
MessageStore store = this.transactionStore.proxy(new KahaDBMessageStore(destination));
storeCache.put(key(convert(destination)), store);
String key = key(convert(destination));
MessageStore store = storeCache.get(key(convert(destination)));
if (store == null) {
final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination));
store = storeCache.putIfAbsent(key, queueStore);
if (store == null) {
store = queueStore;
}
}
return store;
}
@Override
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
TopicMessageStore store = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
storeCache.put(key(convert(destination)), store);
return store;
String key = key(convert(destination));
MessageStore store = storeCache.get(key(convert(destination)));
if (store == null) {
final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
store = storeCache.putIfAbsent(key, topicStore);
if (store == null) {
store = topicStore;
}
}
return (TopicMessageStore) store;
}
/**

View File

@ -17,6 +17,7 @@
package org.apache.activemq.store;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@ -34,10 +35,14 @@ import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@ -247,6 +252,12 @@ public abstract class AbstractMessageStoreSizeStatTest {
Topic topic = session.createTopic("test.topic");
session.createDurableSubscriber(topic, "sub1");
// browse the durable sub - this test is to verify that browsing (which calls createTopicMessageStore)
//in KahaDBStore will not create a brand new store (ie uses the cache) If the cache is not used,
//then the statistics won't be updated properly because a new store would overwrite the old store
//which is still in use
ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers();
try {
// publish a bunch of non-persistent messages to fill up the temp
// store
@ -256,6 +267,15 @@ public abstract class AbstractMessageStoreSizeStatTest {
prod.send(createMessage(session));
}
//verify the view has 200 messages
assertEquals(1, subs.length);
ObjectName subName = subs[0];
DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
CompositeData[] data = sub.browse();
assertNotNull(data);
assertEquals(200, data.length);
} finally {
connection.stop();
}