mirror of https://github.com/apache/activemq.git
test code updates to address: https://issues.apache.org/jira/browse/AMQ-4415
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1462252 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b52c371cd9
commit
f7f294f059
|
@ -16,44 +16,65 @@
|
|||
*/
|
||||
package org.apache.activemq.broker;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import junit.framework.Test;
|
||||
|
||||
import org.apache.activemq.TestSupport;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.util.JMXSupport;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class QueueMbeanRestartTest extends TestSupport {
|
||||
private static final transient Logger LOG = LoggerFactory.getLogger(QueueMbeanRestartTest.class);
|
||||
|
||||
BrokerService broker;
|
||||
|
||||
public static Test suite() {
|
||||
return suite(QueueMbeanRestartTest.class);
|
||||
private final TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static Collection<TestSupport.PersistenceAdapterChoice[]> getTestParameters() {
|
||||
TestSupport.PersistenceAdapterChoice[] kahaDb = {TestSupport.PersistenceAdapterChoice.KahaDB};
|
||||
TestSupport.PersistenceAdapterChoice[] levelDb = {TestSupport.PersistenceAdapterChoice.LevelDB};
|
||||
TestSupport.PersistenceAdapterChoice[] jdbc = {TestSupport.PersistenceAdapterChoice.JDBC};
|
||||
List<TestSupport.PersistenceAdapterChoice[]> choices = new ArrayList<TestSupport.PersistenceAdapterChoice[]>();
|
||||
choices.add(kahaDb);
|
||||
choices.add(levelDb);
|
||||
choices.add(jdbc);
|
||||
|
||||
return choices;
|
||||
}
|
||||
|
||||
public QueueMbeanRestartTest(TestSupport.PersistenceAdapterChoice choice) {
|
||||
this.persistenceAdapterChoice = choice;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
topic = false;
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
broker.stop();
|
||||
}
|
||||
|
||||
public void initCombosForTestMBeanPresenceOnRestart() {
|
||||
addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testMBeanPresenceOnRestart() throws Exception {
|
||||
createBroker(true);
|
||||
|
||||
|
@ -95,7 +116,7 @@ public class QueueMbeanRestartTest extends TestSupport {
|
|||
|
||||
private void createBroker(boolean deleteAll) throws Exception {
|
||||
broker = new BrokerService();
|
||||
setDefaultPersistenceAdapter(broker);
|
||||
setPersistenceAdapter(broker, persistenceAdapterChoice);
|
||||
|
||||
broker.setDeleteAllMessagesOnStartup(deleteAll);
|
||||
broker.start();
|
||||
|
|
|
@ -16,6 +16,9 @@
|
|||
*/
|
||||
package org.apache.activemq.bugs;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -27,10 +30,9 @@ import javax.jms.MessageListener;
|
|||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import junit.framework.Test;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.TestSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.jmx.BrokerView;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
|
@ -38,9 +40,15 @@ import org.apache.activemq.command.ActiveMQTopic;
|
|||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.util.IntrospectionSupport;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class AMQ2584Test extends org.apache.activemq.TestSupport {
|
||||
|
||||
static final Logger LOG = LoggerFactory.getLogger(AMQ2584Test.class);
|
||||
|
@ -53,18 +61,24 @@ public class AMQ2584Test extends org.apache.activemq.TestSupport {
|
|||
final int minPercentUsageForStore = 10;
|
||||
String data;
|
||||
|
||||
public static Test suite() {
|
||||
return suite(AMQ2584Test.class);
|
||||
private final TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static Collection<TestSupport.PersistenceAdapterChoice[]> getTestParameters() {
|
||||
TestSupport.PersistenceAdapterChoice[] kahaDb = {TestSupport.PersistenceAdapterChoice.KahaDB};
|
||||
TestSupport.PersistenceAdapterChoice[] levelDb = {TestSupport.PersistenceAdapterChoice.LevelDB};
|
||||
List<TestSupport.PersistenceAdapterChoice[]> choices = new ArrayList<TestSupport.PersistenceAdapterChoice[]>();
|
||||
choices.add(kahaDb);
|
||||
choices.add(levelDb);
|
||||
|
||||
return choices;
|
||||
}
|
||||
|
||||
public void initCombosForTestSize() throws Exception {
|
||||
this.addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{
|
||||
PersistenceAdapterChoice.LevelDB,
|
||||
PersistenceAdapterChoice.KahaDB
|
||||
});
|
||||
public AMQ2584Test(TestSupport.PersistenceAdapterChoice choice) {
|
||||
this.persistenceAdapterChoice = choice;
|
||||
}
|
||||
|
||||
@Test(timeout = 120000)
|
||||
public void testSize() throws Exception {
|
||||
CountDownLatch redeliveryConsumerLatch = new CountDownLatch(15000 -1);
|
||||
openConsumer(redeliveryConsumerLatch);
|
||||
|
@ -167,7 +181,8 @@ public class AMQ2584Test extends org.apache.activemq.TestSupport {
|
|||
if (deleteMessages) {
|
||||
broker.setDeleteAllMessagesOnStartup(true);
|
||||
}
|
||||
setDefaultPersistenceAdapter(broker);
|
||||
LOG.info("Starting broker with persistenceAdapterChoice " + persistenceAdapterChoice.toString());
|
||||
setPersistenceAdapter(broker, persistenceAdapterChoice);
|
||||
configurePersistenceAdapter(broker.getPersistenceAdapter());
|
||||
broker.getSystemUsage().getStoreUsage().setLimit(200 * 1000 * 1000);
|
||||
broker.start();
|
||||
|
@ -196,9 +211,8 @@ public class AMQ2584Test extends org.apache.activemq.TestSupport {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
StringBuilder sb = new StringBuilder(5000);
|
||||
for (int i = 0; i < 5000; i++) {
|
||||
sb.append('a');
|
||||
|
@ -210,8 +224,8 @@ public class AMQ2584Test extends org.apache.activemq.TestSupport {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void tearDown() throws Exception {
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
stopBroker();
|
||||
super.tearDown();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,14 +16,18 @@
|
|||
*/
|
||||
package org.apache.activemq.bugs;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TopicSubscriber;
|
||||
import junit.framework.Test;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
|
@ -32,10 +36,16 @@ import org.apache.activemq.command.ActiveMQTopic;
|
|||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.util.IntrospectionSupport;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class AMQ2870Test extends org.apache.activemq.TestSupport {
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class AMQ2870Test extends org.apache.activemq.TestSupport {
|
||||
|
||||
static final Logger LOG = LoggerFactory.getLogger(AMQ2870Test.class);
|
||||
BrokerService broker = null;
|
||||
|
@ -47,15 +57,28 @@ public class AMQ2870Test extends org.apache.activemq.TestSupport {
|
|||
final int minPercentUsageForStore = 10;
|
||||
String data;
|
||||
|
||||
public static Test suite() {
|
||||
return suite(AMQ2870Test.class);
|
||||
private final PersistenceAdapterChoice persistenceAdapterChoice;
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static Collection<PersistenceAdapterChoice[]> getTestParameters() {
|
||||
String osName = System.getProperty("os.name");
|
||||
LOG.info("Running on [" + osName + "]");
|
||||
PersistenceAdapterChoice[] kahaDb = {PersistenceAdapterChoice.KahaDB};
|
||||
PersistenceAdapterChoice[] levelDb = {PersistenceAdapterChoice.LevelDB};
|
||||
List<PersistenceAdapterChoice[]> choices = new ArrayList<PersistenceAdapterChoice[]>();
|
||||
choices.add(kahaDb);
|
||||
if (!osName.equalsIgnoreCase("AIX") && !osName.equalsIgnoreCase("SunOS")) {
|
||||
choices.add(levelDb);
|
||||
}
|
||||
|
||||
return choices;
|
||||
}
|
||||
|
||||
public void initCombosForTestSize() throws Exception {
|
||||
this.addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB});
|
||||
|
||||
public AMQ2870Test(PersistenceAdapterChoice choice) {
|
||||
this.persistenceAdapterChoice = choice;
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testSize() throws Exception {
|
||||
openConsumer();
|
||||
|
||||
|
@ -70,6 +93,7 @@ public class AMQ2870Test extends org.apache.activemq.TestSupport {
|
|||
// wait for reclaim
|
||||
assertTrue("in range with consumer",
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
// usage percent updated only on send check for isFull so once
|
||||
// sends complete it is no longer updated till next send via a call to isFull
|
||||
|
@ -80,11 +104,11 @@ public class AMQ2870Test extends org.apache.activemq.TestSupport {
|
|||
}
|
||||
}));
|
||||
|
||||
|
||||
closeConsumer();
|
||||
|
||||
assertTrue("in range with closed consumer",
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
broker.getSystemUsage().getStoreUsage().isFull();
|
||||
LOG.info("store precent usage: "+brokerView.getStorePercentUsage());
|
||||
|
@ -101,13 +125,13 @@ public class AMQ2870Test extends org.apache.activemq.TestSupport {
|
|||
|
||||
assertTrue("in range after send with consumer",
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
broker.getSystemUsage().getStoreUsage().isFull();
|
||||
LOG.info("store precent usage: "+brokerView.getStorePercentUsage());
|
||||
LOG.info("store precent usage: "+brokerView.getStorePercentUsage());
|
||||
return broker.getAdminView().getStorePercentUsage() < minPercentUsageForStore;
|
||||
}
|
||||
}));
|
||||
|
||||
}
|
||||
|
||||
private void openConsumer() throws Exception {
|
||||
|
@ -118,6 +142,7 @@ public class AMQ2870Test extends org.apache.activemq.TestSupport {
|
|||
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "subName", "filter=true", false);
|
||||
|
||||
subscriber.setMessageListener(new MessageListener() {
|
||||
@Override
|
||||
public void onMessage(Message message) {
|
||||
// received++;
|
||||
}
|
||||
|
@ -152,7 +177,8 @@ public class AMQ2870Test extends org.apache.activemq.TestSupport {
|
|||
if (deleteMessages) {
|
||||
broker.setDeleteAllMessagesOnStartup(true);
|
||||
}
|
||||
setDefaultPersistenceAdapter(broker);
|
||||
LOG.info("Starting broker with persistenceAdapterChoice " + persistenceAdapterChoice.toString());
|
||||
setPersistenceAdapter(broker, persistenceAdapterChoice);
|
||||
configurePersistenceAdapter(broker.getPersistenceAdapter());
|
||||
broker.getSystemUsage().getStoreUsage().setLimit(100 * 1000 * 1000);
|
||||
broker.start();
|
||||
|
@ -165,7 +191,7 @@ public class AMQ2870Test extends org.apache.activemq.TestSupport {
|
|||
properties.put("maxFileLength", maxFileLengthVal);
|
||||
properties.put("cleanupInterval", "2000");
|
||||
properties.put("checkpointInterval", "2000");
|
||||
|
||||
|
||||
// leveldb
|
||||
properties.put("logSize", maxFileLengthVal);
|
||||
|
||||
|
@ -178,14 +204,14 @@ public class AMQ2870Test extends org.apache.activemq.TestSupport {
|
|||
broker = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
|
||||
return new ActiveMQConnectionFactory("vm://testStoreSize?jms.watchTopicAdvisories=false&waitForStart=5000&create=false");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
StringBuilder sb = new StringBuilder(5000);
|
||||
for (int i = 0; i < 5000; i++) {
|
||||
sb.append('a');
|
||||
|
@ -197,8 +223,8 @@ public class AMQ2870Test extends org.apache.activemq.TestSupport {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void tearDown() throws Exception {
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
stopBroker();
|
||||
super.tearDown();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ public class FailoverTransactionTest extends TestSupport {
|
|||
}
|
||||
|
||||
public void setUp() throws Exception {
|
||||
super.setMaxTestTime(20 * 60 * 1000); // some boxes can be real slow
|
||||
super.setMaxTestTime(2 * 60 * 1000); // some boxes can be real slow
|
||||
super.setAutoFail(true);
|
||||
super.setUp();
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.usecases;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
|
@ -29,6 +30,7 @@ import java.util.concurrent.Executors;
|
|||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.DeliveryMode;
|
||||
|
@ -41,36 +43,54 @@ import javax.jms.MessageProducer;
|
|||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.TopicSubscriber;
|
||||
import junit.framework.Test;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.ActiveMQPrefetchPolicy;
|
||||
import org.apache.activemq.TestSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
//import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
||||
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.util.MessageIdList;
|
||||
import org.apache.activemq.util.Wait;
|
||||
//import org.apache.commons.dbcp.BasicDataSource;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentProducerDurableConsumerTest.class);
|
||||
private int consumerCount = 5;
|
||||
private final int consumerCount = 5;
|
||||
BrokerService broker;
|
||||
protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
|
||||
protected Map<MessageConsumer, TimedMessageListener> consumers = new HashMap<MessageConsumer, TimedMessageListener>();
|
||||
protected MessageIdList allMessagesList = new MessageIdList();
|
||||
private int messageSize = 1024;
|
||||
private final int messageSize = 1024;
|
||||
|
||||
public void initCombosForTestSendRateWithActivatingConsumers() throws Exception {
|
||||
addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.MEM});
|
||||
private final TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static Collection<TestSupport.PersistenceAdapterChoice[]> getTestParameters() {
|
||||
TestSupport.PersistenceAdapterChoice[] kahaDb = {TestSupport.PersistenceAdapterChoice.KahaDB};
|
||||
TestSupport.PersistenceAdapterChoice[] levelDb = {TestSupport.PersistenceAdapterChoice.LevelDB};
|
||||
TestSupport.PersistenceAdapterChoice[] mem = {TestSupport.PersistenceAdapterChoice.MEM};
|
||||
List<TestSupport.PersistenceAdapterChoice[]> choices = new ArrayList<TestSupport.PersistenceAdapterChoice[]>();
|
||||
choices.add(kahaDb);
|
||||
choices.add(levelDb);
|
||||
choices.add(mem);
|
||||
return choices;
|
||||
}
|
||||
|
||||
public ConcurrentProducerDurableConsumerTest(TestSupport.PersistenceAdapterChoice choice) {
|
||||
this.persistenceAdapterChoice = choice;
|
||||
}
|
||||
|
||||
@Test(timeout = 120000)
|
||||
public void testSendRateWithActivatingConsumers() throws Exception {
|
||||
final Destination destination = createDestination();
|
||||
final ConnectionFactory factory = createConnectionFactory();
|
||||
|
@ -115,7 +135,6 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
}
|
||||
});
|
||||
|
||||
|
||||
double[] statsWithActive = produceMessages(destination, 500, 10, session, producer, addConsumerSignal);
|
||||
|
||||
LOG.info(" with concurrent activate, ave: " + statsWithActive[1] + ", max: " + statsWithActive[0] + ", multiplier: " + (statsWithActive[0]/ statsWithActive[1]));
|
||||
|
@ -150,12 +169,6 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
statsWithActive[1] < 15 * inactiveConsumerStats[1]);
|
||||
}
|
||||
|
||||
|
||||
public void x_initCombosForTestSendWithInactiveAndActiveConsumers() throws Exception {
|
||||
addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
|
||||
}
|
||||
|
||||
public void x_testSendWithInactiveAndActiveConsumers() throws Exception {
|
||||
Destination destination = createDestination();
|
||||
ConnectionFactory factory = createConnectionFactory();
|
||||
|
@ -185,6 +198,7 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
|
||||
final int toReceive = toSend * numIterations * consumerCount * 2;
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
LOG.info("count: " + allMessagesList.getMessageCount());
|
||||
return toReceive == allMessagesList.getMessageCount();
|
||||
|
@ -194,14 +208,12 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
assertEquals("got all messages", toReceive, allMessagesList.getMessageCount());
|
||||
}
|
||||
|
||||
|
||||
private MessageProducer createMessageProducer(Session session, Destination destination) throws JMSException {
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
return producer;
|
||||
}
|
||||
|
||||
|
||||
private void startInactiveConsumers(ConnectionFactory factory, Destination destination) throws Exception {
|
||||
// create off line consumers
|
||||
startConsumers(factory, destination);
|
||||
|
@ -212,7 +224,6 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
consumers.clear();
|
||||
}
|
||||
|
||||
|
||||
protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception {
|
||||
MessageConsumer consumer;
|
||||
for (int i = 0; i < consumerCount; i++) {
|
||||
|
@ -263,8 +274,7 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
LOG.info("Signalled add consumer");
|
||||
}
|
||||
}
|
||||
}
|
||||
;
|
||||
};
|
||||
if (count % 5000 == 0) {
|
||||
LOG.info("Sent " + count + ", singleSendMax:" + max);
|
||||
}
|
||||
|
@ -300,7 +310,8 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
topic = true;
|
||||
super.setUp();
|
||||
broker = createBroker();
|
||||
|
@ -308,7 +319,8 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void tearDown() throws Exception {
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) {
|
||||
Connection conn = iter.next();
|
||||
try {
|
||||
|
@ -322,7 +334,6 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
super.tearDown();
|
||||
}
|
||||
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService brokerService = new BrokerService();
|
||||
brokerService.setEnableStatistics(false);
|
||||
|
@ -373,11 +384,12 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
</dependency>
|
||||
*/
|
||||
// } else {
|
||||
setDefaultPersistenceAdapter(brokerService);
|
||||
setPersistenceAdapter(brokerService, persistenceAdapterChoice);
|
||||
// }
|
||||
return brokerService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
|
||||
broker.getTransportConnectors().get(0).getPublishableConnectString());
|
||||
|
@ -389,10 +401,6 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
return factory;
|
||||
}
|
||||
|
||||
public static Test suite() {
|
||||
return suite(ConcurrentProducerDurableConsumerTest.class);
|
||||
}
|
||||
|
||||
class TimedMessageListener implements MessageListener {
|
||||
final int batchSize = 1000;
|
||||
CountDownLatch firstReceiptLatch = new CountDownLatch(1);
|
||||
|
@ -480,5 +488,4 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -87,6 +87,8 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
setAutoFail(true);
|
||||
setMaxTestTime(2 * 60 * 1000);
|
||||
exceptions.clear();
|
||||
topic = (ActiveMQTopic) createDestination();
|
||||
createBroker();
|
||||
|
@ -226,7 +228,6 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
assertEquals(sent, listener.count);
|
||||
}
|
||||
|
||||
|
||||
public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception {
|
||||
this.addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
|
||||
|
@ -454,7 +455,6 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
return 10 == val;
|
||||
}
|
||||
}));
|
||||
|
||||
}
|
||||
|
||||
public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
|
||||
|
@ -535,7 +535,6 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
assertEquals(sent, listener3.count);
|
||||
}
|
||||
|
||||
|
||||
public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception {
|
||||
this.addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
|
||||
|
@ -989,7 +988,6 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
session.close();
|
||||
con.close();
|
||||
|
||||
|
||||
// send messages
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -1085,7 +1083,6 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
final String payLoad = new String(new byte[1000]);
|
||||
con = createConnection();
|
||||
final Session sendSession = con.createSession(true, Session.SESSION_TRANSACTED);
|
||||
|
@ -1268,7 +1265,6 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
session.close();
|
||||
con.close();
|
||||
|
||||
|
||||
con = createConnection("offCli1");
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||
|
@ -1283,7 +1279,6 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
assertEquals(0, listener.count);
|
||||
}
|
||||
|
||||
|
||||
public void testAllConsumed() throws Exception {
|
||||
final String filter = "filter = 'true'";
|
||||
Connection con = createConnection("cli1");
|
||||
|
@ -1336,7 +1331,6 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
session.close();
|
||||
con.close();
|
||||
|
||||
|
||||
// send messages
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -1400,7 +1394,6 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
con.close();
|
||||
LOG.info("sent: " + sent);
|
||||
|
||||
|
||||
// new sub at id 10
|
||||
con = createConnection("cli2");
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -1530,7 +1523,6 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
con.close();
|
||||
}
|
||||
|
||||
|
||||
// populate ack locations
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -1705,8 +1697,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
if (b != null) {
|
||||
boolean c = message.getBooleanProperty("$c");
|
||||
assertTrue("", c);
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
String d = message.getStringProperty("$d");
|
||||
assertTrue("", "D1".equals(d) || "D2".equals(d));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue