mirror of https://github.com/apache/activemq.git
convert to JUnit 4 test and add a timeout
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1506149 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3940f2dffd
commit
acbe5499b5
|
@ -16,6 +16,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.bugs;
|
package org.apache.activemq.bugs;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.lang.Thread.UncaughtExceptionHandler;
|
import java.lang.Thread.UncaughtExceptionHandler;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Vector;
|
import java.util.Vector;
|
||||||
|
@ -32,28 +36,33 @@ import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is a test case for the issue reported at:
|
* This is a test case for the issue reported at: https://issues.apache.org/activemq/browse/AMQ-2021 Bug is modification
|
||||||
* https://issues.apache.org/activemq/browse/AMQ-2021
|
* of inflight message properties so the failure can manifest itself in a bunch or ways, from message receipt with null
|
||||||
* Bug is modification of inflight message properties so the failure can manifest itself in a bunch
|
* properties to marshall errors
|
||||||
* or ways, from message receipt with null properties to marshall errors
|
|
||||||
*/
|
*/
|
||||||
public class AMQ2021Test extends TestCase implements ExceptionListener, UncaughtExceptionHandler {
|
public class AMQ2021Test implements ExceptionListener, UncaughtExceptionHandler {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(AMQ2021Test.class);
|
private static final Logger log = LoggerFactory.getLogger(AMQ2021Test.class);
|
||||||
BrokerService brokerService;
|
BrokerService brokerService;
|
||||||
ArrayList<Thread> threads = new ArrayList<Thread>();
|
ArrayList<Thread> threads = new ArrayList<Thread>();
|
||||||
Vector<Throwable> exceptions;
|
Vector<Throwable> exceptions;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TestName name = new TestName();
|
||||||
|
|
||||||
AMQ2021Test testCase;
|
AMQ2021Test testCase;
|
||||||
|
|
||||||
private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0";
|
private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0";
|
||||||
|
@ -62,14 +71,14 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
|
||||||
|
|
||||||
private final int numMessages = 1000;
|
private final int numMessages = 1000;
|
||||||
private final int numConsumers = 2;
|
private final int numConsumers = 2;
|
||||||
private final int dlqMessages = numMessages/2;
|
private final int dlqMessages = numMessages / 2;
|
||||||
|
|
||||||
private CountDownLatch receivedLatch;
|
private CountDownLatch receivedLatch;
|
||||||
private ActiveMQTopic destination;
|
private ActiveMQTopic destination;
|
||||||
private CountDownLatch started;
|
private CountDownLatch started;
|
||||||
|
|
||||||
@Override
|
@Before
|
||||||
protected void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
Thread.setDefaultUncaughtExceptionHandler(this);
|
Thread.setDefaultUncaughtExceptionHandler(this);
|
||||||
testCase = this;
|
testCase = this;
|
||||||
|
|
||||||
|
@ -78,19 +87,18 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
|
||||||
brokerService.setDeleteAllMessagesOnStartup(true);
|
brokerService.setDeleteAllMessagesOnStartup(true);
|
||||||
brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
|
brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
|
||||||
brokerService.start();
|
brokerService.start();
|
||||||
destination = new ActiveMQTopic(getName());
|
destination = new ActiveMQTopic(name.getMethodName());
|
||||||
exceptions = new Vector<Throwable>();
|
exceptions = new Vector<Throwable>();
|
||||||
|
|
||||||
CONSUMER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString() + CONSUMER_BROKER_URL;
|
CONSUMER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString() + CONSUMER_BROKER_URL;
|
||||||
PRODUCER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
|
PRODUCER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
|
||||||
|
|
||||||
receivedLatch =
|
receivedLatch = new CountDownLatch(numConsumers * (numMessages + dlqMessages));
|
||||||
new CountDownLatch(numConsumers * (numMessages + dlqMessages));
|
|
||||||
started = new CountDownLatch(1);
|
started = new CountDownLatch(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@After
|
||||||
protected void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
for (Thread t : threads) {
|
for (Thread t : threads) {
|
||||||
t.interrupt();
|
t.interrupt();
|
||||||
t.join();
|
t.join();
|
||||||
|
@ -98,9 +106,10 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
|
||||||
brokerService.stop();
|
brokerService.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=240000)
|
||||||
public void testConcurrentTopicResendToDLQ() throws Exception {
|
public void testConcurrentTopicResendToDLQ() throws Exception {
|
||||||
|
|
||||||
for (int i=0; i<numConsumers;i++) {
|
for (int i = 0; i < numConsumers; i++) {
|
||||||
ConsumerThread c1 = new ConsumerThread("Consumer-" + i);
|
ConsumerThread c1 = new ConsumerThread("Consumer-" + i);
|
||||||
threads.add(c1);
|
threads.add(c1);
|
||||||
c1.start();
|
c1.start();
|
||||||
|
@ -121,7 +130,7 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
|
||||||
producer.start();
|
producer.start();
|
||||||
|
|
||||||
boolean allGood = receivedLatch.await(90, TimeUnit.SECONDS);
|
boolean allGood = receivedLatch.await(90, TimeUnit.SECONDS);
|
||||||
for (Throwable t: exceptions) {
|
for (Throwable t : exceptions) {
|
||||||
log.error("failing test with first exception", t);
|
log.error("failing test with first exception", t);
|
||||||
fail("exception during test : " + t);
|
fail("exception during test : " + t);
|
||||||
}
|
}
|
||||||
|
@ -129,10 +138,10 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
|
||||||
|
|
||||||
assertEquals(0, exceptions.size());
|
assertEquals(0, exceptions.size());
|
||||||
|
|
||||||
for (int i=0; i<numConsumers; i++) {
|
for (int i = 0; i < numConsumers; i++) {
|
||||||
// last recovery sends message to deq so is not received again
|
// last recovery sends message to deq so is not received again
|
||||||
assertEquals(dlqMessages*2, ((ConsumerThread)threads.get(i)).recoveries);
|
assertEquals(dlqMessages * 2, ((ConsumerThread) threads.get(i)).recoveries);
|
||||||
assertEquals(numMessages + dlqMessages, ((ConsumerThread)threads.get(i)).counter);
|
assertEquals(numMessages + dlqMessages, ((ConsumerThread) threads.get(i)).counter);
|
||||||
}
|
}
|
||||||
|
|
||||||
// half of the messages for each consumer should go to the dlq but duplicates will
|
// half of the messages for each consumer should go to the dlq but duplicates will
|
||||||
|
@ -141,16 +150,15 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void consumeFromDLQ( int messageCount) throws Exception {
|
private void consumeFromDLQ(int messageCount) throws Exception {
|
||||||
ActiveMQConnectionFactory connectionFactory =
|
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONSUMER_BROKER_URL);
|
||||||
new ActiveMQConnectionFactory(CONSUMER_BROKER_URL);
|
|
||||||
Connection connection = connectionFactory.createConnection();
|
Connection connection = connectionFactory.createConnection();
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
MessageConsumer dlqConsumer = session.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
|
MessageConsumer dlqConsumer = session.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
|
||||||
int count = 0;
|
int count = 0;
|
||||||
for (int i=0; i< messageCount; i++) {
|
for (int i = 0; i < messageCount; i++) {
|
||||||
if (dlqConsumer.receive(1000) == null) {
|
if (dlqConsumer.receive(1000) == null) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -160,7 +168,7 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
|
||||||
}
|
}
|
||||||
|
|
||||||
public void produce(int count) throws Exception {
|
public void produce(int count) throws Exception {
|
||||||
Connection connection=null;
|
Connection connection = null;
|
||||||
try {
|
try {
|
||||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(PRODUCER_BROKER_URL);
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(PRODUCER_BROKER_URL);
|
||||||
connection = factory.createConnection();
|
connection = factory.createConnection();
|
||||||
|
@ -169,9 +177,9 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
|
||||||
producer.setTimeToLive(0);
|
producer.setTimeToLive(0);
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
||||||
for (int i=0 ; i< count; i++) {
|
for (int i = 0; i < count; i++) {
|
||||||
int id = i+1;
|
int id = i + 1;
|
||||||
TextMessage message = session.createTextMessage(getName()+" Message "+ id);
|
TextMessage message = session.createTextMessage(name.getMethodName() + " Message " + id);
|
||||||
message.setIntProperty("MsgNumber", id);
|
message.setIntProperty("MsgNumber", id);
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
|
|
||||||
|
@ -192,7 +200,7 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public class ConsumerThread extends Thread implements MessageListener {
|
public class ConsumerThread extends Thread implements MessageListener {
|
||||||
public long counter = 0;
|
public long counter = 0;
|
||||||
public long recoveries = 0;
|
public long recoveries = 0;
|
||||||
private Session session;
|
private Session session;
|
||||||
|
@ -201,10 +209,10 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
|
||||||
super(threadId);
|
super(threadId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
ActiveMQConnectionFactory connectionFactory =
|
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONSUMER_BROKER_URL);
|
||||||
new ActiveMQConnectionFactory(CONSUMER_BROKER_URL);
|
|
||||||
Connection connection = connectionFactory.createConnection();
|
Connection connection = connectionFactory.createConnection();
|
||||||
connection.setExceptionListener(testCase);
|
connection.setExceptionListener(testCase);
|
||||||
connection.setClientID(getName());
|
connection.setClientID(getName());
|
||||||
|
@ -213,7 +221,7 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
|
||||||
consumer.setMessageListener(this);
|
consumer.setMessageListener(this);
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
||||||
started .countDown();
|
started.countDown();
|
||||||
|
|
||||||
} catch (JMSException exception) {
|
} catch (JMSException exception) {
|
||||||
log.error("unexpected ex in consumer run", exception);
|
log.error("unexpected ex in consumer run", exception);
|
||||||
|
@ -221,11 +229,12 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void onMessage(Message message) {
|
public void onMessage(Message message) {
|
||||||
try {
|
try {
|
||||||
counter++;
|
counter++;
|
||||||
int messageNumber=message.getIntProperty("MsgNumber");
|
int messageNumber = message.getIntProperty("MsgNumber");
|
||||||
if(messageNumber%2==0){
|
if (messageNumber % 2 == 0) {
|
||||||
session.recover();
|
session.recover();
|
||||||
recoveries++;
|
recoveries++;
|
||||||
} else {
|
} else {
|
||||||
|
@ -236,7 +245,7 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
|
||||||
log.info("recoveries:" + recoveries + ", Received " + counter + ", counter'th " + message);
|
log.info("recoveries:" + recoveries + ", Received " + counter + ", counter'th " + message);
|
||||||
}
|
}
|
||||||
receivedLatch.countDown();
|
receivedLatch.countDown();
|
||||||
}catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("unexpected ex on onMessage", e);
|
log.error("unexpected ex on onMessage", e);
|
||||||
exceptions.add(e);
|
exceptions.add(e);
|
||||||
}
|
}
|
||||||
|
@ -244,14 +253,15 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void onException(JMSException exception) {
|
public void onException(JMSException exception) {
|
||||||
log.info("Unexpected JMSException", exception);
|
log.info("Unexpected JMSException", exception);
|
||||||
exceptions.add(exception);
|
exceptions.add(exception);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void uncaughtException(Thread thread, Throwable exception) {
|
public void uncaughtException(Thread thread, Throwable exception) {
|
||||||
log.info("Unexpected exception from thread " + thread + ", ex: " + exception);
|
log.info("Unexpected exception from thread " + thread + ", ex: " + exception);
|
||||||
exceptions.add(exception);
|
exceptions.add(exception);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue