diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java index 7c4cc3bdfb..2a4a788a4c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java @@ -85,9 +85,11 @@ public interface JDBCAdapter { void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception; - long[] doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException; + long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException; void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener) throws SQLException, IOException; long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) throws SQLException, IOException; + + void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long re, long re1) throws SQLException, IOException; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java index d49cd6b501..90047271f4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java @@ -43,8 +43,6 @@ import org.apache.commons.logging.LogFactory; public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore { private static final Log LOG = LogFactory.getLog(JDBCTopicMessageStore.class); - private Map subscriberLastMessageMap = new ConcurrentHashMap(); - private Map subscriberLastPriorityMap = new ConcurrentHashMap(); public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) { super(persistenceAdapter, adapter, wireFormat, topic, audit); @@ -57,13 +55,16 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess } return; } - // Get a connection and insert the message into the DB. TransactionContext c = persistenceAdapter.getTransactionContext(context); try { - long[] res = adapter.getStoreSequenceId(c, destination, messageId); - adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]); + long[] res = adapter.getStoreSequenceId(c, destination, messageId); + if (this.isPrioritizedMessages()) { + adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName, res[0], res[1]); + } else { + adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]); + } if (LOG.isTraceEnabled()) { - LOG.trace("ack - seq: " + res[0] + ", priority: " + res[1]); + LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1]); } } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); @@ -102,31 +103,15 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception { TransactionContext c = persistenceAdapter.getTransactionContext(); - String subcriberId = getSubscriptionKey(clientId, subscriptionName); - AtomicLong last = subscriberLastMessageMap.get(subcriberId); - AtomicLong priority = subscriberLastPriorityMap.get(subcriberId); - if (last == null) { - long[] lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, clientId, subscriptionName); - last = new AtomicLong(lastAcked[0]); - subscriberLastMessageMap.put(subcriberId, last); - priority = new AtomicLong(lastAcked[1]); - subscriberLastMessageMap.put(subcriberId, priority); - } - if (LOG.isTraceEnabled()) { - LOG.trace("recoverNextMessage - last: " + last.get() + ", priority: " + priority); - } - final AtomicLong finalLast = last; - final AtomicLong finalPriority = priority; try { - adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName, last.get(), priority.get(), maxReturned, new JDBCMessageRecoveryListener() { + adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName, + 0, 0, maxReturned, new JDBCMessageRecoveryListener() { public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { if (listener.hasSpace()) { Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); msg.getMessageId().setBrokerSequenceId(sequenceId); if (listener.recoverMessage(msg)) { - finalLast.set(sequenceId); - finalPriority.set(msg.getPriority()); return true; } } @@ -142,15 +127,11 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess JDBCPersistenceAdapter.log("JDBC Failure: ", e); } finally { c.close(); - subscriberLastMessageMap.put(subcriberId, finalLast); - subscriberLastPriorityMap.put(subcriberId, finalPriority); } } public void resetBatching(String clientId, String subscriptionName) { - String subcriberId = getSubscriptionKey(clientId, subscriptionName); - subscriberLastMessageMap.remove(subcriberId); - subscriberLastPriorityMap.remove(subcriberId); + // DB always recovers from last ack } public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java index 7c125f3961..c5d282d88b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java @@ -48,7 +48,7 @@ public class Statements { private String createDurableSubStatement; private String findDurableSubStatement; private String findAllDurableSubsStatement; - private String updateLastAckOfDurableSubStatement; + private String updateLastPriorityAckRowOfDurableSubStatement; private String deleteSubscriptionStatement; private String findAllDurableSubMessagesStatement; private String findDurableSubMessagesStatement; @@ -74,6 +74,8 @@ public class Statements { private String insertDurablePriorityAckStatement; private String updateDurableLastAckStatement; + private String deleteOldMessagesStatementWithPriority; + private String durableSubscriberMessageCountStatementWithPriority; public String[] getCreateSchemaStatements() { if (createSchemaStatements == null) { @@ -213,7 +215,7 @@ public class Statements { public String getFindDurableSubStatement() { if (findDurableSubStatement == null) { findDurableSubStatement = "SELECT SELECTOR, SUB_DEST " + "FROM " + getFullAckTableName() - + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND SUB_DEST IS NOT NULL"; + + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; } return findDurableSubStatement; } @@ -221,17 +223,17 @@ public class Statements { public String getFindAllDurableSubsStatement() { if (findAllDurableSubsStatement == null) { findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST" + " FROM " - + getFullAckTableName() + " WHERE CONTAINER=? AND SUB_DEST IS NOT NULL"; + + getFullAckTableName() + " WHERE CONTAINER=? AND PRIORITY=0"; } return findAllDurableSubsStatement; } - public String getUpdateLastAckOfDurableSubStatement() { - if (updateLastAckOfDurableSubStatement == null) { - updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?" + public String getUpdateLastPriorityAckRowOfDurableSubStatement() { + if (updateLastPriorityAckRowOfDurableSubStatement == null) { + updateLastPriorityAckRowOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?" + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY=?"; } - return updateLastAckOfDurableSubStatement; + return updateLastPriorityAckRowOfDurableSubStatement; } public String getDeleteSubscriptionStatement() { @@ -248,7 +250,7 @@ public class Statements { + " M, " + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" - + " ORDER BY M.ID"; + + " ORDER BY M.PRIORITY DESC, M.ID"; } return findAllDurableSubMessagesStatement; } @@ -258,7 +260,7 @@ public class Statements { findDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" - + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + " ORDER BY M.ID"; } return findDurableSubMessagesStatement; @@ -269,20 +271,8 @@ public class Statements { findDurableSubMessagesByPriorityStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" - + " AND M.CONTAINER=D.CONTAINER AND " - + "((M.ID > ? AND M.PRIORITY = ?) " - + " OR (M.PRIORITY <> ? " - + " AND ( M.ID >" - + " ( SELECT LAST_ACKED_ID FROM " + getFullAckTableName() - + " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID" - + " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY )" - + " OR " - + " ( (SELECT COUNT(LAST_ACKED_ID) FROM " + getFullAckTableName() - + " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID" - + " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY) = 0)" - + " )" - + " )" - + ")" + + " AND M.CONTAINER=D.CONTAINER" + + " AND M.PRIORITY=D.PRIORITY AND M.ID > D.LAST_ACKED_ID" + " ORDER BY M.PRIORITY DESC, M.ID"; } return findDurableSubMessagesByPriorityStatement; @@ -324,21 +314,33 @@ public class Statements { + " M, " + getFullAckTableName() + " D " - + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=? AND D.SUB_DEST IS NOT NULL" + + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + " AND M.CONTAINER=D.CONTAINER " - + " AND ( M.ID >" + + " AND M.ID >" + " ( SELECT LAST_ACKED_ID FROM " + getFullAckTableName() + " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID" - + " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY )" - + " OR " - + " ( (SELECT COUNT(LAST_ACKED_ID) FROM " + getFullAckTableName() - + " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID" - + " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY) = 0)" - + " )"; + + " AND SUB_NAME=D.SUB_NAME )"; + } return durableSubscriberMessageCountStatement; } + public String getDurableSubscriberMessageCountStatementWithPriority() { + if (durableSubscriberMessageCountStatementWithPriority == null) { + durableSubscriberMessageCountStatementWithPriority = "SELECT COUNT(*) FROM " + + getFullMessageTableName() + + " M, " + + getFullAckTableName() + + " D " + + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + + " AND M.CONTAINER=D.CONTAINER " + + " AND M.PRIORITY=D.PRIORITY " + + " AND M.ID > D.LAST_ACKED_ID"; + } + + return durableSubscriberMessageCountStatementWithPriority; + } + public String getFindAllDestinationsStatement() { if (findAllDestinationsStatement == null) { findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName(); @@ -360,29 +362,37 @@ public class Statements { return removeAllSubscriptionsStatement; } + public String getDeleteOldMessagesStatementWithPriority() { + if (deleteOldMessagesStatementWithPriority == null) { + deleteOldMessagesStatementWithPriority = "DELETE FROM " + getFullMessageTableName() + + " WHERE ( EXPIRATION<>0 AND EXPIRATION0 AND EXPIRATION