From 12f019591369bb6f8f6c8eadf82339d3391c5af0 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Wed, 21 Jul 2010 16:06:39 +0000 Subject: [PATCH] https://issues.apache.org/activemq/browse/AMQ-2843 - first stab at adding priority for queues in JDBC store git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@966291 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/store/jdbc/JDBCAdapter.java | 4 +++- .../activemq/store/jdbc/JDBCMessageStore.java | 8 +++++-- .../activemq/store/jdbc/Statements.java | 17 +++++++++++-- .../jdbc/adapter/DefaultJDBCAdapter.java | 24 +++++++++++++++---- .../activemq/store/MessagePriorityTest.java | 2 +- 5 files changed, 45 insertions(+), 10 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java index cefebb0885..559db70500 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java @@ -30,12 +30,14 @@ import org.apache.activemq.command.SubscriptionInfo; public interface JDBCAdapter { void setStatements(Statements statementProvider); + + void setPrioritizedMessages(boolean prioritizedMessages); void doCreateTables(TransactionContext c) throws SQLException, IOException; void doDropTables(TransactionContext c) throws SQLException, IOException; - void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration) throws SQLException, IOException; + void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration, byte priority) throws SQLException, IOException; void doAddMessageReference(TransactionContext c, long sequence, MessageId messageId, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index e17d70d65b..413af7eba2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -82,7 +82,7 @@ public class JDBCMessageStore extends AbstractMessageStore { // Get a connection and insert the message into the DB. TransactionContext c = persistenceAdapter.getTransactionContext(context); try { - adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration()); + adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration(), message.getPriority()); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); @@ -224,7 +224,6 @@ public class JDBCMessageStore extends AbstractMessageStore { */ public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception { TransactionContext c = persistenceAdapter.getTransactionContext(); - try { adapter.doRecoverNextMessages(c, destination, lastStoreSequenceId.get(), maxReturned, new JDBCMessageRecoveryListener() { @@ -294,4 +293,9 @@ public class JDBCMessageStore extends AbstractMessageStore { } return result; } + + public void setPrioritizedMessages(boolean prioritizedMessages) { + super.setPrioritizedMessages(prioritizedMessages); + adapter.setPrioritizedMessages(prioritizedMessages); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java index c500c44c48..7a55f64264 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java @@ -65,6 +65,7 @@ public class Statements { private String lastAckedDurableSubscriberMessageStatement; private String destinationMessageCountStatement; private String findNextMessagesStatement; + private String findNextMessagesByPriorityStatement; private boolean useLockCreateWhereClause; private String findAllMessageIdsStatement; private String lastProducerSequenceIdStatement; @@ -74,12 +75,13 @@ public class Statements { createSchemaStatements = new String[] { "CREATE TABLE " + getFullMessageTableName() + "(" + "ID " + sequenceDataType + " NOT NULL" + ", CONTAINER " + containerNameDataType + ", MSGID_PROD " + msgIdDataType + ", MSGID_SEQ " - + sequenceDataType + ", EXPIRATION " + longDataType + ", MSG " + + sequenceDataType + ", EXPIRATION " + longDataType + ", PRIORITY " + sequenceDataType + ", MSG " + (useExternalMessageReferences ? stringIdDataType : binaryDataType) + ", PRIMARY KEY ( ID ) )", "CREATE INDEX " + getFullMessageTableName() + "_MIDX ON " + getFullMessageTableName() + " (MSGID_PROD,MSGID_SEQ)", "CREATE INDEX " + getFullMessageTableName() + "_CIDX ON " + getFullMessageTableName() + " (CONTAINER)", "CREATE INDEX " + getFullMessageTableName() + "_EIDX ON " + getFullMessageTableName() + " (EXPIRATION)", + "CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName() + " (PRIORITY)", "CREATE TABLE " + getFullAckTableName() + "(" + "CONTAINER " + containerNameDataType + " NOT NULL" + ", SUB_DEST " + stringIdDataType + ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME " + stringIdDataType @@ -107,7 +109,7 @@ public class Statements { if (addMessageStatement == null) { addMessageStatement = "INSERT INTO " + getFullMessageTableName() - + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, MSG) VALUES (?, ?, ?, ?, ?, ?)"; + + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG) VALUES (?, ?, ?, ?, ?, ?, ?)"; } return addMessageStatement; } @@ -368,6 +370,17 @@ public class Statements { return findNextMessagesStatement; } + /** + * @return the findNextMessagesStatement + */ + public String getFindNextMessagesByPriorityStatement() { + if (findNextMessagesByPriorityStatement == null) { + findNextMessagesByPriorityStatement = "SELECT ID, MSG FROM " + getFullMessageTableName() + + " WHERE CONTAINER=? ORDER BY PRIORITY DESC, ID"; + } + return findNextMessagesByPriorityStatement; + } + /** * @return the lastAckedDurableSubscriberMessageStatement */ diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index 51dd3086df..d9fb805355 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -56,6 +56,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class); protected Statements statements; protected boolean batchStatments = true; + protected boolean prioritizedMessages; protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { s.setBytes(index, data); @@ -190,7 +191,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, - long expiration) throws SQLException, IOException { + long expiration, byte priority) throws SQLException, IOException { PreparedStatement s = c.getAddMessageStatement(); try { if (s == null) { @@ -204,7 +205,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter { s.setLong(3, messageID.getProducerSequenceId()); s.setString(4, destination.getQualifiedName()); s.setLong(5, expiration); - setBinaryData(s, 6, data); + s.setLong(6, priority); + setBinaryData(s, 7, data); if (this.batchStatments) { s.addBatch(); } else if (s.executeUpdate() != 1) { @@ -710,6 +712,14 @@ public class DefaultJDBCAdapter implements JDBCAdapter { public void setStatements(Statements statements) { this.statements = statements; + } + + public boolean isPrioritizedMessages() { + return prioritizedMessages; + } + + public void setPrioritizedMessages(boolean prioritizedMessages) { + this.prioritizedMessages = prioritizedMessages; } /** @@ -765,10 +775,16 @@ public class DefaultJDBCAdapter implements JDBCAdapter { PreparedStatement s = null; ResultSet rs = null; try { - s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement()); + if (isPrioritizedMessages()) { + s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement()); + } else { + s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement()); + } s.setMaxRows(maxReturned * 2); s.setString(1, destination.getQualifiedName()); - s.setLong(2, nextSeq); + if (!isPrioritizedMessages()) { + s.setLong(2, nextSeq); + } rs = s.executeQuery(); int count = 0; if (this.statements.isUseExternalMessageReferences()) { diff --git a/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java index c5fedd23e0..cd439575c9 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java @@ -148,7 +148,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { MessageConsumer queueConsumer = sess.createConsumer(queue); for (int i = 0; i < MSG_NUM * 2; i++) { Message msg = queueConsumer.receive(1000); - assertNotNull(msg); + assertNotNull("Message " + i + " was null", msg); assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority()); } }