respect storeHasMessages such that the store batch size, form max page size is respected, improves preformance by negating the replay of high priority inflight messages as a the tail of a backlog is recovered. Fix order issue with high priority cache such that sequence order of lower priority messages is respected. additional test for same and default audit size to 10 when priority is enabled

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1045219 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-12-13 16:20:40 +00:00
parent 054fc6aca5
commit 3ddb71c65b
12 changed files with 208 additions and 53 deletions

View File

@ -49,6 +49,9 @@ public abstract class BaseDestination implements Destination {
public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
public static final int MAX_PRODUCERS_TO_AUDIT = 64;
public static final int MAX_AUDIT_DEPTH = 2048;
protected final ActiveMQDestination destination;
protected final Broker broker;
protected final MessageStore store;

View File

@ -63,7 +63,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
public boolean isActive() {
public final boolean isActive() {
return active.get();
}
@ -220,6 +220,12 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
super.add(node);
}
protected void dispatchPending() throws IOException {
if (isActive()) {
super.dispatchPending();
}
}
protected void doAddRecoveredMessage(MessageReference message) throws Exception {
synchronized(pending) {
pending.addRecoveredMessage(message);
@ -239,7 +245,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
protected boolean canDispatch(MessageReference node) {
return active.get();
return isActive();
}
protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {

View File

@ -40,8 +40,8 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
protected int memoryUsageHighWaterMark = 70;
protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
protected SystemUsage systemUsage;
protected int maxProducersToAudit=1024;
protected int maxAuditDepth=1000;
protected int maxProducersToAudit = BaseDestination.MAX_PRODUCERS_TO_AUDIT;
protected int maxAuditDepth = BaseDestination.MAX_AUDIT_DEPTH;
protected boolean enableAudit=true;
protected ActiveMQMessageAudit audit;
protected boolean useCache=true;

View File

@ -36,10 +36,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
private Iterator<MessageReference> iterator = null;
protected boolean cacheEnabled=false;
protected boolean batchResetNeeded = true;
protected boolean storeHasMessages = false;
private boolean storeHasMessages = false;
protected int size;
private MessageId lastCachedId;
private boolean hadSpace = false;
protected AbstractStoreCursor(Destination destination) {
super((destination != null ? destination.isPrioritizedMessages():false));
this.regionDestination=destination;
@ -89,6 +90,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
batchList.addMessageLast(message);
clearIterator(true);
recovered = true;
storeHasMessages = true;
} else {
/*
* we should expect to get these - as the message is recorded as it before it goes into
@ -99,7 +101,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
+ " cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority());
}
storeHasMessages = true;
}
return recovered;
}
@ -187,6 +188,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
}
}
this.storeHasMessages = true;
size++;
}
@ -229,7 +231,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
public final synchronized void gc() {
public synchronized void gc() {
for (Iterator<MessageReference>i = batchList.iterator();i.hasNext();) {
MessageReference msg = i.next();
rollback(msg.getMessageId());
@ -240,8 +242,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
batchResetNeeded = true;
this.cacheEnabled=false;
}
@Override
public boolean hasSpace() {
hadSpace = super.hasSpace();
return hadSpace;
}
protected final synchronized void fillBatch() {
if (LOG.isTraceEnabled()) {
LOG.trace("fillBatch - batchResetNeeded=" + batchResetNeeded
@ -251,7 +258,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
resetBatch();
this.batchResetNeeded = false;
}
if( this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) {
if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) {
this.storeHasMessages = false;
try {
doFillBatch();
@ -259,7 +266,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
LOG.error("Failed to fill batch", e);
throw new RuntimeException(e);
}
if (!this.batchList.isEmpty()) {
if (!this.batchList.isEmpty() || !hadSpace) {
this.storeHasMessages=true;
}
}

View File

@ -50,7 +50,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
private final PendingMessageCursor nonPersistent;
private PendingMessageCursor currentCursor;
private final Subscription subscription;
private int cacheCurrentPriority = UNKNOWN;
private int cacheCurrentLowestPriority = UNKNOWN;
private boolean immediatePriorityDispatch = true;
/**
* @param broker Broker for this cursor
@ -187,27 +187,27 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
Destination dest = msg.getRegionDestination();
TopicStorePrefetch tsp = topics.get(dest);
if (tsp != null) {
// tps becomes a highest priority only cache when we have a new higher priority
// message and we are not currently caching
// cache can be come high priority cache for immediate dispatch
final int priority = msg.getPriority();
if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.cacheEnabled) {
if (priority > tsp.getLastDispatchPriority()) {
// go get the latest priority message
if (priority > tsp.getCurrentLowestPriority()) {
if (LOG.isTraceEnabled()) {
LOG.trace("enabling cache for cursor on high priority message " + priority);
LOG.trace("enabling cache for cursor on high priority message " + priority
+ ", current lowest: " + tsp.getCurrentLowestPriority());
}
tsp.cacheEnabled = true;
cacheCurrentPriority = priority;
cacheCurrentLowestPriority = tsp.getCurrentLowestPriority();
}
} else if (cacheCurrentPriority > 0 && priority < cacheCurrentPriority) {
} else if (cacheCurrentLowestPriority != UNKNOWN && priority <= cacheCurrentLowestPriority) {
// go to the store to get next priority message as lower priority messages may be recovered
// already
tsp.clear();
cacheCurrentPriority = UNKNOWN;
// already and need to acked sequence order
if (LOG.isTraceEnabled()) {
LOG.trace("disabling/clearing cache for cursor on lower priority message " + priority);
LOG.trace("disabling/clearing cache for cursor on lower priority message "
+ priority + ", tsp current lowest: " + tsp.getCurrentLowestPriority()
+ " cache lowest: " + cacheCurrentLowestPriority);
}
tsp.cacheEnabled = false;
cacheCurrentLowestPriority = UNKNOWN;
}
tsp.addMessageLast(node);
}
@ -299,6 +299,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
for (PendingMessageCursor tsp : storePrefetches) {
tsp.gc();
}
cacheCurrentLowestPriority = UNKNOWN;
}
@Override

View File

@ -38,6 +38,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
private final String clientId;
private final String subscriberName;
private final Subscription subscription;
private int currentLowestPriority;
/**
* @param topic
@ -52,6 +53,15 @@ class TopicStorePrefetch extends AbstractStoreCursor {
this.subscriberName = subscriberName;
this.maxProducersToAudit=32;
this.maxAuditDepth=10000;
resetCurrentLowestPriority();
}
private void resetCurrentLowestPriority() {
currentLowestPriority = 9;
}
public synchronized int getCurrentLowestPriority() {
return currentLowestPriority;
}
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
@ -62,13 +72,19 @@ class TopicStorePrefetch extends AbstractStoreCursor {
@Override
public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("recover: " + message.getMessageId() + ", priority: " + message.getPriority());
}
boolean recovered = false;
MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext();
messageEvaluationContext.setMessageReference(message);
if (this.subscription.matches(message, messageEvaluationContext)) {
return super.recoverMessage(message, cached);
recovered = super.recoverMessage(message, cached);
if (recovered) {
currentLowestPriority = Math.min(currentLowestPriority, message.getPriority());
}
}
return false;
return recovered;
}
@Override
@ -84,7 +100,11 @@ class TopicStorePrefetch extends AbstractStoreCursor {
@Override
protected synchronized boolean isStoreEmpty() {
try {
return this.store.isEmpty();
boolean empty = this.store.isEmpty();
if (empty) {
resetCurrentLowestPriority();
}
return empty;
} catch (Exception e) {
LOG.error("Failed to get message count", e);
@ -97,6 +117,12 @@ class TopicStorePrefetch extends AbstractStoreCursor {
protected void resetBatch() {
this.store.resetBatching(clientId, subscriberName);
}
@Override
public synchronized void gc() {
super.gc();
resetCurrentLowestPriority();
}
@Override
protected void doFillBatch() throws Exception {
@ -104,10 +130,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
maxBatchSize, this);
}
public int getLastDispatchPriority() {
return last != null? last.getMessage().getPriority() : 9;
}
@Override
public String toString() {
return "TopicStorePrefetch" + System.identityHashCode(this) + "(" + clientId + "," + subscriberName + ")";

View File

@ -55,9 +55,9 @@ public class PolicyEntry extends DestinationMapEntry {
private PendingQueueMessageStoragePolicy pendingQueuePolicy;
private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
private int maxProducersToAudit=32;
private int maxAuditDepth=2048;
private int maxQueueAuditDepth=2048;
private int maxProducersToAudit=BaseDestination.MAX_PRODUCERS_TO_AUDIT;
private int maxAuditDepth=BaseDestination.MAX_AUDIT_DEPTH;
private int maxQueueAuditDepth=BaseDestination.MAX_AUDIT_DEPTH;
private boolean enableAudit=true;
private boolean producerFlowControl = true;
private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
@ -217,7 +217,12 @@ public class PolicyEntry extends DestinationMapEntry {
cursor.setSystemUsage(memoryManager);
sub.setPending(cursor);
}
sub.setMaxAuditDepth(getMaxAuditDepth());
int auditDepth = getMaxAuditDepth();
if (auditDepth == BaseDestination.MAX_AUDIT_DEPTH && this.isPrioritizedMessages()) {
sub.setMaxAuditDepth(auditDepth * 10);
} else {
sub.setMaxAuditDepth(auditDepth);
}
sub.setMaxProducersToAudit(getMaxProducersToAudit());
sub.setUsePrefetchExtension(isUsePrefetchExtension());
}

View File

@ -144,6 +144,9 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}
};
if (LOG.isTraceEnabled()) {
LOG.trace(key + " existing last recovered: " + lastRecovered);
}
if (isPrioritizedMessages()) {
adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
lastRecovered.sequence, lastRecovered.priority, maxReturned, jdbcListener);
@ -223,7 +226,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
int result = 0;
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages());
result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages());
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);

View File

@ -98,7 +98,7 @@ public class NegativeQueueTest extends AutoFailTestSupport {
protected static int PREFETCH_SIZE = 1000;
protected BrokerService broker;
protected String bindAddress = "tcp://localhost:60706";
protected String bindAddress = "tcp://localhost:0";
public void testWithDefaultPrefetch() throws Exception{
PREFETCH_SIZE = 1000;
@ -311,6 +311,7 @@ public class NegativeQueueTest extends AutoFailTestSupport {
configureBroker(answer);
answer.start();
answer.waitUntilStarted();
bindAddress = answer.getTransportConnectors().get(0).getConnectUri().toString();
return answer;
}
@ -329,7 +330,7 @@ public class NegativeQueueTest extends AutoFailTestSupport {
pMap.setDefaultEntry(policy);
answer.setDestinationPolicy(pMap);
answer.setDeleteAllMessagesOnStartup(true);
answer.addConnector(bindAddress);
answer.addConnector("tcp://localhost:0");
MemoryUsage memoryUsage = new MemoryUsage();
memoryUsage.setLimit(MEMORY_USAGE);

View File

@ -54,6 +54,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
public boolean useCache = true;
public boolean dispatchAsync = true;
public boolean prioritizeMessages = true;
public boolean immediatePriorityDispatch = true;
public int prefetchVal = 500;
public int MSG_NUM = 600;
@ -73,7 +74,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
policy.setUseCache(useCache);
StorePendingDurableSubscriberMessageStoragePolicy durableSubPending =
new StorePendingDurableSubscriberMessageStoragePolicy();
durableSubPending.setImmediatePriorityDispatch(true);
durableSubPending.setImmediatePriorityDispatch(immediatePriorityDispatch);
policy.setPendingDurableSubscriberPolicy(durableSubPending);
PolicyMap policyMap = new PolicyMap();
policyMap.put(new ActiveMQQueue("TEST"), policy);

View File

@ -22,13 +22,18 @@ import java.util.HashMap;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
import junit.framework.Test;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.MessagePriorityTest;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.jdbc.EmbeddedDataSource;
@ -36,7 +41,7 @@ import org.apache.derby.jdbc.EmbeddedDataSource;
public class JDBCMessagePriorityTest extends MessagePriorityTest {
private static final Log LOG = LogFactory.getLog(JDBCMessagePriorityTest.class);
@Override
protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception {
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
@ -81,9 +86,9 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
sub = sess.createDurableSubscriber(topic, subName);
for (int i = 0; i < MSG_NUM * 4; i++) {
Message msg = sub.receive(10000);
LOG.debug("received i=" + i + ", m=" + (msg!=null?
LOG.debug("received i=" + i + ", m=" + (msg != null ?
msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority()
: null) );
: null));
assertNotNull("Message " + i + " was null", msg);
assertEquals("Message " + i + " has wrong priority", priorities[i / MSG_NUM], msg.getJMSPriority());
if (i > 0 && i % closeFrequency == 0) {
@ -97,7 +102,7 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
}
public void initCombosForTestConcurrentDurableSubsReconnectWithXLevels() {
addCombinationValues("prioritizeMessages", new Object[] {Boolean.TRUE, Boolean.FALSE});
addCombinationValues("prioritizeMessages", new Object[]{Boolean.TRUE, Boolean.FALSE});
}
public void testConcurrentDurableSubsReconnectWithXLevels() throws Exception {
@ -115,7 +120,7 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
final AtomicInteger[] messageCounts = new AtomicInteger[maxPriority];
Vector<ProducerThread> producers = new Vector<ProducerThread>();
for (int priority=0; priority <maxPriority; priority++) {
for (int priority = 0; priority < maxPriority; priority++) {
producers.add(new ProducerThread(topic, MSG_NUM, priority));
messageCounts[priority] = new AtomicInteger(0);
}
@ -124,15 +129,15 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
producer.start();
}
final int closeFrequency = MSG_NUM/2;
final int closeFrequency = MSG_NUM / 2;
HashMap dups = new HashMap();
sub = consumerSession.createDurableSubscriber(topic, subName);
for (int i=0; i < MSG_NUM * maxPriority; i++) {
for (int i = 0; i < MSG_NUM * maxPriority; i++) {
Message msg = sub.receive(10000);
LOG.debug("received i=" + i + ", m=" + (msg!=null?
LOG.debug("received i=" + i + ", m=" + (msg != null ?
msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority()
: null) );
assertNull("no duplicate message failed on : " + msg.getJMSMessageID(), dups.put(msg.getJMSMessageID(), subName));
: null));
assertNull("no duplicate message failed on : " + msg.getJMSMessageID(), dups.put(msg.getJMSMessageID(), subName));
assertNotNull("Message " + i + " was null", msg);
messageCounts[msg.getJMSPriority()].incrementAndGet();
if (i > 0 && i % closeFrequency == 0) {
@ -151,6 +156,105 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
}
}
public void initCombosForTestConcurrentRate() {
addCombinationValues("prefetchVal", new Object[]{new Integer(1), new Integer(500)});
}
public void testConcurrentRate() throws Exception {
ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
final String subName = "priorityConcurrent";
Connection consumerConn = factory.createConnection();
consumerConn.setClientID("subName");
consumerConn.start();
Session consumerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = consumerSession.createDurableSubscriber(topic, subName);
sub.close();
final int TO_SEND = 2000;
final Vector<Message> duplicates = new Vector<Message>();
final int[] dups = new int[TO_SEND * 4];
long start;
double max = 0, sum = 0;
MessageProducer messageProducer = sess.createProducer(topic);
TextMessage message = sess.createTextMessage();
for (int i = 0; i < TO_SEND; i++) {
int priority = i % 10;
message.setText(i + "-" + priority);
message.setIntProperty("seq", i);
message.setJMSPriority(priority);
if (i > 0 && i % 1000 == 0) {
LOG.info("Max send time: " + max + ". Sending message: " + message.getText());
}
start = System.currentTimeMillis();
messageProducer.send(message, DeliveryMode.PERSISTENT, message.getJMSPriority(), 0);
long duration = System.currentTimeMillis() - start;
max = Math.max(max, duration);
if (duration == max) {
LOG.info("new max: " + max + " on i=" + i + ", " + message.getText());
}
sum += duration;
}
LOG.info("Sent: " + TO_SEND + ", max send time: " + max);
double noConsumerAve = (sum * 100 / TO_SEND);
sub = consumerSession.createDurableSubscriber(topic, subName);
final AtomicInteger count = new AtomicInteger();
sub.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
count.incrementAndGet();
if (count.get() % 100 == 0) {
LOG.info("onMessage: count: " + count.get() + ", " + ((TextMessage) message).getText() + ", seqNo " + message.getIntProperty("seq") + ", " + message.getJMSMessageID());
}
int seqNo = message.getIntProperty("seq");
if (dups[seqNo] == 0) {
dups[seqNo] = 1;
} else {
LOG.error("Duplicate: " + ((TextMessage) message).getText() + ", " + message.getJMSMessageID());
duplicates.add(message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
LOG.info("Activated consumer");
sum = max = 0;
for (int i = TO_SEND; i < (TO_SEND * 2); i++) {
int priority = i % 10;
message.setText(i + "-" + priority);
message.setIntProperty("seq", i);
message.setJMSPriority(priority);
if (i > 0 && i % 1000 == 0) {
LOG.info("Max send time: " + max + ". Sending message: " + message.getText());
}
start = System.currentTimeMillis();
messageProducer.send(message, DeliveryMode.PERSISTENT, message.getJMSPriority(), 0);
long duration = System.currentTimeMillis() - start;
max = Math.max(max, duration);
if (duration == max) {
LOG.info("new max: " + max + " on i=" + i + ", " + message.getText());
}
sum += duration;
}
LOG.info("Sent another: " + TO_SEND + ", max send time: " + max);
double withConsumerAve = (sum * 100 / TO_SEND);
assertTrue("max three times as slow with consumer:" + withConsumerAve + " , noConsumerMax:" + noConsumerAve,
withConsumerAve < noConsumerAve * 3);
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
LOG.info("count: " + count.get());
return TO_SEND * 2 == count.get();
}
}, 60 * 1000);
assertTrue("No duplicates : " + duplicates, duplicates.isEmpty());
assertEquals("got all messages", TO_SEND * 2, count.get());
}
public static Test suite() {
return suite(JDBCMessagePriorityTest.class);
}

View File

@ -50,7 +50,8 @@ public class JdbcDurableSubDupTest {
private static final Log LOG = LogFactory.getLog(JdbcDurableSubDupTest.class);
final int prefetchVal = 150;
String url = "tcp://localhost:61616?jms.watchTopicAdvisories=false";
String urlOptions = "jms.watchTopicAdvisories=false";
String url = null;
String queueName = "topicTest?consumer.prefetchSize=" + prefetchVal;
String xmlMessage = "<Example 01234567890123456789012345678901234567890123456789 MessageText>";
@ -83,10 +84,11 @@ public class JdbcDurableSubDupTest {
policyMap.setDefaultEntry(policyEntry);
broker.setDestinationPolicy(policyMap);
broker.addConnector("tcp://localhost:61616");
broker.addConnector("tcp://localhost:0");
broker.setDeleteAllMessagesOnStartup(true);
broker.start();
broker.waitUntilStarted();
url = broker.getTransportConnectors().get(0).getConnectUri().toString() + "?" + urlOptions;
}
@After