https://issues.apache.org/jira/browse/AMQ-3998 https://issues.apache.org/jira/browse/AMQ-3999 - resolve emergent problems with retroactive durables and use of cache when active durable disconnects. resolves unit test failures and add some more

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1380547 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-09-04 11:02:17 +00:00
parent ad5e969bb1
commit eaac0d2eee
7 changed files with 50 additions and 21 deletions

View File

@ -61,9 +61,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
protected void resetSize() {
if (isStarted()) {
this.size = getStoreSize();
}
this.storeHasMessages=this.size > 0;
}

View File

@ -123,6 +123,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
tsp.setEnableAudit(isEnableAudit());
tsp.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark());
tsp.setUseCache(isUseCache());
tsp.setCacheEnabled(isUseCache() && tsp.isEmpty());
topics.put(destination, tsp);
storePrefetches.add(tsp);
if (isStarted()) {

View File

@ -52,6 +52,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
this.subscriberName = subscriberName;
this.maxProducersToAudit=32;
this.maxAuditDepth=10000;
resetSize();
}
public boolean recoverMessageReference(MessageId messageReference) throws Exception {

View File

@ -1278,7 +1278,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
if (!command.getRetroactive()) {
ackLocation = sd.orderIndex.nextMessageId-1;
} else {
addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey);
addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
}
sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
sd.subscriptionCache.add(subscriptionKey);
@ -1857,23 +1857,22 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
// new sub is interested in potentially all existing messages
private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
if (sequences == null) {
sequences = new SequenceSet();
sequences.add(messageSequence);
sd.ackPositions.add(tx, subscriptionKey, sequences);
} else {
sequences.add(messageSequence);
sd.ackPositions.put(tx, subscriptionKey, sequences);
private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
SequenceSet allOutstanding = new SequenceSet();
Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
while (iterator.hasNext()) {
SequenceSet set = iterator.next().getValue();
for (Long entry : set) {
allOutstanding.add(entry);
}
}
sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
Long count = sd.messageReferences.get(messageSequence);
if (count == null) {
count = Long.valueOf(0L);
}
for (Long ackPosition : allOutstanding) {
Long count = sd.messageReferences.get(ackPosition);
count = count.longValue() + 1;
sd.messageReferences.put(messageSequence, count);
sd.messageReferences.put(ackPosition, count);
}
}
// on a new message add, all existing subs are interested in this message

View File

@ -867,7 +867,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
producer.send(topic, message);
}
LOG.info("after restart, sent: " + filtered);
LOG.info("after restart, total sent with filter='true': " + filtered);
Thread.sleep(1 * 1000);
session.close();
con.close();
@ -876,7 +876,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
con = createConnection("offCli1");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
Listener listener = new Listener();
Listener listener = new Listener("1>");
consumer.setMessageListener(listener);
Connection con3 = createConnection("offCli2");

View File

@ -283,6 +283,36 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
assertNull(consumer.receive(5000));
}
public void testDurableSubscriptionRetroactive() throws Exception {
// Create the durable sub.
connection.start();
session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("TestTopic?consumer.retroactive=true");
consumer = session.createDurableSubscriber(topic, "sub1");
connection.close();
// Produce
connection = createConnection();
connection.start();
session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(session.createTextMessage("Msg:1"));
restartBroker();
// connect second durable to pick up retroactive message
connection.start();
session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
consumer = session.createDurableSubscriber(topic, "sub2");
// Try to get the message.
assertTextMessageEquals("Msg:1", consumer.receive(5000));
assertNull(consumer.receive(2000));
}
public void testDurableSubscriptionRollbackRedeliver() throws Exception {
// Create the durable sub.

View File

@ -52,7 +52,7 @@ public class Sequence extends LinkedNode<Sequence> {
@Override
public String toString() {
return first == last ? "" + first : first + "-" + last;
return first == last ? "" + first : first + ".." + last;
}
public long getFirst() {