diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java index 41d1f050e0..32c2d2c325 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java @@ -28,10 +28,15 @@ import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBStore; import org.apache.activemq.test.TestSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + /** * */ @@ -39,6 +44,8 @@ public class DurableConsumerCloseAndReconnectTest extends TestSupport { protected static final long RECEIVE_TIMEOUT = 5000L; private static final Logger LOG = LoggerFactory.getLogger(DurableConsumerCloseAndReconnectTest.class); + BrokerService brokerService; + protected Connection connection; private Session session; private MessageConsumer consumer; @@ -46,28 +53,63 @@ public class DurableConsumerCloseAndReconnectTest extends TestSupport { private Destination destination; private int messageCount; + private String vmConnectorURI; + @Override protected void setUp() throws Exception { + createBroker(); super.setUp(); - deleteAllMessages(); } @Override protected void tearDown() throws Exception { + stopBroker(); super.tearDown(); - deleteAllMessages(); } - private void deleteAllMessages() throws Exception { - ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=true"); - Connection dummyConnection = fac.createConnection(); - dummyConnection.start(); - dummyConnection.close(); - } - protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=false"); + return new ActiveMQConnectionFactory(vmConnectorURI); + } + + protected void createBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.setUseJmx(false); + brokerService.setPersistent(false); + KahaDBPersistenceAdapter store = new KahaDBPersistenceAdapter(); + brokerService.setPersistenceAdapter(store); + brokerService.start(); + brokerService.waitUntilStarted(); + vmConnectorURI = brokerService.getVmConnectorURI().toString(); + } + + protected void stopBroker() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + public void testDurableSubscriberReconnectMultipleTimes() throws Exception { + Connection dummyConnection = createConnection(); + dummyConnection.start(); + + makeConsumer(Session.AUTO_ACKNOWLEDGE); + closeConsumer(); + + publish(30); + + int counter = 1; + for (int i = 0; i < 15; i++) { + makeConsumer(Session.AUTO_ACKNOWLEDGE); + Message message = consumer.receive(RECEIVE_TIMEOUT); + assertTrue("Should have received a message!", message != null); + LOG.info("Received message " + counter++); + message = consumer.receive(RECEIVE_TIMEOUT); + assertTrue("Should have received a message!", message != null); + LOG.info("Received message " + counter++); + closeConsumer(); + } + + dummyConnection.close(); } public void testCreateDurableConsumerCloseThenReconnect() throws Exception { @@ -84,10 +126,11 @@ public class DurableConsumerCloseAndReconnectTest extends TestSupport { } protected void consumeMessagesDeliveredWhileConsumerClosed() throws Exception { + // default to client ack for consumer makeConsumer(); closeConsumer(); - publish(); + publish(1); // wait a few moments for the close to really occur Thread.sleep(1000); @@ -117,7 +160,7 @@ public class DurableConsumerCloseAndReconnectTest extends TestSupport { closeConsumer(); LOG.info("Lets publish one more message now"); - publish(); + publish(1); makeConsumer(); message = consumer.receive(RECEIVE_TIMEOUT); @@ -127,7 +170,7 @@ public class DurableConsumerCloseAndReconnectTest extends TestSupport { closeConsumer(); } - protected void publish() throws Exception { + protected void publish(int numMessages) throws Exception { connection = createConnection(); connection.start(); @@ -136,8 +179,10 @@ public class DurableConsumerCloseAndReconnectTest extends TestSupport { producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); - TextMessage msg = session.createTextMessage("This is a test: " + messageCount++); - producer.send(msg); + for (int i = 0; i < numMessages; i++) { + TextMessage msg = session.createTextMessage("This is a test: " + messageCount++); + producer.send(msg); + } producer.close(); producer = null; @@ -157,6 +202,7 @@ public class DurableConsumerCloseAndReconnectTest extends TestSupport { } protected void closeConsumer() throws JMSException { + LOG.info("Closing the consumer"); consumer.close(); consumer = null; closeSession(); @@ -170,10 +216,14 @@ public class DurableConsumerCloseAndReconnectTest extends TestSupport { } protected void makeConsumer() throws Exception { + makeConsumer(Session.CLIENT_ACKNOWLEDGE); + } + + protected void makeConsumer(int ackMode) throws Exception { String durableName = getName(); String clientID = getSubject(); - LOG.info("Creating a durable subscribe for clientID: " + clientID + " and durable name: " + durableName); - createSession(clientID); + LOG.info("Creating a durable subscriber for clientID: " + clientID + " and durable name: " + durableName); + createSession(clientID, ackMode); consumer = createConsumer(durableName); } @@ -185,12 +235,12 @@ public class DurableConsumerCloseAndReconnectTest extends TestSupport { } } - protected void createSession(String clientID) throws Exception { + protected void createSession(String clientID, int ackMode) throws Exception { connection = createConnection(); connection.setClientID(clientID); connection.start(); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + session = connection.createSession(false, ackMode); destination = createDestination(); } }