mirror of https://github.com/apache/activemq.git
fix issue from new test for https://issues.apache.org/activemq/browse/AMQ-2985 - when acking as unmatched, the matching messages that lie inbetwen the sequences need to be added to the ack locations to ensure they don't get deleted when other consumers are done with them. test now enabled for kahaDB
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1033581 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5baf5f5e76
commit
6dea944be6
|
@ -1056,6 +1056,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
|
||||
Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, ackSequenceToStore);
|
||||
|
||||
if (ackSequenceToStore != sequence) {
|
||||
// unmatched, need to add ack locations for the intermediate sequences
|
||||
for (long matchedGapSequence = extractSequenceId(prev) + 1; matchedGapSequence < sequence; matchedGapSequence++) {
|
||||
addAckLocation(sd, matchedGapSequence, subscriptionKey);
|
||||
}
|
||||
}
|
||||
// The following method handles deleting un-referenced messages.
|
||||
removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev));
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import javax.jms.*;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import java.io.File;
|
||||
import java.util.Vector;
|
||||
|
||||
public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport {
|
||||
|
||||
|
@ -37,6 +38,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
public Boolean usePrioritySupport = Boolean.TRUE;
|
||||
private BrokerService broker;
|
||||
private ActiveMQTopic topic;
|
||||
private Vector<Exception> exceptions = new Vector<Exception>();
|
||||
|
||||
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
|
||||
return new ActiveMQConnectionFactory("vm://" + getName(true));
|
||||
|
@ -59,6 +61,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
}
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
exceptions.clear();
|
||||
topic = (ActiveMQTopic) createDestination();
|
||||
createBroker();
|
||||
super.setUp();
|
||||
|
@ -389,7 +392,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
|
||||
public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception {
|
||||
this.addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{ /*PersistenceAdapterChoice.KahaDB,*/ PersistenceAdapterChoice.JDBC});
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
|
||||
}
|
||||
|
||||
public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
|
||||
|
@ -463,13 +466,13 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
assertEquals("offline consumer got all", sent, listener.count);
|
||||
}
|
||||
|
||||
public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMAtched() throws Exception {
|
||||
public void x_initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
|
||||
this.addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{ /* PersistenceAdapterChoice.KahaDB,*/ PersistenceAdapterChoice.JDBC});
|
||||
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
|
||||
}
|
||||
|
||||
private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
|
||||
public void testMixOfOnLineAndOfflineSubsGetAllMAtched() throws Exception {
|
||||
public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
|
||||
// create offline subs 1
|
||||
Connection con = createConnection("offCli1");
|
||||
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -549,7 +552,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
con = createConnection("offCli1");
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
|
||||
Listener listener = new Listener("offCli1");
|
||||
Listener listener = new FilterCheckListener();
|
||||
consumer.setMessageListener(listener);
|
||||
|
||||
Thread.sleep(3 * 1000);
|
||||
|
@ -562,7 +565,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
Connection con3 = createConnection("offCli2");
|
||||
Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", filter, true);
|
||||
Listener listener3 = new Listener();
|
||||
Listener listener3 = new FilterCheckListener();
|
||||
consumer3.setMessageListener(listener3);
|
||||
|
||||
Thread.sleep(3 * 1000);
|
||||
|
@ -570,6 +573,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
con3.close();
|
||||
|
||||
assertEquals(filtered, listener3.count);
|
||||
assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
|
||||
}
|
||||
|
||||
public void testRemovedDurableSubDeletes() throws Exception {
|
||||
|
@ -627,9 +631,31 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
count++;
|
||||
if (id != null) {
|
||||
try {
|
||||
LOG.error(id + ", " + message.getJMSMessageID());
|
||||
LOG.info(id + ", " + message.getJMSMessageID());
|
||||
} catch (Exception ignored) {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public class FilterCheckListener extends Listener {
|
||||
|
||||
public void onMessage(Message message) {
|
||||
count++;
|
||||
|
||||
try {
|
||||
Object b = message.getObjectProperty("$b");
|
||||
if (b != null) {
|
||||
boolean c = message.getBooleanProperty("$c");
|
||||
assertTrue("", c);
|
||||
}
|
||||
else {
|
||||
String d = message.getStringProperty("$d");
|
||||
assertTrue("", "D1".equals(d) || "D2".equals(d));
|
||||
}
|
||||
}
|
||||
catch (JMSException e) {
|
||||
exceptions.add(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue