resolve regression with offline durable subs and jdbc, if there are no messages in the store and some consumed messages, the durable is not reactivated and subsequent messages can get deleted before they are consumed. Reduce the duplicate replays to a topic store cursor by tracking the last recovered. additional test

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1039108 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-11-25 17:29:02 +00:00
parent 7fd8cc03d3
commit 8191f19672
4 changed files with 108 additions and 16 deletions

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.ActiveMQMessageAudit;
@ -43,6 +44,7 @@ import org.apache.commons.logging.LogFactory;
public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore { public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
private static final Log LOG = LogFactory.getLog(JDBCTopicMessageStore.class); private static final Log LOG = LogFactory.getLog(JDBCTopicMessageStore.class);
private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>();
public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) { public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) {
super(persistenceAdapter, adapter, wireFormat, topic, audit); super(persistenceAdapter, adapter, wireFormat, topic, audit);
@ -100,17 +102,37 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
} }
} }
private class LastRecovered {
long sequence = 0;
byte priority = 9;
public void update(long sequence, Message msg) {
this.sequence = sequence;
this.priority = msg.getPriority();
}
public String toString() {
return "" + sequence + ":" + priority;
}
}
public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
throws Exception { throws Exception {
TransactionContext c = persistenceAdapter.getTransactionContext(); TransactionContext c = persistenceAdapter.getTransactionContext();
String key = getSubscriptionKey(clientId, subscriptionName);
if (!subscriberLastRecoveredMap.containsKey(key)) {
subscriberLastRecoveredMap.put(key, new LastRecovered());
}
final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);
try { try {
JDBCMessageRecoveryListener jdbcListener = new JDBCMessageRecoveryListener() { JDBCMessageRecoveryListener jdbcListener = new JDBCMessageRecoveryListener() {
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
if (listener.hasSpace()) { if (listener.hasSpace()) {
Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId); msg.getMessageId().setBrokerSequenceId(sequenceId);
if (listener.recoverMessage(msg)) { if (listener.recoverMessage(msg)) {
lastRecovered.update(sequenceId, msg);
return true; return true;
} }
} }
@ -124,10 +146,10 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}; };
if (isPrioritizedMessages()) { if (isPrioritizedMessages()) {
adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName, adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
0, 0, maxReturned, jdbcListener); lastRecovered.sequence, lastRecovered.priority, maxReturned, jdbcListener);
} else { } else {
adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName, adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
0, 0, maxReturned, jdbcListener); lastRecovered.sequence, 0, maxReturned, jdbcListener);
} }
} catch (SQLException e) { } catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e); JDBCPersistenceAdapter.log("JDBC Failure: ", e);
@ -137,7 +159,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
} }
public void resetBatching(String clientId, String subscriptionName) { public void resetBatching(String clientId, String subscriptionName) {
// DB always recovers from last ack subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName));
} }
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {

View File

@ -261,6 +261,7 @@ public class Statements {
+ getFullAckTableName() + " D " + getFullAckTableName() + " D "
+ " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
+ " AND M.ID > ?"
+ " ORDER BY M.ID"; + " ORDER BY M.ID";
} }
return findDurableSubMessagesStatement; return findDurableSubMessagesStatement;
@ -273,6 +274,7 @@ public class Statements {
+ " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER" + " AND M.CONTAINER=D.CONTAINER"
+ " AND M.PRIORITY=D.PRIORITY AND M.ID > D.LAST_ACKED_ID" + " AND M.PRIORITY=D.PRIORITY AND M.ID > D.LAST_ACKED_ID"
+ " AND ( (M.ID > ?) OR (M.PRIORITY < ?) )"
+ " ORDER BY M.PRIORITY DESC, M.ID"; + " ORDER BY M.PRIORITY DESC, M.ID";
} }
return findDurableSubMessagesByPriorityStatement; return findDurableSubMessagesByPriorityStatement;
@ -343,7 +345,7 @@ public class Statements {
public String getFindAllDestinationsStatement() { public String getFindAllDestinationsStatement() {
if (findAllDestinationsStatement == null) { if (findAllDestinationsStatement == null) {
findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName(); findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullAckTableName();
} }
return findAllDestinationsStatement; return findAllDestinationsStatement;
} }

View File

@ -169,10 +169,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
long seq2 = 0; long seq2 = 0;
if (rs.next()) { if (rs.next()) {
seq2 = rs.getLong(1); seq2 = rs.getLong(1);
// if there is no such message, ignore the value
if (this.doGetMessageById(c, seq2) == null) {
seq2 = 0;
}
} }
long seq = Math.max(seq1, seq2); long seq = Math.max(seq1, seq2);
return seq; return seq;
@ -512,6 +508,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s.setString(1, destination.getQualifiedName()); s.setString(1, destination.getQualifiedName());
s.setString(2, clientId); s.setString(2, clientId);
s.setString(3, subscriptionName); s.setString(3, subscriptionName);
s.setLong(4, seq);
rs = s.executeQuery(); rs = s.executeQuery();
int count = 0; int count = 0;
if (this.statements.isUseExternalMessageReferences()) { if (this.statements.isUseExternalMessageReferences()) {
@ -542,12 +539,12 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
cleanupExclusiveLock.readLock().lock(); cleanupExclusiveLock.readLock().lock();
try { try {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement()); s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
// maxRows needs to be twice prefetch as the db will replay all unacked, so inflight messages will
// be returned and suppressed by the cursor audit. It is faster this way.
s.setMaxRows(maxRows); s.setMaxRows(maxRows);
s.setString(1, destination.getQualifiedName()); s.setString(1, destination.getQualifiedName());
s.setString(2, clientId); s.setString(2, clientId);
s.setString(3, subscriptionName); s.setString(3, subscriptionName);
s.setLong(4, seq);
s.setLong(5, priority);
rs = s.executeQuery(); rs = s.executeQuery();
int count = 0; int count = 0;
if (this.statements.isUseExternalMessageReferences()) { if (this.statements.isUseExternalMessageReferences()) {

View File

@ -617,6 +617,11 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals(0, listener.count); assertEquals(0, listener.count);
} }
public void initCombosForTestOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
this.addCombinationValues("defaultPersistenceAdapter",
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
}
public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception { public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
// create offline subs 1 // create offline subs 1
Connection con = createConnection("offCli1"); Connection con = createConnection("offCli1");
@ -702,6 +707,72 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals(filtered, listener3.count); assertEquals(filtered, listener3.count);
} }
public void initCombosForTestOfflineAfterRestart() throws Exception {
this.addCombinationValues("defaultPersistenceAdapter",
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
}
public void testOfflineSubscriptionAfterRestart() throws Exception {
// create offline subs 1
Connection con = createConnection("offCli1");
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, false);
Listener listener = new Listener();
consumer.setMessageListener(listener);
// send messages
MessageProducer producer = session.createProducer(null);
int sent = 0;
for (int i = 0; i < 10; i++) {
sent++;
Message message = session.createMessage();
message.setStringProperty("filter", "false");
producer.send(topic, message);
}
LOG.info("sent: " + sent);
Thread.sleep(5 * 1000);
session.close();
con.close();
assertEquals(sent, listener.count);
// restart broker
Thread.sleep(3 * 1000);
broker.stop();
createBroker(false /*deleteAllMessages*/);
// send more messages
con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(null);
for (int i = 0; i < 10; i++) {
sent++;
Message message = session.createMessage();
message.setStringProperty("filter", "false");
producer.send(topic, message);
}
LOG.info("after restart, sent: " + sent);
Thread.sleep(1 * 1000);
session.close();
con.close();
// test offline subs
con = createConnection("offCli1");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
consumer.setMessageListener(listener);
Thread.sleep(3 * 1000);
session.close();
con.close();
assertEquals(sent, listener.count);
}
public void testInterleavedOfflineSubscriptionCanConsumeAfterUnsub() throws Exception { public void testInterleavedOfflineSubscriptionCanConsumeAfterUnsub() throws Exception {
// create offline subs 1 // create offline subs 1