additional offiline test, and enable the jdbc variants - https://issues.apache.org/activemq/browse/AMQ-2985

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1028702 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-10-29 11:46:31 +00:00
parent d650d0c7e1
commit 0c3117aac5
1 changed files with 77 additions and 2 deletions

View File

@ -69,9 +69,13 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
}
private void createBroker() throws Exception {
createBroker(true);
}
private void createBroker(boolean deleteAllMessages) throws Exception {
broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")");
broker.setBrokerName(getName(true));
broker.setDeleteAllMessagesOnStartup(true);
broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
broker.getManagementContext().setCreateConnector(false);
if (usePrioritySupport) {
@ -91,7 +95,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
broker.stop();
}
public void x_initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
public void initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
this.addCombinationValues("defaultPersistenceAdapter",
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
this.addCombinationValues("usePrioritySupport",
@ -290,6 +294,77 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals("offline consumer got all", sent, listener.count);
}
public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
Connection con = createConnection("offCli1");
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
session.close();
con.close();
con = createConnection("offCli2");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
session.close();
con.close();
Connection con2 = createConnection("onlineCli1");
Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
Listener listener2 = new Listener();
consumer2.setMessageListener(listener2);
// send messages
con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
int sent = 0;
for (int i = 0; i < 10; i++) {
sent++;
Message message = session.createMessage();
message.setStringProperty("filter", "true");
producer.send(topic, message);
}
Thread.sleep(1 * 1000);
session.close();
con.close();
// test online subs
Thread.sleep(3 * 1000);
session2.close();
con2.close();
assertEquals(sent, listener2.count);
// restart broker
broker.stop();
createBroker(false /*deleteAllMessages*/);
// test offline
con = createConnection("offCli1");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
Connection con3 = createConnection("offCli2");
Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
Listener listener = new Listener();
consumer.setMessageListener(listener);
Listener listener3 = new Listener();
consumer3.setMessageListener(listener3);
Thread.sleep(3 * 1000);
session.close();
con.close();
session3.close();
con3.close();
assertEquals(sent, listener.count);
assertEquals(sent, listener3.count);
}
public static class Listener implements MessageListener {
int count = 0;