https://issues.apache.org/activemq/browse/AMQ-2843 - first stab at supporting priority for durable subs

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@966712 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-07-22 15:46:18 +00:00
parent 40f921dab9
commit 4f5e620d1c
7 changed files with 116 additions and 50 deletions

View File

@ -506,9 +506,6 @@
<!-- breaks hudson: disable till we get a chance to give it the time that it needs - http://hudson.zones.apache.org/hudson/job/ActiveMQ/org.apache.activemq$activemq-core/199/testReport/org.apache.activemq.network/BrokerNetworkWithStuckMessagesTest/testBrokerNetworkWithStuckMessages/ -->
<exclude>**/BrokerNetworkWithStuckMessagesTest.*</exclude>
<!-- exclude until implemented -->
<exclude>**/JDBCMessagePriorityTest.*</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -51,19 +51,19 @@ public interface JDBCAdapter {
void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener) throws Exception;
void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq) throws SQLException, IOException;
void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, long prio) throws SQLException, IOException;
void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, JDBCMessageRecoveryListener listener)
throws Exception;
void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, int maxReturned,
void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, long priority, int maxReturned,
JDBCMessageRecoveryListener listener) throws Exception;
void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo subscriptionInfo, boolean retroactive) throws SQLException, IOException;
SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException;
long getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException;
long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException;
void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException;

View File

@ -284,7 +284,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
long result = -1;
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
result = adapter.getStoreSequenceId(c, destination, messageId);
result = adapter.getStoreSequenceId(c, destination, messageId)[0];
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);

View File

