https://issues.apache.org/jira/browse/AMQ-5853 - rework fix, have store reset tracked recovered priority when higer priority messages are stored. Additional perf fix that removes unnecessary 2x multiplier on db fetch size; seems periodic message expiry was throwing some tests when the cache was enabled

This commit is contained in:
gtully 2015-07-03 10:30:03 +01:00
parent fef8cac05f
commit eece28ac75
4 changed files with 60 additions and 7 deletions

View File

@ -18,10 +18,7 @@ package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQMessageAudit;
@ -168,6 +165,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
if (xaXid == null) {
onAdd(message, sequenceId, message.getPriority());
}
if (this.isPrioritizedMessages() && message.getPriority() > lastRecoveredPriority.get()) {
resetTrackedLastRecoveredPriority();
}
}
// jdbc commit order is random with concurrent connections - limit scan to lowest pending
@ -374,10 +374,14 @@ public class JDBCMessageStore extends AbstractMessageStore {
LOG.trace(this + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
}
lastRecoveredSequenceId.set(-1);
lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
resetTrackedLastRecoveredPriority();
}
private final void resetTrackedLastRecoveredPriority() {
lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
}
@Override
public void setBatch(MessageId messageId) {
try {

View File

@ -504,7 +504,7 @@ public class Statements {
findNextMessagesByPriorityStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
+ " WHERE CONTAINER=?"
+ " AND XID IS NULL"
+ " AND ((ID > ? AND ID < ? AND PRIORITY >= ?) OR PRIORITY < ?)"
+ " AND ((ID > ? AND ID < ? AND PRIORITY = ?) OR PRIORITY < ?)"
+ " ORDER BY PRIORITY DESC, ID";
}
return findNextMessagesByPriorityStatement;

View File

@ -1097,7 +1097,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} else {
s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
}
s.setMaxRows(Math.min(maxReturned * 2, maxRows));
s.setMaxRows(Math.min(maxReturned, maxRows));
s.setString(1, destination.getQualifiedName());
s.setLong(2, lastRecoveredSeq);
s.setLong(3, maxSeq);

View File

@ -31,6 +31,8 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
@ -64,6 +66,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
public int MSG_NUM = 600;
public int HIGH_PRI = 7;
public int LOW_PRI = 3;
public int MED_PRI = 4;
abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;
@ -568,7 +571,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
}
public void testQueueBacklog() throws Exception {
final int backlog = 18000;
final int backlog = 1800;
ActiveMQQueue queue = (ActiveMQQueue)sess.createQueue("TEST");
ProducerThread lowPri = new ProducerThread(queue, backlog, LOW_PRI);
@ -588,6 +591,23 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
assertNotNull("Message " + i + " was null", msg);
assertEquals("Message " + i + " has wrong priority", i < 10 ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
}
final DestinationStatistics destinationStatistics = ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics();
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("Enqueues: " + destinationStatistics.getEnqueues().getCount() + ", Dequeues: " + destinationStatistics.getDequeues().getCount());
return destinationStatistics.getEnqueues().getCount() == backlog + 10 && destinationStatistics.getDequeues().getCount() == 500;
}
}, 10000));
}
public void initCombosForTestLowThenHighBatc() {
// the cache limits the priority ordering to available memory
addCombinationValues("useCache", new Object[] {new Boolean(false)});
// expiry processing can fill the cursor with a snapshot of the producer
// priority, before producers are complete
addCombinationValues("expireMessagePeriod", new Object[] {new Integer(0)});
}
public void testLowThenHighBatch() throws Exception {
@ -614,5 +634,34 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
assertEquals("correct priority", HIGH_PRI, message.getJMSPriority());
}
queueConsumer.close();
producerThread.priority = LOW_PRI;
producerThread.run();
producerThread.priority = MED_PRI;
producerThread.run();
queueConsumer = sess.createConsumer(queue);
for (int i = 0; i < 10; i++) {
Message message = queueConsumer.receive(10000);
assertNotNull("expect #" + i, message);
assertEquals("correct priority", MED_PRI, message.getJMSPriority());
}
for (int i = 0; i < 10; i++) {
Message message = queueConsumer.receive(10000);
assertNotNull("expect #" + i, message);
assertEquals("correct priority", LOW_PRI, message.getJMSPriority());
}
queueConsumer.close();
producerThread.priority = HIGH_PRI;
producerThread.run();
queueConsumer = sess.createConsumer(queue);
for (int i = 0; i < 10; i++) {
Message message = queueConsumer.receive(10000);
assertNotNull("expect #" + i, message);
assertEquals("correct priority", HIGH_PRI, message.getJMSPriority());
}
queueConsumer.close();
}
}