https://issues.apache.org/jira/browse/AMQ-5853 - track per priority sequence on load from the store. Allow db to select from entire prority 0-9 range. fix and additonal test

This commit is contained in:
gtully 2015-07-06 15:32:23 +01:00
parent b78ef954d2
commit a2697b844e
5 changed files with 95 additions and 33 deletions

View File

@ -87,7 +87,7 @@ public interface JDBCAdapter {
int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long maxSeq, long nextSeq, long priority, int maxReturned, boolean isPrioritizeMessages, JDBCMessageRecoveryListener listener) throws Exception;
void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long[] lastRecoveredEntries, long maxSeq, int maxReturned, boolean isPrioritizeMessages, JDBCMessageRecoveryListener listener) throws Exception;
long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException;

View File

@ -18,8 +18,8 @@ package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.ConnectionContext;
@ -66,11 +66,10 @@ public class JDBCMessageStore extends AbstractMessageStore {
protected final WireFormat wireFormat;
protected final JDBCAdapter adapter;
protected final JDBCPersistenceAdapter persistenceAdapter;
protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1);
protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
protected ActiveMQMessageAudit audit;
protected final LinkedList<Long> pendingAdditions = new LinkedList<Long>();
final long[] perPriorityLastRecovered = new long[10];
public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException {
super(destination);
this.persistenceAdapter = persistenceAdapter;
@ -81,6 +80,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination)) {
recordDestinationCreation(destination);
}
resetBatching();
}
private void recordDestinationCreation(ActiveMQDestination destination) throws IOException {
@ -165,9 +165,6 @@ public class JDBCMessageStore extends AbstractMessageStore {
if (xaXid == null) {
onAdd(message, sequenceId, message.getPriority());
}
if (this.isPrioritizedMessages() && message.getPriority() > lastRecoveredPriority.get()) {
resetTrackedLastRecoveredPriority();
}
}
// jdbc commit order is random with concurrent connections - limit scan to lowest pending
@ -334,9 +331,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
if (LOG.isTraceEnabled()) {
LOG.trace(this + " recoverNext lastRecovered:" + lastRecoveredSequenceId.get() + ", minPending:" + minPendingSequeunceId());
LOG.trace(this + " recoverNext lastRecovered:" + Arrays.toString(perPriorityLastRecovered) + ", minPending:" + minPendingSequeunceId());
}
adapter.doRecoverNextMessages(c, destination, minPendingSequeunceId(), lastRecoveredSequenceId.get(), lastRecoveredPriority.get(),
adapter.doRecoverNextMessages(c, destination, perPriorityLastRecovered, minPendingSequeunceId(),
maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
@ -344,8 +341,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
msg.getMessageId().setBrokerSequenceId(sequenceId);
msg.getMessageId().setFutureOrSequenceLong(sequenceId);
listener.recoverMessage(msg);
lastRecoveredSequenceId.set(sequenceId);
lastRecoveredPriority.set(msg.getPriority());
trackLastRecovered(sequenceId, msg.getPriority());
return true;
}
@ -366,35 +362,33 @@ public class JDBCMessageStore extends AbstractMessageStore {
}
private void trackLastRecovered(long sequenceId, int priority) {
perPriorityLastRecovered[isPrioritizedMessages() ? priority : 0] = sequenceId;
}
/**
* @see org.apache.activemq.store.MessageStore#resetBatching()
*/
public void resetBatching() {
if (LOG.isTraceEnabled()) {
LOG.trace(this + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
LOG.trace(this + " resetBatching. last recovered: " + Arrays.toString(perPriorityLastRecovered));
}
for (int i=0;i<perPriorityLastRecovered.length;i++) {
perPriorityLastRecovered[i] = -1;
}
lastRecoveredSequenceId.set(-1);
resetTrackedLastRecoveredPriority();
}
private final void resetTrackedLastRecoveredPriority() {
lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
}
@Override
public void setBatch(MessageId messageId) {
try {
long[] storedValues = persistenceAdapter.getStoreSequenceIdForMessageId(null, messageId, destination);
lastRecoveredSequenceId.set(storedValues[0]);
lastRecoveredPriority.set(storedValues[1]);
trackLastRecovered(storedValues[0], (int)storedValues[1]);
} catch (IOException ignoredAsAlreadyLogged) {
lastRecoveredSequenceId.set(-1);
lastRecoveredPriority.set(Byte.MAX_VALUE -1);
resetBatching();
}
if (LOG.isTraceEnabled()) {
LOG.trace(this + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get()
+ ", priority: " + lastRecoveredPriority.get());
LOG.trace(this + " setBatch: new last recovered: " + Arrays.toString(perPriorityLastRecovered));
}
}

View File

@ -491,7 +491,7 @@ public class Statements {
public String getFindNextMessagesStatement() {
if (findNextMessagesStatement == null) {
findNextMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
+ " WHERE CONTAINER=? AND ID > ? AND ID < ? AND XID IS NULL ORDER BY ID";
+ " WHERE CONTAINER=? AND ID < ? AND ID > ? AND XID IS NULL ORDER BY ID";
}
return findNextMessagesStatement;
}
@ -504,7 +504,17 @@ public class Statements {
findNextMessagesByPriorityStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
+ " WHERE CONTAINER=?"
+ " AND XID IS NULL"
+ " AND ((ID > ? AND ID < ? AND PRIORITY = ?) OR PRIORITY < ?)"
+ " AND ID < ? "
+ " AND ( (ID > ? AND PRIORITY = 9) "
+ " OR (ID > ? AND PRIORITY = 8) "
+ " OR (ID > ? AND PRIORITY = 7) "
+ " OR (ID > ? AND PRIORITY = 6) "
+ " OR (ID > ? AND PRIORITY = 5) "
+ " OR (ID > ? AND PRIORITY = 4) "
+ " OR (ID > ? AND PRIORITY = 3) "
+ " OR (ID > ? AND PRIORITY = 2) "
+ " OR (ID > ? AND PRIORITY = 1) "
+ " OR (ID > ? AND PRIORITY = 0) )"
+ " ORDER BY PRIORITY DESC, ID";
}
return findNextMessagesByPriorityStatement;

View File

@ -37,6 +37,7 @@ import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
import org.apache.activemq.store.jdbc.JDBCMessageStore;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore;
import org.apache.activemq.store.jdbc.Statements;
@ -1086,8 +1087,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
return result;
}
public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long maxSeq, long lastRecoveredSeq,
long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long[] lastRecoveredEntries,
long maxSeq, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
@ -1099,11 +1100,14 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
s.setMaxRows(Math.min(maxReturned, maxRows));
s.setString(1, destination.getQualifiedName());
s.setLong(2, lastRecoveredSeq);
s.setLong(3, maxSeq);
s.setLong(2, maxSeq);
int paramId = 3;
if (isPrioritizedMessages) {
s.setLong(4, priority);
s.setLong(5, priority);
for (int i=9;i>=0;i--) {
s.setLong(paramId++, lastRecoveredEntries[i]);
}
} else {
s.setLong(paramId, lastRecoveredEntries[0]);
}
rs = s.executeQuery();
int count = 0;

View File

@ -17,10 +17,12 @@
package org.apache.activemq.store;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
@ -664,4 +666,56 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
}
queueConsumer.close();
}
public void initCombosForTestEveryXHi() {
// the cache limits the priority ordering to available memory
addCombinationValues("useCache", new Object[] {new Boolean(false)});
// expiry processing can fill the cursor with a snapshot of the producer
// priority, before producers are complete
addCombinationValues("expireMessagePeriod", new Object[] {new Integer(0)});
}
public void testEveryXHi() throws Exception {
final int numMessages = 50;
ActiveMQQueue queue = (ActiveMQQueue)sess.createQueue("TEST_LOW_THEN_HIGH_10");
final AtomicInteger received = new AtomicInteger(0);
MessageConsumer queueConsumer = sess.createConsumer(queue);
queueConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
received.incrementAndGet();
}
});
MessageProducer producer = sess.createProducer(queue);
for (int i = 0; i < numMessages; i++) {
Message message = sess.createMessage();
if (i % 5 == 0) {
message.setJMSPriority(9);
} else {
message.setJMSPriority(4);
}
producer.send(message, Message.DEFAULT_DELIVERY_MODE, message.getJMSPriority(), Message.DEFAULT_TIME_TO_LIVE);
}
assertTrue("Got all", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return numMessages == received.get();
}
}));
final DestinationStatistics destinationStatistics = ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics();
assertTrue("Nothing else Like dlq involved", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("Enqueues: " + destinationStatistics.getEnqueues().getCount() + ", Dequeues: " + destinationStatistics.getDequeues().getCount());
return destinationStatistics.getEnqueues().getCount() == numMessages && destinationStatistics.getDequeues().getCount() == numMessages;
}
}, 10000));
queueConsumer.close();
}
}