mirror of https://github.com/apache/activemq.git
Converted to JUnit4 and added @Ignore tags for failing tests. See AMQ-4286 and AMQ-5001
This commit is contained in:
parent
907660d2cf
commit
bec711c7db
|
@ -24,15 +24,21 @@ import org.apache.activemq.broker.BrokerService;
|
|||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.BlockJUnit4ClassRunner;
|
||||
|
||||
/**
|
||||
* User: gtully
|
||||
*/
|
||||
@RunWith(BlockJUnit4ClassRunner.class)
|
||||
public class ExpiryHogTest extends JmsMultipleClientsTestSupport {
|
||||
boolean sleep = false;
|
||||
|
||||
int numMessages = 4;
|
||||
|
||||
@Test(timeout = 2 * 60 * 1000)
|
||||
public void testImmediateDispatchWhenCacheDisabled() throws Exception {
|
||||
ConnectionFactory f = createConnectionFactory();
|
||||
destination = createDestination();
|
||||
|
@ -67,7 +73,8 @@ public class ExpiryHogTest extends JmsMultipleClientsTestSupport {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
autoFail = false;
|
||||
persistent = true;
|
||||
super.setUp();
|
||||
|
|
|
@ -43,6 +43,14 @@ import org.apache.activemq.command.ActiveMQDestination;
|
|||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.util.MessageIdList;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* Test case support used to test multiple message comsumers and message
|
||||
|
@ -50,7 +58,12 @@ import org.apache.activemq.util.MessageIdList;
|
|||
*
|
||||
*
|
||||
*/
|
||||
public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
|
||||
public class JmsMultipleClientsTestSupport {
|
||||
|
||||
@Rule
|
||||
public TestName testName = new TestName();
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(JmsMultipleClientsTestSupport.class);
|
||||
|
||||
protected Map<MessageConsumer, MessageIdList> consumers = new HashMap<MessageConsumer, MessageIdList>(); // Map of consumer with messages
|
||||
// received
|
||||
|
@ -217,14 +230,14 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
|
|||
return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true"));
|
||||
}
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
super.setAutoFail(autoFail);
|
||||
super.setUp();
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
broker = createBroker();
|
||||
broker.start();
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) {
|
||||
Connection conn = iter.next();
|
||||
try {
|
||||
|
@ -232,10 +245,11 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
|
|||
} catch (Throwable e) {
|
||||
}
|
||||
}
|
||||
if (broker !=null ) { // FIXME remove
|
||||
broker.stop();
|
||||
allMessagesList.flushMessages();
|
||||
consumers.clear();
|
||||
super.tearDown();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -285,4 +299,31 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
|
|||
}
|
||||
assertEquals("Total of consumers message count", msgCount, totalMsg);
|
||||
}
|
||||
|
||||
|
||||
public String getName() {
|
||||
return getName(false);
|
||||
}
|
||||
|
||||
public String getName(boolean original) {
|
||||
String currentTestName = testName.getMethodName();
|
||||
currentTestName = currentTestName.replace("[","");
|
||||
currentTestName = currentTestName.replace("]","");
|
||||
return currentTestName;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* This is copied from AutoFailTestSupport. We may want to move it to someplace where more
|
||||
* tests can use it.
|
||||
*/
|
||||
public static void dumpAllThreads(String prefix) {
|
||||
Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces();
|
||||
for (Map.Entry<Thread, StackTraceElement[]> stackEntry : stacks.entrySet()) {
|
||||
System.err.println(prefix + " " + stackEntry.getKey());
|
||||
for(StackTraceElement element : stackEntry.getValue()) {
|
||||
System.err.println(" " + element);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,9 +35,19 @@ import org.apache.activemq.ActiveMQConnection;
|
|||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.BlockJUnit4ClassRunner;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
@RunWith(BlockJUnit4ClassRunner.class)
|
||||
public class NioQueueSubscriptionTest extends QueueSubscriptionTest {
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(NioQueueSubscriptionTest.class);
|
||||
|
@ -49,12 +59,18 @@ public class NioQueueSubscriptionTest extends QueueSubscriptionTest {
|
|||
return new ActiveMQConnectionFactory("tcp://localhost:62621?trace=false");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
// setMaxTestTime(20*60*1000);
|
||||
/*
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
}
|
||||
*/
|
||||
|
||||
@Override
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService answer = BrokerFactory.createBroker(new URI(
|
||||
|
@ -74,6 +90,9 @@ public class NioQueueSubscriptionTest extends QueueSubscriptionTest {
|
|||
return answer;
|
||||
}
|
||||
|
||||
|
||||
@Ignore("See AMQ-4286")
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testLotsOfConcurrentConnections() throws Exception {
|
||||
ExecutorService executor = Executors.newCachedThreadPool();
|
||||
final ConnectionFactory factory = createConnectionFactory();
|
||||
|
|
|
@ -19,17 +19,33 @@ package org.apache.activemq.broker;
|
|||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.JmsMultipleClientsTestSupport;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.BlockJUnit4ClassRunner;
|
||||
|
||||
@RunWith(BlockJUnit4ClassRunner.class)
|
||||
public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
|
||||
protected int messageCount = 1000; // 1000 Messages per producer
|
||||
protected int prefetchCount = 10;
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
@Before
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
durable = false;
|
||||
topic = false;
|
||||
}
|
||||
|
||||
@After
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testManyProducersOneConsumer() throws Exception {
|
||||
consumerCount = 1;
|
||||
producerCount = 10;
|
||||
|
@ -42,6 +58,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
|
|||
assertTotalMessagesReceived(messageCount * producerCount);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
|
||||
consumerCount = 2;
|
||||
producerCount = 1;
|
||||
|
@ -54,6 +71,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
|
|||
assertTotalMessagesReceived(messageCount * producerCount);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
|
||||
consumerCount = 2;
|
||||
producerCount = 1;
|
||||
|
@ -66,6 +84,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
|
|||
assertTotalMessagesReceived(messageCount * producerCount);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
|
||||
consumerCount = 2;
|
||||
producerCount = 1;
|
||||
|
@ -78,6 +97,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
|
|||
assertTotalMessagesReceived(messageCount * producerCount);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
|
||||
consumerCount = 2;
|
||||
producerCount = 1;
|
||||
|
@ -90,6 +110,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
|
|||
assertTotalMessagesReceived(messageCount * producerCount);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerManyConsumersFewMessages() throws Exception {
|
||||
consumerCount = 50;
|
||||
producerCount = 1;
|
||||
|
@ -102,6 +123,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
|
|||
assertTotalMessagesReceived(messageCount * producerCount);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerManyConsumersManyMessages() throws Exception {
|
||||
consumerCount = 50;
|
||||
producerCount = 1;
|
||||
|
@ -114,6 +136,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
|
|||
assertTotalMessagesReceived(messageCount * producerCount);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testManyProducersManyConsumers() throws Exception {
|
||||
consumerCount = 200;
|
||||
producerCount = 50;
|
||||
|
|
|
@ -19,20 +19,32 @@ package org.apache.activemq.broker;
|
|||
import org.apache.activemq.TestSupport;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.util.ThreadTracker;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.BlockJUnit4ClassRunner;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
|
||||
@RunWith(BlockJUnit4ClassRunner.class)
|
||||
public class TopicSubscriptionTest extends QueueSubscriptionTest {
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
durable = true;
|
||||
topic = true;
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
ThreadTracker.result();
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testManyProducersManyConsumers() throws Exception {
|
||||
consumerCount = 40;
|
||||
producerCount = 20;
|
||||
|
@ -46,6 +58,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
|
|||
assertDestinationMemoryUsageGoesToZero();
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
|
||||
consumerCount = 2;
|
||||
producerCount = 1;
|
||||
|
@ -59,6 +72,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
|
|||
assertDestinationMemoryUsageGoesToZero();
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
|
||||
consumerCount = 2;
|
||||
producerCount = 1;
|
||||
|
@ -72,6 +86,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
|
|||
assertDestinationMemoryUsageGoesToZero();
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
|
||||
consumerCount = 2;
|
||||
producerCount = 1;
|
||||
|
@ -84,6 +99,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
|
|||
assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
|
||||
consumerCount = 2;
|
||||
producerCount = 1;
|
||||
|
@ -97,6 +113,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
|
|||
assertDestinationMemoryUsageGoesToZero();
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerManyConsumersFewMessages() throws Exception {
|
||||
consumerCount = 50;
|
||||
producerCount = 1;
|
||||
|
@ -110,6 +127,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
|
|||
assertDestinationMemoryUsageGoesToZero();
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerManyConsumersManyMessages() throws Exception {
|
||||
consumerCount = 50;
|
||||
producerCount = 1;
|
||||
|
@ -124,6 +142,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
|
|||
}
|
||||
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testManyProducersOneConsumer() throws Exception {
|
||||
consumerCount = 1;
|
||||
producerCount = 20;
|
||||
|
|
|
@ -16,11 +16,6 @@
|
|||
*/
|
||||
package org.apache.activemq.broker.policy;
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
|
@ -28,19 +23,35 @@ import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
|
|||
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.BlockJUnit4ClassRunner;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.Session;
|
||||
|
||||
public class AbortSlowAckConsumerTest extends AbortSlowConsumerTest {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumerTest.class);
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
@RunWith(value = BlockJUnit4ClassRunner.class)
|
||||
public class AbortSlowAckConsumer0Test extends AbortSlowConsumer0Test {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer0Test.class);
|
||||
protected long maxTimeSinceLastAck = 5 * 1000;
|
||||
|
||||
@Override
|
||||
protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
|
||||
return new AbortSlowConsumerStrategy();
|
||||
protected AbortSlowAckConsumerStrategy createSlowConsumerStrategy() {
|
||||
AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
|
||||
strategy.setAbortConnection(abortConnection);
|
||||
strategy.setCheckPeriod(checkPeriod);
|
||||
strategy.setMaxSlowDuration(maxSlowDuration);
|
||||
strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
|
||||
|
||||
return strategy;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -48,11 +59,7 @@ public class AbortSlowAckConsumerTest extends AbortSlowConsumerTest {
|
|||
BrokerService broker = super.createBroker();
|
||||
PolicyEntry policy = new PolicyEntry();
|
||||
|
||||
AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
|
||||
strategy.setAbortConnection(abortConnection);
|
||||
strategy.setCheckPeriod(checkPeriod);
|
||||
strategy.setMaxSlowDuration(maxSlowDuration);
|
||||
strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
|
||||
AbortSlowAckConsumerStrategy strategy = createSlowConsumerStrategy();
|
||||
|
||||
policy.setSlowConsumerStrategy(strategy);
|
||||
policy.setQueuePrefetch(10);
|
||||
|
@ -70,19 +77,18 @@ public class AbortSlowAckConsumerTest extends AbortSlowConsumerTest {
|
|||
return factory;
|
||||
}
|
||||
|
||||
|
||||
@Ignore("AMQ-5001")
|
||||
@Override
|
||||
@Test
|
||||
public void testSlowConsumerIsAbortedViaJmx() throws Exception {
|
||||
AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
|
||||
AbortSlowAckConsumerStrategy strategy = createSlowConsumerStrategy();
|
||||
strategy.setMaxTimeSinceLastAck(500); // so jmx does the abort
|
||||
super.testSlowConsumerIsAbortedViaJmx();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initCombosForTestSlowConsumerIsAborted() {
|
||||
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||
}
|
||||
|
||||
@Ignore("AMQ-5001")
|
||||
@Test
|
||||
public void testZeroPrefetchConsumerIsAborted() throws Exception {
|
||||
ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
|
||||
conn.setExceptionListener(this);
|
||||
|
@ -104,8 +110,10 @@ public class AbortSlowAckConsumerTest extends AbortSlowConsumerTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Ignore("AMQ-5001")
|
||||
@Test
|
||||
public void testIdleConsumerCanBeAbortedNoMessages() throws Exception {
|
||||
AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
|
||||
AbortSlowAckConsumerStrategy strategy = createSlowConsumerStrategy();
|
||||
strategy.setIgnoreIdleConsumers(false);
|
||||
|
||||
ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
|
||||
|
@ -125,8 +133,10 @@ public class AbortSlowAckConsumerTest extends AbortSlowConsumerTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Ignore("AMQ-5001")
|
||||
@Test
|
||||
public void testIdleConsumerCanBeAborted() throws Exception {
|
||||
AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
|
||||
AbortSlowAckConsumerStrategy strategy = createSlowConsumerStrategy();
|
||||
strategy.setIgnoreIdleConsumers(false);
|
||||
|
||||
ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
|
||||
|
@ -149,4 +159,6 @@ public class AbortSlowAckConsumerTest extends AbortSlowConsumerTest {
|
|||
} catch(Exception ex) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,77 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.policy;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
|
||||
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.BlockJUnit4ClassRunner;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class AbortSlowAckConsumer1Test extends AbortSlowConsumer1Test {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer1Test.class);
|
||||
|
||||
protected long maxTimeSinceLastAck = 5 * 1000;
|
||||
|
||||
public AbortSlowAckConsumer1Test(Boolean abortConnection, Boolean topic) {
|
||||
super(abortConnection, topic);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
|
||||
return new AbortSlowConsumerStrategy();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService broker = super.createBroker();
|
||||
PolicyEntry policy = new PolicyEntry();
|
||||
|
||||
AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
|
||||
strategy.setAbortConnection(abortConnection);
|
||||
strategy.setCheckPeriod(checkPeriod);
|
||||
strategy.setMaxSlowDuration(maxSlowDuration);
|
||||
strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
|
||||
|
||||
policy.setSlowConsumerStrategy(strategy);
|
||||
policy.setQueuePrefetch(10);
|
||||
policy.setTopicPrefetch(10);
|
||||
PolicyMap pMap = new PolicyMap();
|
||||
pMap.setDefaultEntry(policy);
|
||||
broker.setDestinationPolicy(pMap);
|
||||
return broker;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ConnectionFactory createConnectionFactory() throws Exception {
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
|
||||
factory.getPrefetchPolicy().setAll(1);
|
||||
return factory;
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,77 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.policy;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
|
||||
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.BlockJUnit4ClassRunner;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class AbortSlowAckConsumer2Test extends AbortSlowConsumer2Test {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer2Test.class);
|
||||
|
||||
protected long maxTimeSinceLastAck = 5 * 1000;
|
||||
|
||||
public AbortSlowAckConsumer2Test(Boolean topic) {
|
||||
super(topic);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
|
||||
return new AbortSlowConsumerStrategy();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService broker = super.createBroker();
|
||||
PolicyEntry policy = new PolicyEntry();
|
||||
|
||||
AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
|
||||
strategy.setAbortConnection(abortConnection);
|
||||
strategy.setCheckPeriod(checkPeriod);
|
||||
strategy.setMaxSlowDuration(maxSlowDuration);
|
||||
strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
|
||||
|
||||
policy.setSlowConsumerStrategy(strategy);
|
||||
policy.setQueuePrefetch(10);
|
||||
policy.setTopicPrefetch(10);
|
||||
PolicyMap pMap = new PolicyMap();
|
||||
pMap.setDefaultEntry(policy);
|
||||
broker.setDestinationPolicy(pMap);
|
||||
return broker;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ConnectionFactory createConnectionFactory() throws Exception {
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
|
||||
factory.getPrefetchPolicy().setAll(1);
|
||||
return factory;
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -16,11 +16,22 @@
|
|||
*/
|
||||
package org.apache.activemq.broker.policy;
|
||||
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.activemq.JmsMultipleClientsTestSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
|
||||
import org.apache.activemq.broker.jmx.DestinationViewMBean;
|
||||
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.util.MessageIdList;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.BlockJUnit4ClassRunner;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ExceptionListener;
|
||||
|
@ -31,62 +42,21 @@ import javax.management.InstanceNotFoundException;
|
|||
import javax.management.ObjectName;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import javax.management.openmbean.TabularData;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import junit.framework.Test;
|
||||
|
||||
import org.apache.activemq.JmsMultipleClientsTestSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
|
||||
import org.apache.activemq.broker.jmx.DestinationViewMBean;
|
||||
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.util.MessageIdList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
|
||||
public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport implements ExceptionListener {
|
||||
@RunWith(BlockJUnit4ClassRunner.class)
|
||||
public class AbortSlowConsumer0Test extends AbortSlowConsumerBase {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerTest.class);
|
||||
|
||||
protected AbortSlowConsumerStrategy underTest;
|
||||
protected boolean abortConnection = false;
|
||||
protected long checkPeriod = 2 * 1000;
|
||||
protected long maxSlowDuration = 5 * 1000;
|
||||
protected final List<Throwable> exceptions = new ArrayList<Throwable>();
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
exceptions.clear();
|
||||
topic = true;
|
||||
underTest = createSlowConsumerStrategy();
|
||||
super.setUp();
|
||||
createDestination();
|
||||
}
|
||||
|
||||
protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
|
||||
return new AbortSlowConsumerStrategy();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService broker = super.createBroker();
|
||||
PolicyEntry policy = new PolicyEntry();
|
||||
underTest.setAbortConnection(abortConnection);
|
||||
underTest.setCheckPeriod(checkPeriod);
|
||||
underTest.setMaxSlowDuration(maxSlowDuration);
|
||||
|
||||
policy.setSlowConsumerStrategy(underTest);
|
||||
policy.setQueuePrefetch(10);
|
||||
policy.setTopicPrefetch(10);
|
||||
PolicyMap pMap = new PolicyMap();
|
||||
pMap.setDefaultEntry(policy);
|
||||
broker.setDestinationPolicy(pMap);
|
||||
return broker;
|
||||
}
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer0Test.class);
|
||||
|
||||
@Test
|
||||
public void testRegularConsumerIsNotAborted() throws Exception {
|
||||
startConsumers(destination);
|
||||
for (Connection c : connections) {
|
||||
|
@ -97,41 +67,7 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
|
|||
allMessagesList.assertAtLeastMessagesReceived(10);
|
||||
}
|
||||
|
||||
public void initCombosForTestLittleSlowConsumerIsNotAborted() {
|
||||
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||
}
|
||||
|
||||
public void testLittleSlowConsumerIsNotAborted() throws Exception {
|
||||
startConsumers(destination);
|
||||
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
|
||||
consumertoAbort.getValue().setProcessingDelay(500);
|
||||
for (Connection c : connections) {
|
||||
c.setExceptionListener(this);
|
||||
}
|
||||
startProducers(destination, 12);
|
||||
allMessagesList.waitForMessagesToArrive(10);
|
||||
allMessagesList.assertAtLeastMessagesReceived(10);
|
||||
}
|
||||
|
||||
public void initCombosForTestSlowConsumerIsAborted() {
|
||||
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||
}
|
||||
|
||||
public void testSlowConsumerIsAborted() throws Exception {
|
||||
startConsumers(destination);
|
||||
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
|
||||
consumertoAbort.getValue().setProcessingDelay(8 * 1000);
|
||||
for (Connection c : connections) {
|
||||
c.setExceptionListener(this);
|
||||
}
|
||||
startProducers(destination, 100);
|
||||
|
||||
consumertoAbort.getValue().assertMessagesReceived(1);
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
consumertoAbort.getValue().assertAtMostMessagesReceived(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSlowConsumerIsAbortedViaJmx() throws Exception {
|
||||
underTest.setMaxSlowDuration(60*1000); // so jmx does the abort
|
||||
startConsumers(destination);
|
||||
|
@ -187,6 +123,7 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnlyOneSlowConsumerIsAborted() throws Exception {
|
||||
consumerCount = 10;
|
||||
startConsumers(destination);
|
||||
|
@ -205,6 +142,7 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
|
|||
consumertoAbort.getValue().assertAtMostMessagesReceived(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbortAlreadyClosingConsumers() throws Exception {
|
||||
consumerCount = 1;
|
||||
startConsumers(destination);
|
||||
|
@ -224,49 +162,7 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
|
|||
}
|
||||
}
|
||||
|
||||
public void initCombosForTestAbortAlreadyClosedConsumers() {
|
||||
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||
}
|
||||
|
||||
public void testAbortAlreadyClosedConsumers() throws Exception {
|
||||
Connection conn = createConnectionFactory().createConnection();
|
||||
conn.setExceptionListener(this);
|
||||
connections.add(conn);
|
||||
|
||||
Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||
final MessageConsumer consumer = sess.createConsumer(destination);
|
||||
conn.start();
|
||||
startProducers(destination, 20);
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
LOG.info("closing consumer: " + consumer);
|
||||
consumer.close();
|
||||
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
|
||||
}
|
||||
|
||||
public void initCombosForTestAbortAlreadyClosedConnection() {
|
||||
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||
}
|
||||
|
||||
public void testAbortAlreadyClosedConnection() throws Exception {
|
||||
Connection conn = createConnectionFactory().createConnection();
|
||||
conn.setExceptionListener(this);
|
||||
|
||||
Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||
sess.createConsumer(destination);
|
||||
conn.start();
|
||||
startProducers(destination, 20);
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
LOG.info("closing connection: " + conn);
|
||||
conn.close();
|
||||
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbortConsumerOnDeadConnection() throws Exception {
|
||||
// socket proxy on pause, close could hang??
|
||||
}
|
||||
|
@ -276,8 +172,4 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
|
|||
exceptions.add(exception);
|
||||
exception.printStackTrace();
|
||||
}
|
||||
|
||||
public static Test suite() {
|
||||
return suite(AbortSlowConsumerTest.class);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,112 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.policy;
|
||||
|
||||
import org.apache.activemq.util.MessageIdList;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.Session;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class AbortSlowConsumer1Test extends AbortSlowConsumerBase {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer1Test.class);
|
||||
|
||||
@Parameterized.Parameters(name = "{0}-{1}")
|
||||
public static Collection<Object[]> getTestParameters() {
|
||||
|
||||
List<Object[]> testParameters = new ArrayList<Object[]>();
|
||||
Boolean[] booleanValues = {Boolean.TRUE, Boolean.TRUE};
|
||||
for (Boolean abortConnection : booleanValues) {
|
||||
for (Boolean topic : booleanValues) {
|
||||
Boolean[] pair = {abortConnection, topic};
|
||||
LOG.info(">>>>> in getTestparameters, adding {}, {}", abortConnection, topic);
|
||||
testParameters.add(pair);
|
||||
}
|
||||
}
|
||||
|
||||
return testParameters;
|
||||
}
|
||||
|
||||
public AbortSlowConsumer1Test(Boolean abortConnection, Boolean topic) {
|
||||
this.abortConnection = abortConnection;
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testSlowConsumerIsAborted() throws Exception {
|
||||
startConsumers(destination);
|
||||
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
|
||||
consumertoAbort.getValue().setProcessingDelay(8 * 1000);
|
||||
for (Connection c : connections) {
|
||||
c.setExceptionListener(this);
|
||||
}
|
||||
startProducers(destination, 100);
|
||||
|
||||
consumertoAbort.getValue().assertMessagesReceived(1);
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
consumertoAbort.getValue().assertAtMostMessagesReceived(1);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testAbortAlreadyClosedConsumers() throws Exception {
|
||||
Connection conn = createConnectionFactory().createConnection();
|
||||
conn.setExceptionListener(this);
|
||||
connections.add(conn);
|
||||
|
||||
Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||
final MessageConsumer consumer = sess.createConsumer(destination);
|
||||
conn.start();
|
||||
startProducers(destination, 20);
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
LOG.info("closing consumer: " + consumer);
|
||||
consumer.close();
|
||||
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testAbortAlreadyClosedConnection() throws Exception {
|
||||
Connection conn = createConnectionFactory().createConnection();
|
||||
conn.setExceptionListener(this);
|
||||
|
||||
Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||
sess.createConsumer(destination);
|
||||
conn.start();
|
||||
startProducers(destination, 20);
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
LOG.info("closing connection: " + conn);
|
||||
conn.close();
|
||||
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.policy;
|
||||
|
||||
import org.apache.activemq.util.MessageIdList;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.MessageConsumer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class AbortSlowConsumer2Test extends AbortSlowConsumerBase {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer2Test.class);
|
||||
|
||||
@Parameterized.Parameters(name = "{0}")
|
||||
public static Collection<Object[]> getTestParameters() {
|
||||
|
||||
List<Object[]> testParameters = new ArrayList<Object[]>();
|
||||
Boolean[] booleanValues = {Boolean.TRUE, Boolean.FALSE};
|
||||
for (Boolean topic : booleanValues) {
|
||||
Boolean[] value = {topic};
|
||||
testParameters.add(value);
|
||||
}
|
||||
|
||||
return testParameters;
|
||||
}
|
||||
|
||||
public AbortSlowConsumer2Test(Boolean isTopic) {
|
||||
this.topic = isTopic;
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testLittleSlowConsumerIsNotAborted() throws Exception {
|
||||
startConsumers(destination);
|
||||
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
|
||||
consumertoAbort.getValue().setProcessingDelay(500);
|
||||
for (Connection c : connections) {
|
||||
c.setExceptionListener(this);
|
||||
}
|
||||
startProducers(destination, 12);
|
||||
allMessagesList.waitForMessagesToArrive(10);
|
||||
allMessagesList.assertAtLeastMessagesReceived(10);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,96 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.policy;
|
||||
|
||||
import junit.framework.Test;
|
||||
import org.apache.activemq.JmsMultipleClientsTestSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
|
||||
import org.apache.activemq.broker.jmx.DestinationViewMBean;
|
||||
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.util.MessageIdList;
|
||||
import org.junit.Before;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ExceptionListener;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.Session;
|
||||
import javax.management.InstanceNotFoundException;
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import javax.management.openmbean.TabularData;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
public class AbortSlowConsumerBase extends JmsMultipleClientsTestSupport implements ExceptionListener {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerBase.class);
|
||||
|
||||
protected AbortSlowConsumerStrategy underTest;
|
||||
protected boolean abortConnection = false;
|
||||
protected long checkPeriod = 2 * 1000;
|
||||
protected long maxSlowDuration = 5 * 1000;
|
||||
protected final List<Throwable> exceptions = new ArrayList<Throwable>();
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
exceptions.clear();
|
||||
topic = true;
|
||||
underTest = createSlowConsumerStrategy();
|
||||
super.setUp();
|
||||
createDestination();
|
||||
}
|
||||
|
||||
protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
|
||||
return new AbortSlowConsumerStrategy();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService broker = super.createBroker();
|
||||
PolicyEntry policy = new PolicyEntry();
|
||||
underTest.setAbortConnection(abortConnection);
|
||||
underTest.setCheckPeriod(checkPeriod);
|
||||
underTest.setMaxSlowDuration(maxSlowDuration);
|
||||
|
||||
policy.setSlowConsumerStrategy(underTest);
|
||||
policy.setQueuePrefetch(10);
|
||||
policy.setTopicPrefetch(10);
|
||||
PolicyMap pMap = new PolicyMap();
|
||||
pMap.setDefaultEntry(policy);
|
||||
broker.setDestinationPolicy(pMap);
|
||||
return broker;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(JMSException exception) {
|
||||
exceptions.add(exception);
|
||||
exception.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
|
@ -26,7 +26,11 @@ import org.apache.activemq.broker.QueueSubscriptionTest;
|
|||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.BlockJUnit4ClassRunner;
|
||||
|
||||
@RunWith(BlockJUnit4ClassRunner.class)
|
||||
public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
|
@ -43,6 +47,7 @@ public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
|
|||
return broker;
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
|
||||
|
||||
|
@ -52,11 +57,13 @@ public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
|
|||
assertEachConsumerReceivedAtLeastXMessages(1);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
|
||||
assertMessagesDividedAmongConsumers();
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
|
||||
|
||||
|
@ -66,11 +73,13 @@ public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
|
|||
assertEachConsumerReceivedAtLeastXMessages(1);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
|
||||
assertMessagesDividedAmongConsumers();
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerManyConsumersFewMessages() throws Exception {
|
||||
super.testOneProducerManyConsumersFewMessages();
|
||||
|
||||
|
@ -79,16 +88,19 @@ public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
|
|||
assertMessagesDividedAmongConsumers();
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerManyConsumersManyMessages() throws Exception {
|
||||
super.testOneProducerManyConsumersManyMessages();
|
||||
assertMessagesDividedAmongConsumers();
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testManyProducersManyConsumers() throws Exception {
|
||||
super.testManyProducersManyConsumers();
|
||||
assertMessagesDividedAmongConsumers();
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerTwoMatchingConsumersOneNotMatchingConsumer() throws Exception {
|
||||
// Create consumer that won't consume any message
|
||||
createMessageConsumer(createConnectionFactory().createConnection(), createDestination(), "JMSPriority<1");
|
||||
|
|
|
@ -27,7 +27,13 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
|
|||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
|
||||
import org.apache.activemq.util.MessageIdList;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.BlockJUnit4ClassRunner;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
@RunWith(BlockJUnit4ClassRunner.class)
|
||||
public class SimpleDispatchPolicyTest extends QueueSubscriptionTest {
|
||||
|
||||
@Override
|
||||
|
@ -46,6 +52,7 @@ public class SimpleDispatchPolicyTest extends QueueSubscriptionTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
|
||||
|
||||
|
@ -54,6 +61,7 @@ public class SimpleDispatchPolicyTest extends QueueSubscriptionTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
|
||||
|
||||
|
|
|
@ -26,7 +26,13 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
|
|||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy;
|
||||
import org.apache.activemq.util.MessageIdList;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.BlockJUnit4ClassRunner;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
@RunWith(BlockJUnit4ClassRunner.class)
|
||||
public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
|
||||
|
||||
@Override
|
||||
|
@ -44,6 +50,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
|
|||
return broker;
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
|
||||
|
@ -51,6 +58,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
|
|||
assertReceivedMessagesAreOrdered();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
|
||||
|
@ -58,6 +66,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
|
|||
assertReceivedMessagesAreOrdered();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
|
||||
|
@ -65,6 +74,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
|
|||
assertReceivedMessagesAreOrdered();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
|
||||
|
@ -72,6 +82,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
|
|||
assertReceivedMessagesAreOrdered();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void testOneProducerManyConsumersFewMessages() throws Exception {
|
||||
super.testOneProducerManyConsumersFewMessages();
|
||||
|
@ -79,6 +90,8 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
|
|||
assertReceivedMessagesAreOrdered();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void testOneProducerManyConsumersManyMessages() throws Exception {
|
||||
super.testOneProducerManyConsumersManyMessages();
|
||||
|
@ -86,6 +99,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
|
|||
assertReceivedMessagesAreOrdered();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void testManyProducersOneConsumer() throws Exception {
|
||||
super.testManyProducersOneConsumer();
|
||||
|
@ -93,6 +107,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
|
|||
assertReceivedMessagesAreOrdered();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void testManyProducersManyConsumers() throws Exception {
|
||||
super.testManyProducersManyConsumers();
|
||||
|
|
|
@ -16,11 +16,6 @@
|
|||
*/
|
||||
package org.apache.activemq.bugs;
|
||||
|
||||
import java.util.Vector;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.JmsMultipleClientsTestSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
|
@ -28,7 +23,18 @@ import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePo
|
|||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.BlockJUnit4ClassRunner;
|
||||
|
||||
import java.util.Vector;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@RunWith(BlockJUnit4ClassRunner.class)
|
||||
public class AMQ2910Test extends JmsMultipleClientsTestSupport {
|
||||
|
||||
final int maxConcurrency = 60;
|
||||
|
@ -55,6 +61,7 @@ public class AMQ2910Test extends JmsMultipleClientsTestSupport {
|
|||
return broker;
|
||||
}
|
||||
|
||||
@Test(timeout = 30 * 1000)
|
||||
public void testConcurrentSendToPendingCursor() throws Exception {
|
||||
final ActiveMQConnectionFactory factory =
|
||||
new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
|
||||
|
@ -103,6 +110,7 @@ public class AMQ2910Test extends JmsMultipleClientsTestSupport {
|
|||
|
||||
if (allMessagesList.getMessageCount() != numExpected) {
|
||||
dumpAllThreads(getName());
|
||||
|
||||
}
|
||||
allMessagesList.assertMessagesReceivedNoWait(numExpected);
|
||||
|
||||
|
|
Loading…
Reference in New Issue