diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingDisableConcurrentStoreAndDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingDisableConcurrentStoreAndDispatchTest.java new file mode 100644 index 0000000000..5c7de8e0f7 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingDisableConcurrentStoreAndDispatchTest.java @@ -0,0 +1,18 @@ +package org.apache.activemq.usecases; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; + +import java.io.File; +import java.io.IOException; + +public class QueueBrowsingDisableConcurrentStoreAndDispatchTest extends QueueBrowsingTest { + @Override + public BrokerService createBroker() throws IOException { + BrokerService broker = super.createBroker(); + KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter(); + kahadb.setConcurrentStoreAndDispatchQueues(false); + broker.setPersistenceAdapter(kahadb); + return broker; + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java index 29b6e7297c..19c1cb72f0 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java @@ -201,6 +201,10 @@ public class QueueBrowsingTest { producer.send(session.createTextMessage(data)); } + //Consume one message to free memory and allow the cursor to pageIn messages + MessageConsumer consumer = session.createConsumer(queue); + consumer.receive(1000); + QueueBrowser browser = session.createBrowser(queue); Enumeration enumeration = browser.getEnumeration(); int received = 0; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchConcurrentStoreAndDispatchFalseTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchConcurrentStoreAndDispatchFalseTest.java new file mode 100644 index 0000000000..48ff54db8f --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchConcurrentStoreAndDispatchFalseTest.java @@ -0,0 +1,17 @@ +package org.apache.activemq.usecases; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; + +import java.io.IOException; + +public class UsageBlockedDispatchConcurrentStoreAndDispatchFalseTest extends UsageBlockedDispatchTest { + @Override + protected BrokerService createBroker() throws IOException { + BrokerService broker = new BrokerService(); + KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter(); + kahadb.setConcurrentStoreAndDispatchQueues(false); + broker.setPersistenceAdapter(kahadb); + return broker; + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchTest.java index 7767672d6c..dba73c1dc0 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchTest.java @@ -32,6 +32,7 @@ import org.apache.log4j.spi.LoggingEvent; import javax.jms.*; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -48,14 +49,13 @@ public class UsageBlockedDispatchTest extends TestSupport { @Override public void setUp() throws Exception { - broker = new BrokerService(); + broker = createBroker(); broker.setDataDirectory("target" + File.separator + "activemq-data"); broker.setPersistent(true); broker.setUseJmx(true); broker.setAdvisorySupport(false); broker.setDeleteAllMessagesOnStartup(true); - setDefaultPersistenceAdapter(broker); SystemUsage sysUsage = broker.getSystemUsage(); sysUsage.getMemoryUsage().setLimit(100*1024); @@ -75,6 +75,12 @@ public class UsageBlockedDispatchTest extends TestSupport { connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); } + protected BrokerService createBroker() throws IOException { + BrokerService broker = new BrokerService(); + setDefaultPersistenceAdapter(broker); + return broker; + } + @Override public void tearDown() throws Exception { if (broker != null) { @@ -125,8 +131,7 @@ public class UsageBlockedDispatchTest extends TestSupport { Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = consumerSession.createConsumer(willGetAPage); - Message m = consumer.receive(messageReceiveTimeout); - assertNotNull("got a message", m); + consumer.receive(messageReceiveTimeout); final AtomicBoolean gotExpectedLogEvent = new AtomicBoolean(false); Appender appender = new DefaultTestAppender() { @@ -144,7 +149,7 @@ public class UsageBlockedDispatchTest extends TestSupport { MessageConsumer noDispatchConsumer = consumerSession.createConsumer(shouldBeStuckForDispatch); - m = noDispatchConsumer.receive(messageReceiveTimeout); + Message m = noDispatchConsumer.receive(messageReceiveTimeout); assertNull("did not get a message", m); assertTrue("Got the new warning about the blocked cursor", gotExpectedLogEvent.get());