@ -40,6 +40,7 @@ import org.apache.activemq.wireformat.WireFormat;
public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
private Map<String, AtomicLong> subscriberLastMessageMap = new ConcurrentHashMap<String, AtomicLong>();
private Map<String, AtomicLong> subscriberLastPriorityMap = new ConcurrentHashMap<String, AtomicLong>();
public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) {
super(persistenceAdapter, adapter, wireFormat, topic, audit);
@ -49,8 +50,8 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
// Get a connection and insert the message into the DB.
TransactionContext c = persistenceAdapter.getTransactionContext(context);
try {
long seq = adapter.getStoreSequenceId(c, destination, messageId);
adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq);
long[] res = adapter.getStoreSequenceId(c, destination, messageId);
adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
@ -63,7 +64,6 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
* @throws Exception
*/
public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
@ -91,14 +91,18 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
TransactionContext c = persistenceAdapter.getTransactionContext();
String subcriberId = getSubscriptionKey(clientId, subscriptionName);
AtomicLong last = subscriberLastMessageMap.get(subcriberId);
AtomicLong priority = subscriberLastPriorityMap.get(subcriberId);
if (last == null) {
long lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, clientId, subscriptionName);
last = new AtomicLong(lastAcked);
subscriberLastMessageMap.put(subcriberId, last);
priority = new AtomicLong(Byte.MAX_VALUE - 1);
subscriberLastMessageMap.put(subcriberId, priority);
}
final AtomicLong finalLast = last;
final AtomicLong finalPriority = priority;
try {
adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName, last.get(), maxReturned, new JDBCMessageRecoveryListener() {
adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName, last.get(), priority.get(), maxReturned, new JDBCMessageRecoveryListener() {
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
if (listener.hasSpace()) {
@ -106,6 +110,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg);
finalLast.set(sequenceId);
finalPriority.set(msg.getPriority());
return true;
}
return false;
@ -120,13 +125,15 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
} finally {
c.close();
last.set(finalLast.get());
subscriberLastMessageMap.put(subcriberId, finalLast);
subscriberLastPriorityMap.put(subcriberId, finalPriority);
}
}
public void resetBatching(String clientId, String subscriptionName) {
String subcriberId = getSubscriptionKey(clientId, subscriptionName);
subscriberLastMessageMap.remove(subcriberId);
subscriberLastPriorityMap.remove(subcriberId);
}
/**

View File

@ -52,6 +52,7 @@ public class Statements {
private String deleteSubscriptionStatement;
private String findAllDurableSubMessagesStatement;
private String findDurableSubMessagesStatement;
private String findDurableSubMessagesByPriorityStatement;
private String findAllDestinationsStatement;
private String removeAllMessagesStatement;
private String removeAllSubscriptionsStatement;
@ -86,7 +87,7 @@ public class Statements {
+ ", SUB_DEST " + stringIdDataType
+ ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME " + stringIdDataType
+ " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID " + sequenceDataType
+ ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))",
+ ", PRIORITY " + sequenceDataType + ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))",
"CREATE TABLE " + getFullLockTableName()
+ "( ID " + longDataType + " NOT NULL, TIME " + longDataType
+ ", BROKER_NAME " + stringIdDataType + ", PRIMARY KEY (ID) )",
@ -130,7 +131,7 @@ public class Statements {
public String getFindMessageSequenceIdStatement() {
if (findMessageSequenceIdStatement == null) {
findMessageSequenceIdStatement = "SELECT ID FROM " + getFullMessageTableName()
findMessageSequenceIdStatement = "SELECT ID, PRIORITY FROM " + getFullMessageTableName()
+ " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
}
return findMessageSequenceIdStatement;
@ -195,8 +196,8 @@ public class Statements {
if (createDurableSubStatement == null) {
createDurableSubStatement = "INSERT INTO "
+ getFullAckTableName()
+ "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID, SUB_DEST) "
+ "VALUES (?, ?, ?, ?, ?, ?)";
+ "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID, SUB_DEST, PRIORITY) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?)";
}
return createDurableSubStatement;
}
@ -219,7 +220,7 @@ public class Statements {
public String getUpdateLastAckOfDurableSubStatement() {
if (updateLastAckOfDurableSubStatement == null) {
updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?"
updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?, PRIORITY=?"
+ " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
}
return updateLastAckOfDurableSubStatement;
@ -254,6 +255,18 @@ public class Statements {
}
return findDurableSubMessagesStatement;
}
public String getFindDurableSubMessagesByPriorityStatement() {
if (findDurableSubMessagesByPriorityStatement == null) {
findDurableSubMessagesByPriorityStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+ getFullAckTableName() + " D "
+ " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER AND "
+ "((M.ID > ? AND M.PRIORITY = ?) OR M.PRIORITY < ?)"
+ " ORDER BY M.PRIORITY DESC, M.ID";
}
return findDurableSubMessagesByPriorityStatement;
}
public String findAllDurableSubMessagesStatement() {
if (findAllDurableSubMessagesStatement == null) {

View File

@ -17,8 +17,11 @@
package org.apache.activemq.store.jdbc.adapter;
import java.io.IOException;
import java.io.PrintStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
@ -26,6 +29,8 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
import javax.jms.Message;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
@ -249,7 +254,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
public long getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
@ -259,9 +264,9 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s.setString(3, destination.getQualifiedName());
rs = s.executeQuery();
if (!rs.next()) {
return 0;
return new long[]{0,0};
}
return rs.getLong(1);
return new long[]{rs.getLong(1), rs.getLong(2)};
} finally {
close(rs);
close(s);
@ -378,7 +383,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, long seq) throws SQLException, IOException {
String subscriptionName, long seq, long prio) throws SQLException, IOException {
PreparedStatement s = c.getUpdateLastAckStatement();
try {
if (s == null) {
@ -388,9 +393,10 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
s.setLong(1, seq);
s.setString(2, destination.getQualifiedName());
s.setString(3, clientId);
s.setString(4, subscriptionName);
s.setLong(2, prio);
s.setString(3, destination.getQualifiedName());
s.setString(4, clientId);
s.setString(5, subscriptionName);
if (this.batchStatments) {
s.addBatch();
} else if (s.executeUpdate() != 1) {
@ -435,16 +441,25 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, long seq, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
PreparedStatement s = null;
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
if (isPrioritizedMessages()) {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
} else {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
}
s.setMaxRows(maxReturned);
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriptionName);
s.setLong(4, seq);
if (isPrioritizedMessages()) {
s.setLong(5, priority);
s.setLong(6, priority);
}
rs = s.executeQuery();
int count = 0;
if (this.statements.isUseExternalMessageReferences()) {
@ -507,6 +522,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = null;
try {
long lastMessageId = -1;
long priority = Byte.MAX_VALUE - 1;
if (!retroactive) {
s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
ResultSet rs = null;
@ -527,6 +543,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s.setString(4, info.getSelector());
s.setLong(5, lastMessageId);
s.setString(6, info.getSubscribedDestination().getQualifiedName());
s.setLong(7, priority);
if (s.executeUpdate() != 1) {
throw new IOException("Could not create durable subscription for: " + info.getClientId());
}
@ -813,29 +830,61 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
close(s);
}
}
/*
* Useful for debugging. public void dumpTables(Connection c, String destinationName, String clientId, String
* subscriptionName) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c,
* "Select * from ACTIVEMQ_ACKS", System.out); PreparedStatement s = c.prepareStatement("SELECT M.ID,
* D.LAST_ACKED_ID FROM " +"ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " +"WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND
* D.SUB_NAME=?" +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" +" ORDER BY M.ID");
* s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
* printQuery(s,System.out); }
*
* public void dumpTables(Connection c) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS",
* System.out); printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); }
*
* private void printQuery(Connection c, String query, PrintStream out) throws SQLException {
* printQuery(c.prepareStatement(query), out); }
*
* private void printQuery(PreparedStatement s, PrintStream out) throws SQLException {
*
* ResultSet set=null; try { set = s.executeQuery(); ResultSetMetaData metaData = set.getMetaData(); for( int i=1; i<=
* metaData.getColumnCount(); i++ ) { if(i==1) out.print("||"); out.print(metaData.getColumnName(i)+"||"); }
* out.println(); while(set.next()) { for( int i=1; i<= metaData.getColumnCount(); i++ ) { if(i==1) out.print("|");
* out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {}
* try { s.close(); } catch (Throwable ignore) {} } }
*/
/* public void dumpTables(Connection c, String destinationName, String clientId, String
subscriptionName) throws SQLException {
printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM "
+ "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D "
+ "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
+ " ORDER BY M.ID");
s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
printQuery(s,System.out); }
public void dumpTables(Connection c) throws SQLException {
printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
}
private void printQuery(Connection c, String query, PrintStream out)
throws SQLException {
printQuery(c.prepareStatement(query), out);
}
private void printQuery(PreparedStatement s, PrintStream out)
throws SQLException {
ResultSet set = null;
try {
set = s.executeQuery();
ResultSetMetaData metaData = set.getMetaData();
for (int i = 1; i <= metaData.getColumnCount(); i++) {
if (i == 1)
out.print("||");
out.print(metaData.getColumnName(i) + "||");
}
out.println();
while (set.next()) {
for (int i = 1; i <= metaData.getColumnCount(); i++) {
if (i == 1)
out.print("|");
out.print(set.getString(i) + "|");
}
out.println();
}
} finally {
try {
set.close();
} catch (Throwable ignore) {
}
try {
s.close();
} catch (Throwable ignore) {
}
}
} */
public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
throws SQLException, IOException {

View File

@ -177,7 +177,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
sub = sess.createDurableSubscriber(topic, "priority");
for (int i = 0; i < MSG_NUM * 2; i++) {
Message msg = sub.receive(1000);
assertNotNull(msg);
assertNotNull("Message " + i + " was null", msg);
assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
}