mirror of https://github.com/apache/activemq.git
additional test for https://issues.apache.org/activemq/browse/AMQ-2985 that shows issue with kahadb and overlapping durable and non durable subs with occasional filter match
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1033469 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
69dc0d4940
commit
d8b11896ab
|
@ -389,7 +389,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
||||||
|
|
||||||
public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception {
|
public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception {
|
||||||
this.addCombinationValues("defaultPersistenceAdapter",
|
this.addCombinationValues("defaultPersistenceAdapter",
|
||||||
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
|
new Object[]{ /*PersistenceAdapterChoice.KahaDB,*/ PersistenceAdapterChoice.JDBC});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
|
public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
|
||||||
|
@ -441,7 +441,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
||||||
con2 = createConnection("cliId2");
|
con2 = createConnection("cliId2");
|
||||||
session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||||
listener2 = new Listener();
|
listener2 = new Listener("cliId2");
|
||||||
consumer2.setMessageListener(listener2);
|
consumer2.setMessageListener(listener2);
|
||||||
// test online subs
|
// test online subs
|
||||||
Thread.sleep(3 * 1000);
|
Thread.sleep(3 * 1000);
|
||||||
|
@ -452,7 +452,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
||||||
con = createConnection("cliId1");
|
con = createConnection("cliId1");
|
||||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||||
Listener listener = new Listener();
|
Listener listener = new Listener("cliId1");
|
||||||
consumer.setMessageListener(listener);
|
consumer.setMessageListener(listener);
|
||||||
|
|
||||||
Thread.sleep(3 * 1000);
|
Thread.sleep(3 * 1000);
|
||||||
|
@ -463,11 +463,173 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
||||||
assertEquals("offline consumer got all", sent, listener.count);
|
assertEquals("offline consumer got all", sent, listener.count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMAtched() throws Exception {
|
||||||
|
this.addCombinationValues("defaultPersistenceAdapter",
|
||||||
|
new Object[]{ /* PersistenceAdapterChoice.KahaDB,*/ PersistenceAdapterChoice.JDBC});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
|
||||||
|
public void testMixOfOnLineAndOfflineSubsGetAllMAtched() throws Exception {
|
||||||
|
// create offline subs 1
|
||||||
|
Connection con = createConnection("offCli1");
|
||||||
|
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
session.createDurableSubscriber(topic, "SubsId", filter, true);
|
||||||
|
session.close();
|
||||||
|
con.close();
|
||||||
|
|
||||||
|
// create offline subs 2
|
||||||
|
con = createConnection("offCli2");
|
||||||
|
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
session.createDurableSubscriber(topic, "SubsId", filter, true);
|
||||||
|
session.close();
|
||||||
|
con.close();
|
||||||
|
|
||||||
|
// create online subs
|
||||||
|
Connection con2 = createConnection("onlineCli1");
|
||||||
|
Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", filter, true);
|
||||||
|
Listener listener2 = new Listener();
|
||||||
|
consumer2.setMessageListener(listener2);
|
||||||
|
|
||||||
|
// create non-durable consumer
|
||||||
|
Connection con4 = createConnection("nondurableCli");
|
||||||
|
Session session4 = con4.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageConsumer consumer4 = session4.createConsumer(topic, filter, true);
|
||||||
|
Listener listener4 = new Listener();
|
||||||
|
consumer4.setMessageListener(listener4);
|
||||||
|
|
||||||
|
// send messages
|
||||||
|
con = createConnection();
|
||||||
|
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer producer = session.createProducer(null);
|
||||||
|
|
||||||
|
boolean hasRelevant = false;
|
||||||
|
int filtered = 0;
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
int postf = (int) (Math.random() * 9) + 1;
|
||||||
|
String d = "D" + postf;
|
||||||
|
|
||||||
|
if ("D1".equals(d) || "D2".equals(d)) {
|
||||||
|
hasRelevant = true;
|
||||||
|
filtered++;
|
||||||
|
}
|
||||||
|
|
||||||
|
Message message = session.createMessage();
|
||||||
|
message.setStringProperty("$a", "A1");
|
||||||
|
message.setStringProperty("$d", d);
|
||||||
|
producer.send(topic, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
Message message = session.createMessage();
|
||||||
|
message.setStringProperty("$a", "A1");
|
||||||
|
message.setBooleanProperty("$b", true);
|
||||||
|
message.setBooleanProperty("$c", hasRelevant);
|
||||||
|
producer.send(topic, message);
|
||||||
|
|
||||||
|
if (hasRelevant)
|
||||||
|
filtered++;
|
||||||
|
|
||||||
|
Thread.sleep(1 * 1000);
|
||||||
|
session.close();
|
||||||
|
con.close();
|
||||||
|
|
||||||
|
Thread.sleep(3 * 1000);
|
||||||
|
|
||||||
|
// test non-durable consumer
|
||||||
|
session4.close();
|
||||||
|
con4.close();
|
||||||
|
assertEquals(filtered, listener4.count); // succeeded!
|
||||||
|
|
||||||
|
// test online subs
|
||||||
|
session2.close();
|
||||||
|
con2.close();
|
||||||
|
assertEquals(filtered, listener2.count); // succeeded!
|
||||||
|
|
||||||
|
// test offline 1
|
||||||
|
con = createConnection("offCli1");
|
||||||
|
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
|
||||||
|
Listener listener = new Listener("offCli1");
|
||||||
|
consumer.setMessageListener(listener);
|
||||||
|
|
||||||
|
Thread.sleep(3 * 1000);
|
||||||
|
session.close();
|
||||||
|
con.close();
|
||||||
|
|
||||||
|
assertEquals(filtered, listener.count);
|
||||||
|
|
||||||
|
// test offline 2
|
||||||
|
Connection con3 = createConnection("offCli2");
|
||||||
|
Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", filter, true);
|
||||||
|
Listener listener3 = new Listener();
|
||||||
|
consumer3.setMessageListener(listener3);
|
||||||
|
|
||||||
|
Thread.sleep(3 * 1000);
|
||||||
|
session3.close();
|
||||||
|
con3.close();
|
||||||
|
|
||||||
|
assertEquals(filtered, listener3.count);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRemovedDurableSubDeletes() throws Exception {
|
||||||
|
// create durable subscription 1
|
||||||
|
Connection con = createConnection("cliId1");
|
||||||
|
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||||
|
session.close();
|
||||||
|
con.close();
|
||||||
|
|
||||||
|
// send messages
|
||||||
|
con = createConnection();
|
||||||
|
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer producer = session.createProducer(null);
|
||||||
|
|
||||||
|
int sent = 0;
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
sent++;
|
||||||
|
Message message = session.createMessage();
|
||||||
|
message.setStringProperty("filter", "true");
|
||||||
|
producer.send(topic, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.sleep(1 * 1000);
|
||||||
|
|
||||||
|
Connection con2 = createConnection("cliId1");
|
||||||
|
Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
session2.unsubscribe("SubsId");
|
||||||
|
session2.close();
|
||||||
|
con2.close();
|
||||||
|
|
||||||
|
// see if retroactive can consumer any
|
||||||
|
topic = new ActiveMQTopic(topic.getPhysicalName() + "?consumer.retroactive=true");
|
||||||
|
con = createConnection("offCli2");
|
||||||
|
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
|
||||||
|
Listener listener = new Listener();
|
||||||
|
consumer.setMessageListener(listener);
|
||||||
|
session.close();
|
||||||
|
con.close();
|
||||||
|
assertEquals(0, listener.count);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static class Listener implements MessageListener {
|
public static class Listener implements MessageListener {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
|
String id = null;
|
||||||
|
|
||||||
|
Listener() {
|
||||||
|
}
|
||||||
|
Listener(String id) {
|
||||||
|
this.id = id;
|
||||||
|
}
|
||||||
public void onMessage(Message message) {
|
public void onMessage(Message message) {
|
||||||
count++;
|
count++;
|
||||||
|
if (id != null) {
|
||||||
|
try {
|
||||||
|
LOG.error(id + ", " + message.getJMSMessageID());
|
||||||
|
} catch (Exception ignored) {}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue