https://issues.apache.org/activemq/browse/AMQ-2843 - first stab at adding priority for queues in JDBC store

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@966291 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-07-21 16:06:39 +00:00
parent c4425caa2f
commit 12f0195913
5 changed files with 45 additions and 10 deletions

View File

@ -30,12 +30,14 @@ import org.apache.activemq.command.SubscriptionInfo;
public interface JDBCAdapter {
void setStatements(Statements statementProvider);
void setPrioritizedMessages(boolean prioritizedMessages);
void doCreateTables(TransactionContext c) throws SQLException, IOException;
void doDropTables(TransactionContext c) throws SQLException, IOException;
void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration) throws SQLException, IOException;
void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration, byte priority) throws SQLException, IOException;
void doAddMessageReference(TransactionContext c, long sequence, MessageId messageId, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException;

View File

@ -82,7 +82,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
// Get a connection and insert the message into the DB.
TransactionContext c = persistenceAdapter.getTransactionContext(context);
try {
adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration());
adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration(), message.getPriority());
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
@ -224,7 +224,6 @@ public class JDBCMessageStore extends AbstractMessageStore {
*/
public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
adapter.doRecoverNextMessages(c, destination, lastStoreSequenceId.get(), maxReturned, new JDBCMessageRecoveryListener() {
@ -294,4 +293,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
}
return result;
}
public void setPrioritizedMessages(boolean prioritizedMessages) {
super.setPrioritizedMessages(prioritizedMessages);
adapter.setPrioritizedMessages(prioritizedMessages);
}
}

View File

@ -65,6 +65,7 @@ public class Statements {
private String lastAckedDurableSubscriberMessageStatement;
private String destinationMessageCountStatement;
private String findNextMessagesStatement;
private String findNextMessagesByPriorityStatement;
private boolean useLockCreateWhereClause;
private String findAllMessageIdsStatement;
private String lastProducerSequenceIdStatement;
@ -74,12 +75,13 @@ public class Statements {
createSchemaStatements = new String[] {
"CREATE TABLE " + getFullMessageTableName() + "(" + "ID " + sequenceDataType + " NOT NULL"
+ ", CONTAINER " + containerNameDataType + ", MSGID_PROD " + msgIdDataType + ", MSGID_SEQ "
+ sequenceDataType + ", EXPIRATION " + longDataType + ", MSG "
+ sequenceDataType + ", EXPIRATION " + longDataType + ", PRIORITY " + sequenceDataType + ", MSG "
+ (useExternalMessageReferences ? stringIdDataType : binaryDataType)
+ ", PRIMARY KEY ( ID ) )",
"CREATE INDEX " + getFullMessageTableName() + "_MIDX ON " + getFullMessageTableName() + " (MSGID_PROD,MSGID_SEQ)",
"CREATE INDEX " + getFullMessageTableName() + "_CIDX ON " + getFullMessageTableName() + " (CONTAINER)",
"CREATE INDEX " + getFullMessageTableName() + "_EIDX ON " + getFullMessageTableName() + " (EXPIRATION)",
"CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName() + " (PRIORITY)",
"CREATE TABLE " + getFullAckTableName() + "(" + "CONTAINER " + containerNameDataType + " NOT NULL"
+ ", SUB_DEST " + stringIdDataType
+ ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME " + stringIdDataType
@ -107,7 +109,7 @@ public class Statements {
if (addMessageStatement == null) {
addMessageStatement = "INSERT INTO "
+ getFullMessageTableName()
+ "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, MSG) VALUES (?, ?, ?, ?, ?, ?)";
+ "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG) VALUES (?, ?, ?, ?, ?, ?, ?)";
}
return addMessageStatement;
}
@ -368,6 +370,17 @@ public class Statements {
return findNextMessagesStatement;
}
/**
* @return the findNextMessagesStatement
*/
public String getFindNextMessagesByPriorityStatement() {
if (findNextMessagesByPriorityStatement == null) {
findNextMessagesByPriorityStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
+ " WHERE CONTAINER=? ORDER BY PRIORITY DESC, ID";
}
return findNextMessagesByPriorityStatement;
}
/**
* @return the lastAckedDurableSubscriberMessageStatement
*/

View File

@ -56,6 +56,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
protected Statements statements;
protected boolean batchStatments = true;
protected boolean prioritizedMessages;
protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
s.setBytes(index, data);
@ -190,7 +191,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
long expiration) throws SQLException, IOException {
long expiration, byte priority) throws SQLException, IOException {
PreparedStatement s = c.getAddMessageStatement();
try {
if (s == null) {
@ -204,7 +205,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s.setLong(3, messageID.getProducerSequenceId());
s.setString(4, destination.getQualifiedName());
s.setLong(5, expiration);
setBinaryData(s, 6, data);
s.setLong(6, priority);
setBinaryData(s, 7, data);
if (this.batchStatments) {
s.addBatch();
} else if (s.executeUpdate() != 1) {
@ -710,6 +712,14 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public void setStatements(Statements statements) {
this.statements = statements;
}
public boolean isPrioritizedMessages() {
return prioritizedMessages;
}
public void setPrioritizedMessages(boolean prioritizedMessages) {
this.prioritizedMessages = prioritizedMessages;
}
/**
@ -765,10 +775,16 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = null;
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
if (isPrioritizedMessages()) {
s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement());
} else {
s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
}
s.setMaxRows(maxReturned * 2);
s.setString(1, destination.getQualifiedName());
s.setLong(2, nextSeq);
if (!isPrioritizedMessages()) {
s.setLong(2, nextSeq);
}
rs = s.executeQuery();
int count = 0;
if (this.statements.isUseExternalMessageReferences()) {

View File

@ -148,7 +148,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
MessageConsumer queueConsumer = sess.createConsumer(queue);
for (int i = 0; i < MSG_NUM * 2; i++) {
Message msg = queueConsumer.receive(1000);
assertNotNull(msg);
assertNotNull("Message " + i + " was null", msg);
assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
}
}