AMQ-7107 - Make QueueBrowsingTest and UsageBlockedDispatchTest succeed with ConcurrentStoreAndDispachQueues=false

This commit is contained in:
Alan Protasio 2018-11-21 11:01:19 -08:00
parent 8cc0c5ad6c
commit 2eda327aba
4 changed files with 49 additions and 5 deletions

View File

@ -0,0 +1,18 @@
package org.apache.activemq.usecases;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import java.io.File;
import java.io.IOException;
public class QueueBrowsingDisableConcurrentStoreAndDispatchTest extends QueueBrowsingTest {
@Override
public BrokerService createBroker() throws IOException {
BrokerService broker = super.createBroker();
KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
kahadb.setConcurrentStoreAndDispatchQueues(false);
broker.setPersistenceAdapter(kahadb);
return broker;
}
}

View File

@ -201,6 +201,10 @@ public class QueueBrowsingTest {
producer.send(session.createTextMessage(data));
}
//Consume one message to free memory and allow the cursor to pageIn messages
MessageConsumer consumer = session.createConsumer(queue);
consumer.receive(1000);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> enumeration = browser.getEnumeration();
int received = 0;

View File

@ -0,0 +1,17 @@
package org.apache.activemq.usecases;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import java.io.IOException;
public class UsageBlockedDispatchConcurrentStoreAndDispatchFalseTest extends UsageBlockedDispatchTest {
@Override
protected BrokerService createBroker() throws IOException {
BrokerService broker = new BrokerService();
KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
kahadb.setConcurrentStoreAndDispatchQueues(false);
broker.setPersistenceAdapter(kahadb);
return broker;
}
}

View File

@ -32,6 +32,7 @@ import org.apache.log4j.spi.LoggingEvent;
import javax.jms.*;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@ -48,14 +49,13 @@ public class UsageBlockedDispatchTest extends TestSupport {
@Override
public void setUp() throws Exception {
broker = new BrokerService();
broker = createBroker();
broker.setDataDirectory("target" + File.separator + "activemq-data");
broker.setPersistent(true);
broker.setUseJmx(true);
broker.setAdvisorySupport(false);
broker.setDeleteAllMessagesOnStartup(true);
setDefaultPersistenceAdapter(broker);
SystemUsage sysUsage = broker.getSystemUsage();
sysUsage.getMemoryUsage().setLimit(100*1024);
@ -75,6 +75,12 @@ public class UsageBlockedDispatchTest extends TestSupport {
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
protected BrokerService createBroker() throws IOException {
BrokerService broker = new BrokerService();
setDefaultPersistenceAdapter(broker);
return broker;
}
@Override
public void tearDown() throws Exception {
if (broker != null) {
@ -125,8 +131,7 @@ public class UsageBlockedDispatchTest extends TestSupport {
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(willGetAPage);
Message m = consumer.receive(messageReceiveTimeout);
assertNotNull("got a message", m);
consumer.receive(messageReceiveTimeout);
final AtomicBoolean gotExpectedLogEvent = new AtomicBoolean(false);
Appender appender = new DefaultTestAppender() {
@ -144,7 +149,7 @@ public class UsageBlockedDispatchTest extends TestSupport {
MessageConsumer noDispatchConsumer = consumerSession.createConsumer(shouldBeStuckForDispatch);
m = noDispatchConsumer.receive(messageReceiveTimeout);
Message m = noDispatchConsumer.receive(messageReceiveTimeout);
assertNull("did not get a message", m);
assertTrue("Got the new warning about the blocked cursor", gotExpectedLogEvent.get());