diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java index 83499d8f04..ce6e2954a0 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java @@ -389,7 +389,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception { this.addCombinationValues("defaultPersistenceAdapter", - new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); + new Object[]{ /*PersistenceAdapterChoice.KahaDB,*/ PersistenceAdapterChoice.JDBC}); } public void testInterleavedOfflineSubscriptionCanConsume() throws Exception { @@ -441,7 +441,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp con2 = createConnection("cliId2"); session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - listener2 = new Listener(); + listener2 = new Listener("cliId2"); consumer2.setMessageListener(listener2); // test online subs Thread.sleep(3 * 1000); @@ -452,7 +452,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp con = createConnection("cliId1"); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - Listener listener = new Listener(); + Listener listener = new Listener("cliId1"); consumer.setMessageListener(listener); Thread.sleep(3 * 1000); @@ -462,12 +462,174 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp assertEquals("offline consumer got all", sent, listener.count); } - + + public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMAtched() throws Exception { + this.addCombinationValues("defaultPersistenceAdapter", + new Object[]{ /* PersistenceAdapterChoice.KahaDB,*/ PersistenceAdapterChoice.JDBC}); + } + + private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))"; + public void testMixOfOnLineAndOfflineSubsGetAllMAtched() throws Exception { + // create offline subs 1 + Connection con = createConnection("offCli1"); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", filter, true); + session.close(); + con.close(); + + // create offline subs 2 + con = createConnection("offCli2"); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", filter, true); + session.close(); + con.close(); + + // create online subs + Connection con2 = createConnection("onlineCli1"); + Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", filter, true); + Listener listener2 = new Listener(); + consumer2.setMessageListener(listener2); + + // create non-durable consumer + Connection con4 = createConnection("nondurableCli"); + Session session4 = con4.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer4 = session4.createConsumer(topic, filter, true); + Listener listener4 = new Listener(); + consumer4.setMessageListener(listener4); + + // send messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + + boolean hasRelevant = false; + int filtered = 0; + for (int i = 0; i < 100; i++) { + int postf = (int) (Math.random() * 9) + 1; + String d = "D" + postf; + + if ("D1".equals(d) || "D2".equals(d)) { + hasRelevant = true; + filtered++; + } + + Message message = session.createMessage(); + message.setStringProperty("$a", "A1"); + message.setStringProperty("$d", d); + producer.send(topic, message); + } + + Message message = session.createMessage(); + message.setStringProperty("$a", "A1"); + message.setBooleanProperty("$b", true); + message.setBooleanProperty("$c", hasRelevant); + producer.send(topic, message); + + if (hasRelevant) + filtered++; + + Thread.sleep(1 * 1000); + session.close(); + con.close(); + + Thread.sleep(3 * 1000); + + // test non-durable consumer + session4.close(); + con4.close(); + assertEquals(filtered, listener4.count); // succeeded! + + // test online subs + session2.close(); + con2.close(); + assertEquals(filtered, listener2.count); // succeeded! + + // test offline 1 + con = createConnection("offCli1"); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true); + Listener listener = new Listener("offCli1"); + consumer.setMessageListener(listener); + + Thread.sleep(3 * 1000); + session.close(); + con.close(); + + assertEquals(filtered, listener.count); + + // test offline 2 + Connection con3 = createConnection("offCli2"); + Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", filter, true); + Listener listener3 = new Listener(); + consumer3.setMessageListener(listener3); + + Thread.sleep(3 * 1000); + session3.close(); + con3.close(); + + assertEquals(filtered, listener3.count); + } + + public void testRemovedDurableSubDeletes() throws Exception { + // create durable subscription 1 + Connection con = createConnection("cliId1"); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + session.close(); + con.close(); + + // send messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + + int sent = 0; + for (int i = 0; i < 10; i++) { + sent++; + Message message = session.createMessage(); + message.setStringProperty("filter", "true"); + producer.send(topic, message); + } + + Thread.sleep(1 * 1000); + + Connection con2 = createConnection("cliId1"); + Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); + session2.unsubscribe("SubsId"); + session2.close(); + con2.close(); + + // see if retroactive can consumer any + topic = new ActiveMQTopic(topic.getPhysicalName() + "?consumer.retroactive=true"); + con = createConnection("offCli2"); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true); + Listener listener = new Listener(); + consumer.setMessageListener(listener); + session.close(); + con.close(); + assertEquals(0, listener.count); + } + + public static class Listener implements MessageListener { int count = 0; + String id = null; + Listener() { + } + Listener(String id) { + this.id = id; + } public void onMessage(Message message) { count++; + if (id != null) { + try { + LOG.error(id + ", " + message.getJMSMessageID()); + } catch (Exception ignored) {} + } } } }