https://issues.apache.org/jira/browse/AMQ-5853 - fix and test, statement was not configurable from xml also so there is no workaround.

This commit is contained in:
gtully 2015-06-19 16:08:54 +01:00
parent 1a3ade0414
commit f2a335c27d
3 changed files with 41 additions and 14 deletions

View File

@ -228,6 +228,10 @@ public class Statements {
return findAllMessageIdsStatement;
}
public void setFindAllMessageIdsStatement(String val) {
findAllMessageIdsStatement = val;
}
public String getFindLastSequenceIdInMsgsStatement() {
if (findLastSequenceIdInMsgsStatement == null) {
findLastSequenceIdInMsgsStatement = "SELECT MAX(ID) FROM " + getFullMessageTableName();
@ -331,17 +335,6 @@ public class Statements {
return findDurableSubMessagesByPriorityStatement;
}
public String findAllDurableSubMessagesStatement() {
if (findAllDurableSubMessagesStatement == null) {
findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName()
+ " M, " + getFullAckTableName() + " D "
+ " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
+ " ORDER BY M.ID";
}
return findAllDurableSubMessagesStatement;
}
public String getNextDurableSubscriberMessageStatement() {
if (nextDurableSubscriberMessageStatement == null) {
nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM "
@ -511,12 +504,16 @@ public class Statements {
findNextMessagesByPriorityStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
+ " WHERE CONTAINER=?"
+ " AND XID IS NULL"
+ " AND ((ID > ? AND ID < ? AND PRIORITY = ?) OR PRIORITY < ?)"
+ " AND ((ID > ? AND ID < ? AND PRIORITY >= ?) OR PRIORITY < ?)"
+ " ORDER BY PRIORITY DESC, ID";
}
return findNextMessagesByPriorityStatement;
}
public void setFindNextMessagesByPriorityStatement(String val) {
findNextMessagesByPriorityStatement = val;
}
/**
* @return the lastAckedDurableSubscriberMessageStatement
*/

View File

@ -94,6 +94,11 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
ignoreExpired.setDeadLetterStrategy(ignoreExpiredStrategy);
policyMap.put(new ActiveMQTopic("TEST_CLEANUP_NO_PRIORITY"), ignoreExpired);
PolicyEntry noCachePolicy = new PolicyEntry();
noCachePolicy.setUseCache(false);
noCachePolicy.setPrioritizedMessages(true);
policyMap.put(new ActiveMQQueue("TEST_LOW_THEN_HIGH_10"), noCachePolicy);
broker.setDestinationPolicy(policyMap);
broker.start();
broker.waitUntilStarted();
@ -584,4 +589,30 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
assertEquals("Message " + i + " has wrong priority", i < 10 ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
}
}
public void testLowThenHighBatch() throws Exception {
ActiveMQQueue queue = (ActiveMQQueue)sess.createQueue("TEST_LOW_THEN_HIGH_10");
ProducerThread producerThread = new ProducerThread(queue, 10, LOW_PRI);
producerThread.run();
MessageConsumer queueConsumer = sess.createConsumer(queue);
for (int i = 0; i < 10; i++) {
Message message = queueConsumer.receive(10000);
assertNotNull("expect #" + i, message);
assertEquals("correct priority", LOW_PRI, message.getJMSPriority());
}
queueConsumer.close();
producerThread.priority = HIGH_PRI;
producerThread.run();
queueConsumer = sess.createConsumer(queue);
for (int i = 0; i < 10; i++) {
Message message = queueConsumer.receive(10000);
assertNotNull("expect #" + i, message);
assertEquals("correct priority", HIGH_PRI, message.getJMSPriority());
}
queueConsumer.close();
}
}

View File

@ -42,7 +42,6 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.MessagePriorityTest;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;