mirror of https://github.com/apache/activemq.git
Added a test for browsing offline durable subscriber sub
This commit is contained in:
parent
a439a0c6bf
commit
f4f416a093
|
@ -17,10 +17,15 @@
|
||||||
package org.apache.activemq.usecases;
|
package org.apache.activemq.usecases;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.apache.activemq.command.MessageId;
|
import org.apache.activemq.command.MessageId;
|
||||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||||
import org.apache.activemq.store.kahadb.disk.page.PageFile;
|
import org.apache.activemq.store.kahadb.disk.page.PageFile;
|
||||||
|
import org.apache.activemq.transport.vm.VMTransport;
|
||||||
|
import org.apache.activemq.transport.vm.VMTransportFactory;
|
||||||
|
import org.apache.activemq.transport.vm.VMTransportServer;
|
||||||
import org.junit.Ignore;
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -32,6 +37,8 @@ import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
import javax.management.ObjectName;
|
||||||
|
import javax.management.openmbean.CompositeData;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
@ -92,6 +99,41 @@ public class DurableSubscriptionOfflineTest extends DurableSubscriptionOfflineTe
|
||||||
assertEquals(sent, listener.count);
|
assertEquals(sent, listener.count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60 * 1000)
|
||||||
|
public void testBrowseOfflineSub() throws Exception {
|
||||||
|
// create durable subscription
|
||||||
|
Connection con = createConnection();
|
||||||
|
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
session.createDurableSubscriber(topic, "SubsId");
|
||||||
|
session.close();
|
||||||
|
con.close();
|
||||||
|
|
||||||
|
// send messages
|
||||||
|
con = createConnection();
|
||||||
|
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer producer = session.createProducer(null);
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
Message message = session.createMessage();
|
||||||
|
message.setStringProperty("filter", "true");
|
||||||
|
producer.send(topic, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.sleep(1 * 1000);
|
||||||
|
|
||||||
|
session.close();
|
||||||
|
con.close();
|
||||||
|
|
||||||
|
// browse the durable sub
|
||||||
|
ObjectName[] subs = broker.getAdminView().getInactiveDurableTopicSubscribers();
|
||||||
|
assertEquals(1, subs.length);
|
||||||
|
ObjectName subName = subs[0];
|
||||||
|
DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
|
||||||
|
broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
|
||||||
|
CompositeData[] data = sub.browse();
|
||||||
|
assertNotNull(data);
|
||||||
|
assertEquals(10, data.length);
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60 * 1000)
|
@Test(timeout = 60 * 1000)
|
||||||
public void testTwoOfflineSubscriptionCanConsume() throws Exception {
|
public void testTwoOfflineSubscriptionCanConsume() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue