https://issues.apache.org/jira/browse/AMQ-3397: Improve scalability of active durable subs with JDBC message store. Cache topic message sequence ids to avoid each ack going to the store twice, boost through put for active durable subs; 100 subs, 550->2200 for 2min test with mysql

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1144340 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-07-08 14:37:10 +00:00
parent fc36cd7d7b
commit 1f816d439e
2 changed files with 38 additions and 4 deletions

View File

@ -109,10 +109,10 @@ public class JDBCMessageStore extends AbstractMessageStore {
} finally { } finally {
c.close(); c.close();
} }
onAdd(sequenceId, message.getPriority()); onAdd(messageId, sequenceId, message.getPriority());
} }
protected void onAdd(long sequenceId, byte priority) { protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
} }
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {

View File

@ -20,11 +20,14 @@ import java.io.IOException;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
@ -46,6 +49,17 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class); private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class);
private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>(); private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>();
public static final String PROPERTY_SEQUENCE_ID_CACHE_SIZE = "org.apache.activemq.store.jdbc.SEQUENCE_ID_CACHE_SIZE";
private static final int SEQUENCE_ID_CACHE_SIZE = Integer.parseInt(System.getProperty(
PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10);
private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new ReentrantReadWriteLock();
private Map<MessageId, long[]> sequenceIdCache = new LinkedHashMap<MessageId, long[]>() {
protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) {
return size() > SEQUENCE_ID_CACHE_SIZE;
}
};
public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) { public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) {
super(persistenceAdapter, adapter, wireFormat, topic, audit); super(persistenceAdapter, adapter, wireFormat, topic, audit);
} }
@ -59,7 +73,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
} }
TransactionContext c = persistenceAdapter.getTransactionContext(context); TransactionContext c = persistenceAdapter.getTransactionContext(context);
try { try {
long[] res = adapter.getStoreSequenceId(c, destination, messageId); long[] res = getCachedStoreSequenceId(c, destination, messageId);
if (this.isPrioritizedMessages()) { if (this.isPrioritizedMessages()) {
adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName, res[0], res[1]); adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName, res[0], res[1]);
} else { } else {
@ -76,6 +90,20 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
} }
} }
private long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination destination, MessageId messageId) throws SQLException, IOException {
long[] val = null;
sequenceIdCacheSizeLock.readLock().lock();
try {
val = sequenceIdCache.get(messageId);
} finally {
sequenceIdCacheSizeLock.readLock().unlock();
}
if (val == null) {
val = adapter.getStoreSequenceId(transactionContext, destination, messageId);
}
return val;
}
/** /**
* @throws Exception * @throws Exception
*/ */
@ -266,11 +294,17 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName)); subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName));
} }
protected void onAdd(long sequenceId, byte priority) { protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
// update last recovered state // update last recovered state
for (LastRecovered last : subscriberLastRecoveredMap.values()) { for (LastRecovered last : subscriberLastRecoveredMap.values()) {
last.updateStored(sequenceId, priority); last.updateStored(sequenceId, priority);
} }
sequenceIdCacheSizeLock.writeLock().lock();
try {
sequenceIdCache.put(messageId, new long[]{sequenceId, priority});
} finally {
sequenceIdCacheSizeLock.writeLock().unlock();
}
} }