more improvements on https://issues.apache.org/activemq/browse/AMQ-2980 - split prioritised and regular statements, have ack row per priority and fix up deletion and interleaved durable subs. Simpler slq improves throughput

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1033076 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-11-09 16:34:54 +00:00
parent 3e8abcc43f
commit fa4481c421
6 changed files with 217 additions and 156 deletions

View File

@ -85,9 +85,11 @@ public interface JDBCAdapter {
void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception; void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception;
long[] doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException; long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException;
void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener) throws SQLException, IOException; void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener) throws SQLException, IOException;
long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) throws SQLException, IOException; long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) throws SQLException, IOException;
void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long re, long re1) throws SQLException, IOException;
} }

View File

@ -43,8 +43,6 @@ import org.apache.commons.logging.LogFactory;
public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore { public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
private static final Log LOG = LogFactory.getLog(JDBCTopicMessageStore.class); private static final Log LOG = LogFactory.getLog(JDBCTopicMessageStore.class);
private Map<String, AtomicLong> subscriberLastMessageMap = new ConcurrentHashMap<String, AtomicLong>();
private Map<String, AtomicLong> subscriberLastPriorityMap = new ConcurrentHashMap<String, AtomicLong>();
public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) { public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) {
super(persistenceAdapter, adapter, wireFormat, topic, audit); super(persistenceAdapter, adapter, wireFormat, topic, audit);
@ -57,13 +55,16 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
} }
return; return;
} }
// Get a connection and insert the message into the DB.
TransactionContext c = persistenceAdapter.getTransactionContext(context); TransactionContext c = persistenceAdapter.getTransactionContext(context);
try { try {
long[] res = adapter.getStoreSequenceId(c, destination, messageId); long[] res = adapter.getStoreSequenceId(c, destination, messageId);
adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]); if (this.isPrioritizedMessages()) {
adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName, res[0], res[1]);
} else {
adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]);
}
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("ack - seq: " + res[0] + ", priority: " + res[1]); LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1]);
} }
} catch (SQLException e) { } catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e); JDBCPersistenceAdapter.log("JDBC Failure: ", e);
@ -102,31 +103,15 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
throws Exception { throws Exception {
TransactionContext c = persistenceAdapter.getTransactionContext(); TransactionContext c = persistenceAdapter.getTransactionContext();
String subcriberId = getSubscriptionKey(clientId, subscriptionName);
AtomicLong last = subscriberLastMessageMap.get(subcriberId);
AtomicLong priority = subscriberLastPriorityMap.get(subcriberId);
if (last == null) {
long[] lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, clientId, subscriptionName);
last = new AtomicLong(lastAcked[0]);
subscriberLastMessageMap.put(subcriberId, last);
priority = new AtomicLong(lastAcked[1]);
subscriberLastMessageMap.put(subcriberId, priority);
}
if (LOG.isTraceEnabled()) {
LOG.trace("recoverNextMessage - last: " + last.get() + ", priority: " + priority);
}
final AtomicLong finalLast = last;
final AtomicLong finalPriority = priority;
try { try {
adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName, last.get(), priority.get(), maxReturned, new JDBCMessageRecoveryListener() { adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
0, 0, maxReturned, new JDBCMessageRecoveryListener() {
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
if (listener.hasSpace()) { if (listener.hasSpace()) {
Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId); msg.getMessageId().setBrokerSequenceId(sequenceId);
if (listener.recoverMessage(msg)) { if (listener.recoverMessage(msg)) {
finalLast.set(sequenceId);
finalPriority.set(msg.getPriority());
return true; return true;
} }
} }
@ -142,15 +127,11 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
JDBCPersistenceAdapter.log("JDBC Failure: ", e); JDBCPersistenceAdapter.log("JDBC Failure: ", e);
} finally { } finally {
c.close(); c.close();
subscriberLastMessageMap.put(subcriberId, finalLast);
subscriberLastPriorityMap.put(subcriberId, finalPriority);
} }
} }
public void resetBatching(String clientId, String subscriptionName) { public void resetBatching(String clientId, String subscriptionName) {
String subcriberId = getSubscriptionKey(clientId, subscriptionName); // DB always recovers from last ack
subscriberLastMessageMap.remove(subcriberId);
subscriberLastPriorityMap.remove(subcriberId);
} }
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {

View File

@ -48,7 +48,7 @@ public class Statements {
private String createDurableSubStatement; private String createDurableSubStatement;
private String findDurableSubStatement; private String findDurableSubStatement;
private String findAllDurableSubsStatement; private String findAllDurableSubsStatement;
private String updateLastAckOfDurableSubStatement; private String updateLastPriorityAckRowOfDurableSubStatement;
private String deleteSubscriptionStatement; private String deleteSubscriptionStatement;
private String findAllDurableSubMessagesStatement; private String findAllDurableSubMessagesStatement;
private String findDurableSubMessagesStatement; private String findDurableSubMessagesStatement;
@ -74,6 +74,8 @@ public class Statements {
private String insertDurablePriorityAckStatement; private String insertDurablePriorityAckStatement;
private String updateDurableLastAckStatement; private String updateDurableLastAckStatement;
private String deleteOldMessagesStatementWithPriority;
private String durableSubscriberMessageCountStatementWithPriority;
public String[] getCreateSchemaStatements() { public String[] getCreateSchemaStatements() {
if (createSchemaStatements == null) { if (createSchemaStatements == null) {
@ -213,7 +215,7 @@ public class Statements {
public String getFindDurableSubStatement() { public String getFindDurableSubStatement() {
if (findDurableSubStatement == null) { if (findDurableSubStatement == null) {
findDurableSubStatement = "SELECT SELECTOR, SUB_DEST " + "FROM " + getFullAckTableName() findDurableSubStatement = "SELECT SELECTOR, SUB_DEST " + "FROM " + getFullAckTableName()
+ " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND SUB_DEST IS NOT NULL"; + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
} }
return findDurableSubStatement; return findDurableSubStatement;
} }
@ -221,17 +223,17 @@ public class Statements {
public String getFindAllDurableSubsStatement() { public String getFindAllDurableSubsStatement() {
if (findAllDurableSubsStatement == null) { if (findAllDurableSubsStatement == null) {
findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST" + " FROM " findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST" + " FROM "
+ getFullAckTableName() + " WHERE CONTAINER=? AND SUB_DEST IS NOT NULL"; + getFullAckTableName() + " WHERE CONTAINER=? AND PRIORITY=0";
} }
return findAllDurableSubsStatement; return findAllDurableSubsStatement;
} }
public String getUpdateLastAckOfDurableSubStatement() { public String getUpdateLastPriorityAckRowOfDurableSubStatement() {
if (updateLastAckOfDurableSubStatement == null) { if (updateLastPriorityAckRowOfDurableSubStatement == null) {
updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?" updateLastPriorityAckRowOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?"
+ " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY=?"; + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY=?";
} }
return updateLastAckOfDurableSubStatement; return updateLastPriorityAckRowOfDurableSubStatement;
} }
public String getDeleteSubscriptionStatement() { public String getDeleteSubscriptionStatement() {
@ -248,7 +250,7 @@ public class Statements {
+ " M, " + getFullAckTableName() + " D " + " M, " + getFullAckTableName() + " D "
+ " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
+ " ORDER BY M.ID"; + " ORDER BY M.PRIORITY DESC, M.ID";
} }
return findAllDurableSubMessagesStatement; return findAllDurableSubMessagesStatement;
} }
@ -258,7 +260,7 @@ public class Statements {
findDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " findDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+ getFullAckTableName() + " D " + getFullAckTableName() + " D "
+ " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
+ " ORDER BY M.ID"; + " ORDER BY M.ID";
} }
return findDurableSubMessagesStatement; return findDurableSubMessagesStatement;
@ -269,20 +271,8 @@ public class Statements {
findDurableSubMessagesByPriorityStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " findDurableSubMessagesByPriorityStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+ getFullAckTableName() + " D " + getFullAckTableName() + " D "
+ " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER AND " + " AND M.CONTAINER=D.CONTAINER"
+ "((M.ID > ? AND M.PRIORITY = ?) " + " AND M.PRIORITY=D.PRIORITY AND M.ID > D.LAST_ACKED_ID"
+ " OR (M.PRIORITY <> ? "
+ " AND ( M.ID >"
+ " ( SELECT LAST_ACKED_ID FROM " + getFullAckTableName()
+ " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID"
+ " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY )"
+ " OR "
+ " ( (SELECT COUNT(LAST_ACKED_ID) FROM " + getFullAckTableName()
+ " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID"
+ " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY) = 0)"
+ " )"
+ " )"
+ ")"
+ " ORDER BY M.PRIORITY DESC, M.ID"; + " ORDER BY M.PRIORITY DESC, M.ID";
} }
return findDurableSubMessagesByPriorityStatement; return findDurableSubMessagesByPriorityStatement;
@ -324,21 +314,33 @@ public class Statements {
+ " M, " + " M, "
+ getFullAckTableName() + getFullAckTableName()
+ " D " + " D "
+ " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=? AND D.SUB_DEST IS NOT NULL" + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER " + " AND M.CONTAINER=D.CONTAINER "
+ " AND ( M.ID >" + " AND M.ID >"
+ " ( SELECT LAST_ACKED_ID FROM " + getFullAckTableName() + " ( SELECT LAST_ACKED_ID FROM " + getFullAckTableName()
+ " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID" + " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID"
+ " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY )" + " AND SUB_NAME=D.SUB_NAME )";
+ " OR "
+ " ( (SELECT COUNT(LAST_ACKED_ID) FROM " + getFullAckTableName()
+ " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID"
+ " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY) = 0)"
+ " )";
} }
return durableSubscriberMessageCountStatement; return durableSubscriberMessageCountStatement;
} }
public String getDurableSubscriberMessageCountStatementWithPriority() {
if (durableSubscriberMessageCountStatementWithPriority == null) {
durableSubscriberMessageCountStatementWithPriority = "SELECT COUNT(*) FROM "
+ getFullMessageTableName()
+ " M, "
+ getFullAckTableName()
+ " D "
+ " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER "
+ " AND M.PRIORITY=D.PRIORITY "
+ " AND M.ID > D.LAST_ACKED_ID";
}
return durableSubscriberMessageCountStatementWithPriority;
}
public String getFindAllDestinationsStatement() { public String getFindAllDestinationsStatement() {
if (findAllDestinationsStatement == null) { if (findAllDestinationsStatement == null) {
findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName(); findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName();
@ -360,29 +362,37 @@ public class Statements {
return removeAllSubscriptionsStatement; return removeAllSubscriptionsStatement;
} }
public String getDeleteOldMessagesStatementWithPriority() {
if (deleteOldMessagesStatementWithPriority == null) {
deleteOldMessagesStatementWithPriority = "DELETE FROM " + getFullMessageTableName()
+ " WHERE ( EXPIRATION<>0 AND EXPIRATION<?)"
+ " OR (ID <= "
+ " ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)"
+ " FROM " + getFullAckTableName() + " WHERE "
+ getFullAckTableName() + ".CONTAINER="
+ getFullMessageTableName() + ".CONTAINER"
+ " AND " + getFullAckTableName() + ".PRIORITY=" + getFullMessageTableName() + ".PRIORITY )"
+ " )";
}
return deleteOldMessagesStatementWithPriority;
}
public String getDeleteOldMessagesStatement() { public String getDeleteOldMessagesStatement() {
if (deleteOldMessagesStatement == null) { if (deleteOldMessagesStatement == null) {
deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName() deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName()
+ " WHERE ( EXPIRATION<>0 AND EXPIRATION<?)" + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?)"
+ " OR (ID < " + " OR (ID <= "
+ " ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)" + " ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)"
+ " FROM " + getFullAckTableName() + " WHERE " + " FROM " + getFullAckTableName() + " WHERE "
+ getFullAckTableName() + ".CONTAINER=" + getFullAckTableName() + ".CONTAINER="
+ getFullMessageTableName() + ".CONTAINER" + getFullMessageTableName() + ".CONTAINER )"
+ " AND " + getFullAckTableName() + ".SUB_DEST IS NULL"
+ " AND " + getFullAckTableName() + ".PRIORITY=" + getFullMessageTableName() + ".PRIORITY )"
+ " AND ID <"
+ " ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)"
+ " FROM " + getFullAckTableName() + " WHERE "
+ getFullAckTableName() + ".CONTAINER="
+ getFullMessageTableName() + ".CONTAINER"
+ " AND " + getFullAckTableName() + ".SUB_DEST IS NOT NULL )"
+ " )"; + " )";
} }
return deleteOldMessagesStatement; return deleteOldMessagesStatement;
} }
public String getLockCreateStatement() { public String getLockCreateStatement() {
if (lockCreateStatement == null) { if (lockCreateStatement == null) {
lockCreateStatement = "SELECT * FROM " + getFullLockTableName(); lockCreateStatement = "SELECT * FROM " + getFullLockTableName();
@ -441,11 +451,9 @@ public class Statements {
*/ */
public String getLastAckedDurableSubscriberMessageStatement() { public String getLastAckedDurableSubscriberMessageStatement() {
if (lastAckedDurableSubscriberMessageStatement == null) { if (lastAckedDurableSubscriberMessageStatement == null) {
lastAckedDurableSubscriberMessageStatement = "SELECT MAX(LAST_ACKED_ID), PRIORITY FROM " lastAckedDurableSubscriberMessageStatement = "SELECT MAX(LAST_ACKED_ID) FROM "
+ getFullAckTableName() + getFullAckTableName()
+ " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?" + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
+ " GROUP BY PRIORITY"
+ " ORDER BY PRIORITY ASC";
} }
return lastAckedDurableSubscriberMessageStatement; return lastAckedDurableSubscriberMessageStatement;
} }
@ -473,8 +481,7 @@ public class Statements {
public String getUpdateDurableLastAckStatement() { public String getUpdateDurableLastAckStatement() {
if (updateDurableLastAckStatement == null) { if (updateDurableLastAckStatement == null) {
updateDurableLastAckStatement = "UPDATE " + getFullAckTableName() updateDurableLastAckStatement = "UPDATE " + getFullAckTableName()
+ " SET LAST_ACKED_ID = ? WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?" + " SET LAST_ACKED_ID = ? WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
+ " AND PRIORITY = " + (Byte.MAX_VALUE - 1);
} }
return updateDurableLastAckStatement; return updateDurableLastAckStatement;
} }
@ -637,6 +644,10 @@ public class Statements {
this.deleteOldMessagesStatement = deleteOldMessagesStatment; this.deleteOldMessagesStatement = deleteOldMessagesStatment;
} }
public void setDeleteOldMessagesStatementWithPriority(String deleteOldMessagesStatmentWithPriority) {
this.deleteOldMessagesStatementWithPriority = deleteOldMessagesStatmentWithPriority;
}
public void setDeleteSubscriptionStatement(String deleteSubscriptionStatment) { public void setDeleteSubscriptionStatement(String deleteSubscriptionStatment) {
this.deleteSubscriptionStatement = deleteSubscriptionStatment; this.deleteSubscriptionStatement = deleteSubscriptionStatment;
} }
@ -697,8 +708,8 @@ public class Statements {
this.removeMessageStatement = removeMessageStatement; this.removeMessageStatement = removeMessageStatement;
} }
public void setUpdateLastAckOfDurableSubStatement(String updateLastAckOfDurableSub) { public void setUpdateLastPriorityAckRowOfDurableSubStatement(String updateLastPriorityAckRowOfDurableSubStatement) {
this.updateLastAckOfDurableSubStatement = updateLastAckOfDurableSub; this.updateLastPriorityAckRowOfDurableSubStatement = updateLastPriorityAckRowOfDurableSubStatement;
} }
public void setUpdateMessageStatement(String updateMessageStatment) { public void setUpdateMessageStatement(String updateMessageStatment) {
@ -743,6 +754,10 @@ public class Statements {
this.durableSubscriberMessageCountStatement = durableSubscriberMessageCountStatement; this.durableSubscriberMessageCountStatement = durableSubscriberMessageCountStatement;
} }
public void setDurableSubscriberMessageCountStatementWithPriority(String durableSubscriberMessageCountStatementWithPriority) {
this.durableSubscriberMessageCountStatementWithPriority = durableSubscriberMessageCountStatementWithPriority;
}
/** /**
* @param findNextMessagesStatement the findNextMessagesStatement to set * @param findNextMessagesStatement the findNextMessagesStatement to set
*/ */

View File

@ -409,15 +409,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
} }
public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, long seq, long prio) throws SQLException, IOException { String subscriptionName, long seq, long prio) throws SQLException, IOException {
doCreatePriorityAckRow(c, destination, clientId, subscriptionName, prio);
doUpdateLatestAckRow(c, destination, clientId, subscriptionName, seq, prio);
PreparedStatement s = c.getUpdateLastAckStatement(); PreparedStatement s = c.getUpdateLastAckStatement();
cleanupExclusiveLock.readLock().lock(); cleanupExclusiveLock.readLock().lock();
try { try {
if (s == null) { if (s == null) {
s = c.getConnection().prepareStatement(this.statements.getUpdateLastAckOfDurableSubStatement()); s = c.getConnection().prepareStatement(this.statements.getUpdateLastPriorityAckRowOfDurableSubStatement());
if (this.batchStatments) { if (this.batchStatments) {
c.setUpdateLastAckStatement(s); c.setUpdateLastAckStatement(s);
} }
@ -435,72 +433,39 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} finally { } finally {
cleanupExclusiveLock.readLock().unlock(); cleanupExclusiveLock.readLock().unlock();
if (!this.batchStatments) { if (!this.batchStatments) {
s.close();
}
}
}
private void doCreatePriorityAckRow(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName,long priority) throws SQLException, IOException{
PreparedStatement s = null;
ResultSet rs = null;
boolean exists = false;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getSelectDurablePriorityAckStatement());
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriptionName);
s.setLong(4, priority);
rs = s.executeQuery();
exists = rs.next();
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
if (!exists) {
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getInsertDurablePriorityAckStatement());
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriptionName);
s.setLong(4, priority);
if (s.executeUpdate() != 1) {
throw new IOException("Could not insert initial ack entry for priority: "
+ priority + ", for sub: " + subscriptionName);
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(s); close(s);
} }
} }
} }
private void doUpdateLatestAckRow(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, long seq, long priority) throws SQLException, IOException{ public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
PreparedStatement s = null; String subscriptionName, long seq, long priority) throws SQLException, IOException {
ResultSet rs = null; PreparedStatement s = c.getUpdateLastAckStatement();
cleanupExclusiveLock.readLock().lock(); cleanupExclusiveLock.readLock().lock();
try { try {
s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement()); if (s == null) {
s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement());
if (this.batchStatments) {
c.setUpdateLastAckStatement(s);
}
}
s.setLong(1, seq); s.setLong(1, seq);
s.setString(2, destination.getQualifiedName()); s.setString(2, destination.getQualifiedName());
s.setString(3, clientId); s.setString(3, clientId);
s.setString(4, subscriptionName); s.setString(4, subscriptionName);
if (s.executeUpdate() != 1) { if (this.batchStatments) {
s.addBatch();
} else if (s.executeUpdate() != 1) {
throw new IOException("Could not update last ack seq : " throw new IOException("Could not update last ack seq : "
+ seq + ", for sub: " + subscriptionName); + seq + ", for sub: " + subscriptionName);
} }
} finally { } finally {
cleanupExclusiveLock.readLock().unlock(); cleanupExclusiveLock.readLock().unlock();
close(rs); if (!this.batchStatments) {
close(s); close(s);
}
} }
} }
@ -553,11 +518,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s.setString(1, destination.getQualifiedName()); s.setString(1, destination.getQualifiedName());
s.setString(2, clientId); s.setString(2, clientId);
s.setString(3, subscriptionName); s.setString(3, subscriptionName);
s.setLong(4, seq);
if (isPrioritizedMessages()) {
s.setLong(5, priority);
s.setLong(6, priority);
}
rs = s.executeQuery(); rs = s.executeQuery();
int count = 0; int count = 0;
if (this.statements.isUseExternalMessageReferences()) { if (this.statements.isUseExternalMessageReferences()) {
@ -587,7 +547,11 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
int result = 0; int result = 0;
cleanupExclusiveLock.readLock().lock(); cleanupExclusiveLock.readLock().lock();
try { try {
s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement()); if (this.isPrioritizedMessages()) {
s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
} else {
s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());
}
s.setString(1, destination.getQualifiedName()); s.setString(1, destination.getQualifiedName());
s.setString(2, clientId); s.setString(2, clientId);
s.setString(3, subscriptionName); s.setString(3, subscriptionName);
@ -618,7 +582,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
cleanupExclusiveLock.readLock().lock(); cleanupExclusiveLock.readLock().lock();
try { try {
long lastMessageId = -1; long lastMessageId = -1;
long priority = Byte.MAX_VALUE - 1;
if (!retroactive) { if (!retroactive) {
s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement()); s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
ResultSet rs = null; ResultSet rs = null;
@ -633,16 +596,25 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
} }
s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement()); s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
s.setString(1, info.getDestination().getQualifiedName()); int maxPriority = 1;
s.setString(2, info.getClientId()); if (this.isPrioritizedMessages()) {
s.setString(3, info.getSubscriptionName()); maxPriority = 10;
s.setString(4, info.getSelector());
s.setLong(5, lastMessageId);
s.setString(6, info.getSubscribedDestination().getQualifiedName());
s.setLong(7, priority);
if (s.executeUpdate() != 1) {
throw new IOException("Could not create durable subscription for: " + info.getClientId());
} }
for (int priority = 0; priority < maxPriority; priority++) {
s.setString(1, info.getDestination().getQualifiedName());
s.setString(2, info.getClientId());
s.setString(3, info.getSubscriptionName());
s.setString(4, info.getSelector());
s.setLong(5, lastMessageId);
s.setString(6, info.getSubscribedDestination().getQualifiedName());
s.setLong(7, priority);
if (s.executeUpdate() != 1) {
throw new IOException("Could not create durable subscription for: " + info.getClientId());
}
}
} finally { } finally {
cleanupExclusiveLock.readLock().unlock(); cleanupExclusiveLock.readLock().unlock();
close(s); close(s);
@ -744,8 +716,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = null; PreparedStatement s = null;
cleanupExclusiveLock.writeLock().lock(); cleanupExclusiveLock.writeLock().lock();
try { try {
LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement()); if (this.isPrioritizedMessages()) {
s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement()); LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
} else {
LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement());
s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement());
}
s.setLong(1, System.currentTimeMillis()); s.setLong(1, System.currentTimeMillis());
int i = s.executeUpdate(); int i = s.executeUpdate();
LOG.debug("Deleted " + i + " old message(s)."); LOG.debug("Deleted " + i + " old message(s).");
@ -755,11 +732,11 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
} }
public long[] doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
String clientId, String subscriberName) throws SQLException, IOException { String clientId, String subscriberName) throws SQLException, IOException {
PreparedStatement s = null; PreparedStatement s = null;
ResultSet rs = null; ResultSet rs = null;
long[] result = new long[]{-1, Byte.MAX_VALUE - 1}; long result = -1;
cleanupExclusiveLock.readLock().lock(); cleanupExclusiveLock.readLock().lock();
try { try {
s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement()); s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
@ -768,8 +745,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s.setString(3, subscriberName); s.setString(3, subscriberName);
rs = s.executeQuery(); rs = s.executeQuery();
if (rs.next()) { if (rs.next()) {
result[0] = rs.getLong(1); result = rs.getLong(1);
result[1] = rs.getLong(2);
} }
} finally { } finally {
cleanupExclusiveLock.readLock().unlock(); cleanupExclusiveLock.readLock().unlock();

View File

@ -18,6 +18,7 @@
package org.apache.activemq.store.jdbc; package org.apache.activemq.store.jdbc;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.Vector; import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Message; import javax.jms.Message;
@ -117,10 +118,11 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
} }
final int closeFrequency = MSG_NUM/2; final int closeFrequency = MSG_NUM/2;
HashMap dups = new HashMap();
sub = sess.createDurableSubscriber(topic, subName); sub = sess.createDurableSubscriber(topic, subName);
for (int i=0; i < MSG_NUM * maxPriority; i++) { for (int i=0; i < MSG_NUM * maxPriority; i++) {
Message msg = sub.receive(10000); Message msg = sub.receive(10000);
assertNull("no duplicate message", dups.put(msg.getJMSMessageID(), subName));
LOG.info("received i=" + i + ", m=" + (msg!=null? LOG.info("received i=" + i + ", m=" + (msg!=null?
msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority() msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority()
: null) ); : null) );

View File

@ -190,6 +190,14 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals(sent, listener.count); assertEquals(sent, listener.count);
} }
public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception {
this.addCombinationValues("defaultPersistenceAdapter",
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
this.addCombinationValues("usePrioritySupport",
new Object[]{ Boolean.TRUE, Boolean.FALSE});
}
public void testVerifyAllConsumedAreAcked() throws Exception { public void testVerifyAllConsumedAreAcked() throws Exception {
// create durable subscription // create durable subscription
Connection con = createConnection(); Connection con = createConnection();
@ -377,6 +385,83 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals(sent, listener.count); assertEquals(sent, listener.count);
assertEquals(sent, listener3.count); assertEquals(sent, listener3.count);
} }
public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception {
this.addCombinationValues("defaultPersistenceAdapter",
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
}
public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
// create durable subscription 1
Connection con = createConnection("cliId1");
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
session.close();
con.close();
// send messages
con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
int sent = 0;
for (int i = 0; i < 10; i++) {
sent++;
Message message = session.createMessage();
message.setStringProperty("filter", "true");
producer.send(topic, message);
}
Thread.sleep(1 * 1000);
// create durable subscription 2
Connection con2 = createConnection("cliId2");
Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
Listener listener2 = new Listener();
consumer2.setMessageListener(listener2);
assertEquals(0, listener2.count);
session2.close();
con2.close();
// send some more
for (int i = 0; i < 10; i++) {
sent++;
Message message = session.createMessage();
message.setStringProperty("filter", "true");
producer.send(topic, message);
}
Thread.sleep(1 * 1000);
session.close();
con.close();
con2 = createConnection("cliId2");
session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
listener2 = new Listener();
consumer2.setMessageListener(listener2);
// test online subs
Thread.sleep(3 * 1000);
assertEquals(10, listener2.count);
// consume all messages
con = createConnection("cliId1");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
Listener listener = new Listener();
consumer.setMessageListener(listener);
Thread.sleep(3 * 1000);
session.close();
con.close();
assertEquals("offline consumer got all", sent, listener.count);
}
public static class Listener implements MessageListener { public static class Listener implements MessageListener {
int count = 0; int count = 0;