https://issues.apache.org/jira/browse/AMQ-3288 - JDBC persistence adapter, intermittent performance degradation when a durable subscriber of priority messages falls behind. Store recovery was sometimes ignoring batch limit and would recover till memory limit was reached. Resolved issue with out or order delvery and immediatePriorityDelivery policy around memory limit boundaries. Resolved issue with over eager cleanup when priority and non priority destinations are mixed. Removed potential full table scan from cleanup. Resolved issue with null message ref (prefetch=0) and expiry. Added a bunch of corresponding tests

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1095352 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-04-20 10:39:25 +00:00
parent 315b00fa14
commit fe31092b54
18 changed files with 479 additions and 121 deletions

View File

@ -207,10 +207,12 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
MessageDispatch md = super.createMessageDispatch(node, message);
if (node != QueueMessageReference.NULL_MESSAGE) {
Integer count = redeliveredMessages.get(node.getMessageId());
if (count != null) {
md.setRedeliveryCounter(count.intValue());
}
}
return md;
}

View File

@ -114,7 +114,7 @@ final class NullMessageReference implements QueueMessageReference {
}
public boolean isExpired() {
throw new RuntimeException("not implemented");
return false;
}
public boolean isPersistent() {

View File

@ -447,7 +447,7 @@ public class Topic extends BaseDestination implements Task {
@Override
public void afterCommit() throws Exception {
// It could take while before we receive the commit
// operration.. by that time the message could have
// operation.. by that time the message could have
// expired..
if (broker.isExpired(message)) {
getDestinationStatistics().getExpired().increment();

View File

@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener {
private static final Logger LOG = LoggerFactory.getLogger(AbstractStoreCursor.class);
protected final Destination regionDestination;
private final PendingList batchList;
protected final PendingList batchList;
private Iterator<MessageReference> iterator = null;
protected boolean batchResetNeeded = true;
private boolean storeHasMessages = false;
@ -102,7 +102,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
public final void reset() {
public final synchronized void reset() {
if (batchList.isEmpty()) {
try {
fillBatch();
@ -185,7 +185,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
if (LOG.isTraceEnabled()) {
LOG.trace(this + " - disabling cache"
+ ", lastCachedId: " + lastCachedId
+ " current node Id: " + node.getMessageId());
+ " current node Id: " + node.getMessageId() + " batchList size: " + batchList.size());
}
setBatch(lastCachedId);
lastCachedId = null;

View File

@ -290,15 +290,15 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
*/
@Override
public synchronized MessageReference next() {
Message message = (Message) iter.next();
last = message;
MessageReference reference = iter.next();
last = reference;
if (!isDiskListEmpty()) {
// got from disk
message.setRegionDestination(regionDestination);
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
reference.getMessage().setRegionDestination(regionDestination);
reference.getMessage().setMemoryUsage(this.getSystemUsage().getMemoryUsage());
}
message.incrementReferenceCount();
return message;
reference.incrementReferenceCount();
return reference;
}
/**

View File

@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory;
public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
private static final Logger LOG = LoggerFactory.getLogger(StoreDurableSubscriberCursor.class);
private static final int UNKNOWN = -1;
private final String clientId;
private final String subscriberName;
private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>();
@ -50,7 +49,6 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
private final PendingMessageCursor nonPersistent;
private PendingMessageCursor currentCursor;
private final DurableTopicSubscription subscription;
private int cacheCurrentLowestPriority = UNKNOWN;
private boolean immediatePriorityDispatch = true;
/**
* @param broker Broker for this cursor
@ -125,6 +123,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
tsp.setMessageAudit(getMessageAudit());
tsp.setEnableAudit(isEnableAudit());
tsp.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark());
tsp.setUseCache(isUseCache());
topics.put(destination, tsp);
storePrefetches.add(tsp);
if (isStarted()) {
@ -196,29 +195,17 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
Destination dest = msg.getRegionDestination();
TopicStorePrefetch tsp = topics.get(dest);
if (tsp != null) {
// cache can become high priority cache for immediate dispatch
final int priority = msg.getPriority();
if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.isCacheEnabled()) {
if (priority > tsp.getCurrentLowestPriority()) {
if (LOG.isTraceEnabled()) {
LOG.trace("enabling cache for cursor on high priority message " + priority
+ ", current lowest: " + tsp.getCurrentLowestPriority());
}
tsp.setCacheEnabled(true);
cacheCurrentLowestPriority = tsp.getCurrentLowestPriority();
}
} else if (cacheCurrentLowestPriority != UNKNOWN && priority <= cacheCurrentLowestPriority) {
// go to the store to get next priority message as lower priority messages may be recovered
// already and need to acked sequence order
if (LOG.isTraceEnabled()) {
LOG.trace("disabling/clearing cache for cursor on lower priority message "
+ priority + ", tsp current lowest: " + tsp.getCurrentLowestPriority()
+ " cache lowest: " + cacheCurrentLowestPriority);
}
tsp.setCacheEnabled(false);
cacheCurrentLowestPriority = UNKNOWN;
}
tsp.addMessageLast(node);
if (prioritizedMessages && immediatePriorityDispatch && tsp.isPaging()) {
if (msg.getPriority() > tsp.getLastRecoveredPriority()) {
tsp.recoverMessage(node.getMessage(), true);
if (LOG.isTraceEnabled()) {
LOG.trace("cached high priority (" + msg.getPriority() + ") message:" +
msg.getMessageId() + ", current paged batch priority: " +
tsp.getLastRecoveredPriority() + ", cache size:" + tsp.batchList.size());
}
}
}
}
}
@ -330,7 +317,6 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
for (PendingMessageCursor tsp : storePrefetches) {
tsp.gc();
}
cacheCurrentLowestPriority = UNKNOWN;
}
@Override

View File

@ -38,8 +38,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
private final String clientId;
private final String subscriberName;
private final Subscription subscription;
private int currentLowestPriority;
private byte lastRecoveredPriority = 9;
/**
* @param topic
* @param clientId
@ -53,15 +52,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
this.subscriberName = subscriberName;
this.maxProducersToAudit=32;
this.maxAuditDepth=10000;
resetCurrentLowestPriority();
}
private void resetCurrentLowestPriority() {
currentLowestPriority = 9;
}
public synchronized int getCurrentLowestPriority() {
return currentLowestPriority;
}
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
@ -80,8 +70,8 @@ class TopicStorePrefetch extends AbstractStoreCursor {
messageEvaluationContext.setMessageReference(message);
if (this.subscription.matches(message, messageEvaluationContext)) {
recovered = super.recoverMessage(message, cached);
if (recovered) {
currentLowestPriority = Math.min(currentLowestPriority, message.getPriority());
if (recovered && !cached) {
lastRecoveredPriority = message.getPriority();
}
}
return recovered;
@ -100,14 +90,9 @@ class TopicStorePrefetch extends AbstractStoreCursor {
@Override
protected synchronized boolean isStoreEmpty() {
try {
boolean empty = this.store.isEmpty();
if (empty) {
resetCurrentLowestPriority();
}
return empty;
return this.store.isEmpty();
} catch (Exception e) {
LOG.error("Failed to get message count", e);
LOG.error("Failed to determine if store is empty", e);
throw new RuntimeException(e);
}
}
@ -118,18 +103,20 @@ class TopicStorePrefetch extends AbstractStoreCursor {
this.store.resetBatching(clientId, subscriberName);
}
@Override
public synchronized void gc() {
super.gc();
resetCurrentLowestPriority();
}
@Override
protected void doFillBatch() throws Exception {
this.store.recoverNextMessages(clientId, subscriberName,
maxBatchSize, this);
}
public byte getLastRecoveredPriority() {
return lastRecoveredPriority;
}
public final boolean isPaging() {
return !isCacheEnabled() && !batchList.isEmpty();
}
@Override
public String toString() {
return "TopicStorePrefetch(" + clientId + "," + subscriberName + ")" + super.toString();

View File

@ -34,6 +34,7 @@ import org.apache.activemq.kaha.Store;
*/
public class StorePendingDurableSubscriberMessageStoragePolicy implements PendingDurableSubscriberMessageStoragePolicy {
boolean immediatePriorityDispatch = true;
boolean useCache = true;
public boolean isImmediatePriorityDispatch() {
return immediatePriorityDispatch;
@ -42,6 +43,7 @@ public class StorePendingDurableSubscriberMessageStoragePolicy implements Pendin
/**
* Ensure that new higher priority messages will get an immediate dispatch
* rather than wait for the end of the current cursor batch.
* Useful when there is a large message backlog and intermittent high priority messages.
*
* @param immediatePriorityDispatch
*/
@ -49,6 +51,14 @@ public class StorePendingDurableSubscriberMessageStoragePolicy implements Pendin
this.immediatePriorityDispatch = immediatePriorityDispatch;
}
public boolean isUseCache() {
return useCache;
}
public void setUseCache(boolean useCache) {
this.useCache = useCache;
}
/**
* Retrieve the configured pending message storage cursor;
* @param broker
@ -61,6 +71,7 @@ public class StorePendingDurableSubscriberMessageStoragePolicy implements Pendin
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, DurableTopicSubscription sub) {
StoreDurableSubscriberCursor cursor = new StoreDurableSubscriberCursor(broker,clientId, name, maxBatchSize, sub);
cursor.setUseCache(isUseCache());
cursor.setImmediatePriorityDispatch(isImmediatePriorityDispatch());
return cursor;
}

View File

@ -70,7 +70,7 @@ public interface JDBCAdapter {
void doDeleteSubscription(TransactionContext c, ActiveMQDestination destinationName, String clientId, String subscriptionName) throws SQLException, IOException;
void doDeleteOldMessages(TransactionContext c, boolean isPrioritizedMessages) throws SQLException, IOException;
void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException;
long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException;

View File

@ -101,7 +101,8 @@ public class JDBCMessageStore extends AbstractMessageStore {
// Get a connection and insert the message into the DB.
TransactionContext c = persistenceAdapter.getTransactionContext(context);
try {
adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration(), message.getPriority());
adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration(),
this.isPrioritizedMessages() ? message.getPriority() : 0);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);

View File

@ -334,8 +334,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
try {
LOG.debug("Cleaning up old messages.");
c = getTransactionContext();
getAdapter().doDeleteOldMessages(c, false);
getAdapter().doDeleteOldMessages(c, true);
getAdapter().doDeleteOldMessages(c);
} catch (IOException e) {
LOG.warn("Old message cleanup failed due to: " + e, e);
} catch (SQLException e) {

View File

@ -183,11 +183,11 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
if (delegate.hasSpace()) {
if (delegate.hasSpace() && recoveredCount < maxMessages) {
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
if (delegate.recoverMessage(msg)) {
lastRecovered.recovered = sequenceId;
if (delegate.recoverMessage(msg)) {
recoveredCount++;
return true;
}
@ -236,7 +236,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
//Duration microDuration = new Duration("recoverNextMessages:loop");
adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
entry.recovered, entry.priority, maxReturned, recoveredAwareListener);
//microDuration.end(entry);
//microDuration.end(new String(entry + " recoveredCount:" + recoveredAwareListener.recoveredCount));
if (recoveredAwareListener.stalled()) {
if (recoveredAwareListener.complete()) {
break;

View File

@ -56,7 +56,6 @@ public class Statements {
private String findAllDestinationsStatement;
private String removeAllMessagesStatement;
private String removeAllSubscriptionsStatement;
private String deleteOldMessagesStatement;
private String[] createSchemaStatements;
private String[] dropSchemaStatements;
private String lockCreateStatement;
@ -379,34 +378,17 @@ public class Statements {
public String getDeleteOldMessagesStatementWithPriority() {
if (deleteOldMessagesStatementWithPriority == null) {
deleteOldMessagesStatementWithPriority = "DELETE FROM " + getFullMessageTableName()
+ " WHERE ( EXPIRATION<>0 AND EXPIRATION<?)"
+ " OR (ID <= "
+ " WHERE (PRIORITY=? AND ID <= "
+ " ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)"
+ " FROM " + getFullAckTableName() + " WHERE "
+ getFullAckTableName() + ".CONTAINER="
+ getFullMessageTableName() + ".CONTAINER"
+ " AND " + getFullAckTableName() + ".PRIORITY=" + getFullMessageTableName() + ".PRIORITY )"
+ " AND " + getFullAckTableName() + ".PRIORITY=?)"
+ " )";
}
return deleteOldMessagesStatementWithPriority;
}
public String getDeleteOldMessagesStatement() {
if (deleteOldMessagesStatement == null) {
deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName()
+ " WHERE ( EXPIRATION<>0 AND EXPIRATION<?)"
+ " OR (ID <= "
+ " ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)"
+ " FROM " + getFullAckTableName() + " WHERE "
+ getFullAckTableName() + ".CONTAINER="
+ getFullMessageTableName() + ".CONTAINER )"
+ " )";
}
return deleteOldMessagesStatement;
}
public String getLockCreateStatement() {
if (lockCreateStatement == null) {
lockCreateStatement = "SELECT * FROM " + getFullLockTableName();
@ -654,12 +636,8 @@ public class Statements {
this.createSchemaStatements = createSchemaStatments;
}
public void setDeleteOldMessagesStatement(String deleteOldMessagesStatment) {
this.deleteOldMessagesStatement = deleteOldMessagesStatment;
}
public void setDeleteOldMessagesStatementWithPriority(String deleteOldMessagesStatmentWithPriority) {
this.deleteOldMessagesStatementWithPriority = deleteOldMessagesStatmentWithPriority;
public void setDeleteOldMessagesStatementWithPriority(String deleteOldMessagesStatementWithPriority) {
this.deleteOldMessagesStatementWithPriority = deleteOldMessagesStatementWithPriority;
}
public void setDeleteSubscriptionStatement(String deleteSubscriptionStatment) {

View File

@ -54,7 +54,6 @@ public class AxionJDBCAdapter extends StreamJDBCAdapter {
+ ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))"
};
statements.setCreateSchemaStatements(createStatements);
statements.setDeleteOldMessagesStatement("DELETE FROM " + statements.getFullMessageTableName() + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?)");
statements.setLongDataType("LONG");
statements.setSequenceDataType("LONG");

View File

@ -539,7 +539,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
s.setMaxRows(maxRows);
s.setMaxRows(Math.max(maxReturned * 2, maxRows));
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriptionName);
@ -739,20 +739,18 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
public void doDeleteOldMessages(TransactionContext c, boolean isPrioritizedMessages) throws SQLException, IOException {
int priorityIterator = 0;
public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
PreparedStatement s = null;
cleanupExclusiveLock.writeLock().lock();
try {
if (isPrioritizedMessages) {
LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
} else {
LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement());
s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement());
}
s.setLong(1, System.currentTimeMillis());
int priority = priorityIterator++%10;
s.setInt(1, priority);
s.setInt(2, priority);
int i = s.executeUpdate();
LOG.debug("Deleted " + i + " old message(s).");
LOG.debug("Deleted " + i + " old message(s) at priority: " + priority);
} finally {
cleanupExclusiveLock.writeLock().unlock();
close(s);

View File

@ -33,6 +33,7 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@ -75,10 +76,19 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
StorePendingDurableSubscriberMessageStoragePolicy durableSubPending =
new StorePendingDurableSubscriberMessageStoragePolicy();
durableSubPending.setImmediatePriorityDispatch(immediatePriorityDispatch);
durableSubPending.setUseCache(useCache);
policy.setPendingDurableSubscriberPolicy(durableSubPending);
PolicyMap policyMap = new PolicyMap();
policyMap.put(new ActiveMQQueue("TEST"), policy);
policyMap.put(new ActiveMQTopic("TEST"), policy);
// do not process expired for one test
PolicyEntry ignoreExpired = new PolicyEntry();
SharedDeadLetterStrategy ignoreExpiredStrategy = new SharedDeadLetterStrategy();
ignoreExpiredStrategy.setProcessExpired(false);
ignoreExpired.setDeadLetterStrategy(ignoreExpiredStrategy);
policyMap.put(new ActiveMQTopic("TEST_CLEANUP_NO_PRIORITY"), ignoreExpired);
broker.setDestinationPolicy(policyMap);
broker.start();
broker.waitUntilStarted();
@ -305,7 +315,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
int lowLowCount = 0;
for (int i=0; i<numToProduce; i++) {
Message msg = sub.receive(15000);
LOG.info("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() + "-" + msg.getJMSPriority() : null));
LOG.info("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority() : null));
assertNotNull("Message " + i + " was null", msg);
assertEquals("Message " + i + " has wrong priority", LOW_PRI+1, msg.getJMSPriority());
assertTrue("not duplicate ", dups[i] == 0);
@ -353,4 +363,177 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
}
}
public void initCombosForTestHighPriorityDeliveryInterleaved() {
addCombinationValues("useCache", new Object[] {Boolean.TRUE, Boolean.FALSE});
}
public void testHighPriorityDeliveryInterleaved() throws Exception {
// get zero prefetch
ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
prefetch.setAll(0);
factory.setPrefetchPolicy(prefetch);
conn.close();
conn = factory.createConnection();
conn.setClientID("priority");
conn.start();
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
final String subName = "priorityDisconnect";
TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
sub.close();
ProducerThread producerThread = new ProducerThread(topic, 1, HIGH_PRI);
producerThread.run();
producerThread.setMessagePriority(HIGH_PRI -1);
producerThread.setMessageCount(1);
producerThread.run();
producerThread.setMessagePriority(LOW_PRI);
producerThread.setMessageCount(1);
producerThread.run();
LOG.info("Ordered priority messages sent");
sub = sess.createDurableSubscriber(topic, subName);
int count = 0;
Message msg = sub.receive(15000);
assertNotNull("Message was null", msg);
LOG.info("received " + msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority());
assertEquals("Message has wrong priority", HIGH_PRI, msg.getJMSPriority());
producerThread.setMessagePriority(LOW_PRI+1);
producerThread.setMessageCount(1);
producerThread.run();
msg = sub.receive(15000);
assertNotNull("Message was null", msg);
LOG.info("received " + msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority());
assertEquals("high priority", HIGH_PRI -1, msg.getJMSPriority());
msg = sub.receive(15000);
assertNotNull("Message was null", msg);
LOG.info("received hi? : " + msg);
assertEquals("high priority", LOW_PRI +1, msg.getJMSPriority());
msg = sub.receive(15000);
assertNotNull("Message was null", msg);
LOG.info("received hi? : " + msg);
assertEquals("high priority", LOW_PRI, msg.getJMSPriority());
msg = sub.receive(4000);
assertNull("Message was null", msg);
}
// immediatePriorityDispatch is only relevant when cache is exhausted
public void initCombosForTestHighPriorityDeliveryThroughBackLog() {
addCombinationValues("useCache", new Object[] {Boolean.FALSE});
addCombinationValues("immediatePriorityDispatch", new Object[] {Boolean.TRUE});
}
public void testHighPriorityDeliveryThroughBackLog() throws Exception {
// get zero prefetch
ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
prefetch.setAll(0);
factory.setPrefetchPolicy(prefetch);
conn.close();
conn = factory.createConnection();
conn.setClientID("priority");
conn.start();
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
final String subName = "priorityDisconnect";
TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
sub.close();
ProducerThread producerThread = new ProducerThread(topic, 600, LOW_PRI);
producerThread.run();
sub = sess.createDurableSubscriber(topic, subName);
int count = 0;
for (;count < 300; count++) {
Message msg = sub.receive(15000);
assertNotNull("Message was null", msg);
assertEquals("high priority", LOW_PRI, msg.getJMSPriority());
}
producerThread.setMessagePriority(HIGH_PRI);
producerThread.setMessageCount(1);
producerThread.run();
Message msg = sub.receive(15000);
assertNotNull("Message was null", msg);
assertEquals("high priority", HIGH_PRI, msg.getJMSPriority());
for (;count < 600; count++) {
msg = sub.receive(15000);
assertNotNull("Message was null", msg);
assertEquals("high priority", LOW_PRI, msg.getJMSPriority());
}
}
public void initCombosForTestHighPriorityNonDeliveryThroughBackLog() {
addCombinationValues("useCache", new Object[] {Boolean.FALSE});
addCombinationValues("immediatePriorityDispatch", new Object[] {Boolean.FALSE});
}
public void testHighPriorityNonDeliveryThroughBackLog() throws Exception {
// get zero prefetch
ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
prefetch.setAll(0);
factory.setPrefetchPolicy(prefetch);
conn.close();
conn = factory.createConnection();
conn.setClientID("priority");
conn.start();
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
final String subName = "priorityDisconnect";
TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
sub.close();
ProducerThread producerThread = new ProducerThread(topic, 600, LOW_PRI);
producerThread.run();
sub = sess.createDurableSubscriber(topic, subName);
int count = 0;
for (;count < 300; count++) {
Message msg = sub.receive(15000);
assertNotNull("Message was null", msg);
assertEquals("high priority", LOW_PRI, msg.getJMSPriority());
}
producerThread.setMessagePriority(HIGH_PRI);
producerThread.setMessageCount(1);
producerThread.run();
for (;count < 400; count++) {
Message msg = sub.receive(15000);
assertNotNull("Message was null", msg);
assertEquals("high priority", LOW_PRI, msg.getJMSPriority());
}
Message msg = sub.receive(15000);
assertNotNull("Message was null", msg);
assertEquals("high priority", HIGH_PRI, msg.getJMSPriority());
for (;count < 600; count++) {
msg = sub.receive(15000);
assertNotNull("Message was null", msg);
assertEquals("high priority", LOW_PRI, msg.getJMSPriority());
}
}
}

View File

@ -17,9 +17,13 @@
package org.apache.activemq.store.jdbc;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@ -30,9 +34,11 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
import junit.framework.Test;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.MessagePriorityTest;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.ThreadTracker;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,10 +48,11 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
private static final Logger LOG = LoggerFactory.getLogger(JDBCMessagePriorityTest.class);
EmbeddedDataSource dataSource;
JDBCPersistenceAdapter jdbc;
@Override
protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception {
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
jdbc = new JDBCPersistenceAdapter();
dataSource = new EmbeddedDataSource();
dataSource.setDatabaseName("derbyDb");
dataSource.setCreateDatabase("create");
@ -136,10 +143,12 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
final int maxPriority = 5;
final AtomicInteger[] messageCounts = new AtomicInteger[maxPriority];
final long[] messageIds = new long[maxPriority];
Vector<ProducerThread> producers = new Vector<ProducerThread>();
for (int priority = 0; priority < maxPriority; priority++) {
producers.add(new ProducerThread(topic, MSG_NUM, priority));
messageCounts[priority] = new AtomicInteger(0);
messageIds[priority] = 1l;
}
for (ProducerThread producer : producers) {
@ -154,9 +163,12 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
LOG.debug("received i=" + i + ", m=" + (msg != null ?
msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority()
: null));
assertNotNull("Message " + i + " was null, counts: " + Arrays.toString(messageCounts), msg);
assertNull("no duplicate message failed on : " + msg.getJMSMessageID(), dups.put(msg.getJMSMessageID(), subName));
assertNotNull("Message " + i + " was null", msg);
messageCounts[msg.getJMSPriority()].incrementAndGet();
assertEquals("message is in order : " + msg,
messageIds[msg.getJMSPriority()],((ActiveMQMessage)msg).getMessageId().getProducerSequenceId());
messageIds[msg.getJMSPriority()]++;
if (i > 0 && i % closeFrequency == 0) {
LOG.info("Closing durable sub.. on: " + i + ", counts: " + Arrays.toString(messageCounts));
sub.close();
@ -273,6 +285,162 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
assertEquals("got all messages", TO_SEND * 2, count.get());
}
public void testCleanupPriorityDestination() throws Exception {
assertEquals("no messages pending", 0, messageTableCount());
ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
final String subName = "priorityConcurrent";
Connection consumerConn = factory.createConnection();
consumerConn.setClientID("subName");
consumerConn.start();
Session consumerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = consumerSession.createDurableSubscriber(topic, subName);
sub.close();
MessageProducer messageProducer = sess.createProducer(topic);
Message message = sess.createTextMessage();
message.setJMSPriority(2);
messageProducer.send(message, DeliveryMode.PERSISTENT, message.getJMSPriority(), 0);
message.setJMSPriority(5);
messageProducer.send(message, DeliveryMode.PERSISTENT, message.getJMSPriority(), 0);
assertEquals("two messages pending", 2, messageTableCount());
sub = consumerSession.createDurableSubscriber(topic, subName);
message = sub.receive(5000);
assertEquals("got high priority", 5, message.getJMSPriority());
waitForAck(5);
for (int i=0; i<10; i++) {
jdbc.cleanup();
}
assertEquals("one messages pending", 1, messageTableCount());
message = sub.receive(5000);
assertEquals("got high priority", 2, message.getJMSPriority());
waitForAck(2);
for (int i=0; i<10; i++) {
jdbc.cleanup();
}
assertEquals("no messages pending", 0, messageTableCount());
}
public void testCleanupNonPriorityDestination() throws Exception {
assertEquals("no messages pending", 0, messageTableCount());
ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST_CLEANUP_NO_PRIORITY");
final String subName = "subName";
Connection consumerConn = factory.createConnection();
consumerConn.setClientID("subName");
consumerConn.start();
Session consumerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = consumerSession.createDurableSubscriber(topic, subName);
sub.close();
MessageProducer messageProducer = sess.createProducer(topic);
Message message = sess.createTextMessage("ToExpire");
messageProducer.send(message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, 4000);
message = sess.createTextMessage("A");
messageProducer.send(message);
message = sess.createTextMessage("B");
messageProducer.send(message);
message = null;
assertEquals("three messages pending", 3, messageTableCount());
// let first message expire
TimeUnit.SECONDS.sleep(5);
sub = consumerSession.createDurableSubscriber(topic, subName);
message = sub.receive(5000);
assertNotNull("got message", message);
LOG.info("Got: " + message);
waitForAck(0, 1);
for (int i=0; i<10; i++) {
jdbc.cleanup();
}
assertEquals("one messages pending", 1, messageTableCount());
message = sub.receive(5000);
assertNotNull("got message two", message);
LOG.info("Got: " + message);
waitForAck(0, 2);
for (int i=0; i<10; i++) {
jdbc.cleanup();
}
assertEquals("no messages pending", 0, messageTableCount());
}
private int messageTableCount() throws Exception {
int count = -1;
java.sql.Connection c = dataSource.getConnection();
try {
PreparedStatement s = c.prepareStatement("SELECT COUNT(*) FROM ACTIVEMQ_MSGS");
ResultSet rs = s.executeQuery();
if (rs.next()) {
count = rs.getInt(1);
}
} finally {
if (c!=null) {
c.close();
}
}
return count;
}
private void waitForAck(final int priority) throws Exception {
waitForAck(priority, 0);
}
private void waitForAck(final int priority, final int minId) throws Exception {
assertTrue("got ack for " + priority, Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
int id = 0;
java.sql.Connection c = dataSource.getConnection();
try {
PreparedStatement s = c.prepareStatement("SELECT LAST_ACKED_ID FROM ACTIVEMQ_ACKS WHERE PRIORITY=" + priority);
ResultSet rs = s.executeQuery();
if (rs.next()) {
id = rs.getInt(1);
}
} finally {
if (c!=null) {
c.close();
}
}
return id>minId;
}
}));
}
private int messageTableDump() throws Exception {
int count = -1;
java.sql.Connection c = dataSource.getConnection();
try {
PreparedStatement s = c.prepareStatement("SELECT * FROM ACTIVEMQ_MSGS");
ResultSet rs = s.executeQuery();
if (rs.next()) {
count = rs.getInt(1);
}
} finally {
if (c!=null) {
c.close();
}
}
return count;
}
public static Test suite() {
return suite(JDBCMessagePriorityTest.class);
}

View File

@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@ -48,6 +49,8 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
//import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.Wait;
//import org.apache.commons.dbcp.BasicDataSource;
@ -113,7 +116,7 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
});
double[] statsWithActive = produceMessages(destination, 300, 10, session, producer, addConsumerSignal);
double[] statsWithActive = produceMessages(destination, 500, 10, session, producer, addConsumerSignal);
LOG.info(" with concurrent activate, ave: " + statsWithActive[1] + ", max: " + statsWithActive[0] + ", multiplier: " + (statsWithActive[0]/ statsWithActive[1]));
@ -130,7 +133,7 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
LOG.info("Ave time to first message =" + timeToFirstAccumulator/consumers.size());
for (TimedMessageListener listener : consumers.values()) {
LOG.info("Ave batch receipt time: " + listener.waitForReceivedLimit(5000) + " max receipt: " + listener.maxReceiptTime);
LOG.info("Ave batch receipt time: " + listener.waitForReceivedLimit(10000) + " max receipt: " + listener.maxReceiptTime);
}
//assertTrue("max (" + statsWithActive[0] + ") within reasonable multiplier of ave (" + statsWithActive[1] + ")",
@ -249,7 +252,9 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
for (int j=0; j < toSend; j++) {
long singleSendstart = System.currentTimeMillis();
TextMessage msg = createTextMessage(session, "" + j);
producer.send(msg);
// rotate
int priority = ((int)count%10);
producer.send(msg, DeliveryMode.PERSISTENT, priority, 0);
max = Math.max(max, (System.currentTimeMillis() - singleSendstart));
if (++count % 500 == 0) {
if (addConsumerSignal != null) {
@ -328,6 +333,12 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
policy.setPrioritizedMessages(true);
policy.setMaxPageSize(500);
StorePendingDurableSubscriberMessageStoragePolicy durableSubPending =
new StorePendingDurableSubscriberMessageStoragePolicy();
durableSubPending.setImmediatePriorityDispatch(true);
durableSubPending.setUseCache(true);
policy.setPendingDurableSubscriberPolicy(durableSubPending);
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(policy);
brokerService.setDestinationPolicy(policyMap);
@ -390,19 +401,27 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
long batchReceiptAccumulator = 0;
long maxReceiptTime = 0;
AtomicLong count = new AtomicLong(0);
Map<Integer, MessageIdList> messageLists = new ConcurrentHashMap<Integer, MessageIdList>(new HashMap<Integer, MessageIdList>());
@Override
public void onMessage(Message message) {
final long current = System.currentTimeMillis();
final long duration = current - mark;
receiptAccumulator += duration;
allMessagesList.onMessage(message);
int priority = 0;
try {
priority = message.getJMSPriority();
} catch (JMSException ignored) {}
if (!messageLists.containsKey(priority)) {
messageLists.put(priority, new MessageIdList());
}
messageLists.get(priority).onMessage(message);
if (count.incrementAndGet() == 1) {
firstReceipt = duration;
firstReceiptLatch.countDown();
LOG.info("First receipt in " + firstReceipt + "ms");
} else if (count.get() % batchSize == 0) {
LOG.info("Consumed " + batchSize + " in " + batchReceiptAccumulator + "ms");
LOG.info("Consumed " + count.get() + " in " + batchReceiptAccumulator + "ms" + ", priority:" + priority);
batchReceiptAccumulator=0;
}
maxReceiptTime = Math.max(maxReceiptTime, duration);
@ -427,9 +446,36 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
throw new RuntimeException("Expired waiting for X messages, " + limit);
}
TimeUnit.SECONDS.sleep(2);
String missing = findFirstMissingMessage();
if (missing != null) {
LOG.info("first missing = " + missing);
throw new RuntimeException("We have a missing message. " + missing);
}
}
return receiptAccumulator/(limit/batchSize);
}
private String findFirstMissingMessage() {
MessageId current = new MessageId();
for (MessageIdList priorityList : messageLists.values()) {
MessageId previous = null;
for (String id : priorityList.getMessageIds()) {
current.setValue(id);
if (previous == null) {
previous = current.copy();
} else {
if (current.getProducerSequenceId() - 1 != previous.getProducerSequenceId() &&
current.getProducerSequenceId() - 10 != previous.getProducerSequenceId()) {
return "Missing next after: " + previous + ", got: " + current;
} else {
previous = current.copy();
}
}
}
}
return null;
}
}
}