https://issues.apache.org/jira/browse/AMQ-4285 - tidied up test to be tolerant of pitfalls of request/reply with temps over a network

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1440988 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-01-31 14:41:35 +00:00
parent 0f3fdd7aa7
commit 7e87cf2685
3 changed files with 75 additions and 20 deletions

View File

@ -589,7 +589,7 @@ public abstract class AbstractRegion implements Region {
try {
lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
} catch (Exception e) {
LOG.warn("failed to deliver consumerControl to destination: " + control.getDestination(), e);
LOG.warn("failed to deliver post consumerControl dispatch-wakeup, to destination: " + control.getDestination(), e);
}
}
}

View File

@ -85,6 +85,12 @@ public class TempQueue extends Queue{
@Override
public void dispose(ConnectionContext context) throws IOException {
if (this.destinationStatistics.getMessages().getCount() > 0) {
LOG.info(getActiveMQDestination().getQualifiedName()
+ " on dispose, purge of "
+ this.destinationStatistics.getMessages().getCount() + " pending messages: " + messages);
// we may want to capture these message ids in an advisory
}
try {
purge();
} catch (Exception e) {

View File

@ -17,10 +17,12 @@
package org.apache.activemq.network;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@ -33,7 +35,10 @@ import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.DestinationDoesNotExistException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.AbstractDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.failover.FailoverTransport;
@ -56,8 +61,10 @@ public class NetworkFailoverTest extends TestCase {
protected BrokerService remoteBroker;
protected Session localSession;
protected Session remoteSession;
protected ActiveMQQueue included = new ActiveMQQueue("include.test.foo");
protected String consumerName = "durableSubs";
protected ActiveMQQueue included=new ActiveMQQueue("include.test.foo");
private AtomicInteger replyToNonExistDest = new AtomicInteger(0);
private AtomicInteger roundTripComplete = new AtomicInteger(0);
private AtomicInteger remoteDLQCount = new AtomicInteger(0);
public void testRequestReply() throws Exception {
final MessageProducer remoteProducer = remoteSession.createProducer(null);
@ -65,15 +72,25 @@ public class NetworkFailoverTest extends TestCase {
remoteConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
final TextMessage textMsg = (TextMessage)msg;
try {
TextMessage textMsg = (TextMessage) msg;
String payload = "REPLY: " + textMsg.getText();
String payload = "REPLY: " + textMsg.getText() + ", " + textMsg.getJMSMessageID();
Destination replyTo;
replyTo = msg.getJMSReplyTo();
textMsg.clearBody();
textMsg.setText(payload);
LOG.info("*** Sending response: {}", textMsg.getText());
remoteProducer.send(replyTo, textMsg);
LOG.info("replied with: " + textMsg.getJMSMessageID());
} catch (DestinationDoesNotExistException expected) {
// been removed but not yet recreated
replyToNonExistDest.incrementAndGet();
try {
LOG.info("NED: " + textMsg.getJMSMessageID());
} catch (JMSException e) {
e.printStackTrace();
};
} catch (Exception e) {
LOG.warn("*** Responder listener caught exception: ", e);
e.printStackTrace();
@ -86,20 +103,52 @@ public class NetworkFailoverTest extends TestCase {
requestProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageConsumer requestConsumer = localSession.createConsumer(tempQueue);
// track remote dlq for forward failures
MessageConsumer dlqconsumer = remoteSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
dlqconsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
LOG.info("dlq " + message.getJMSMessageID());
} catch (JMSException e) {
e.printStackTrace();
}
remoteDLQCount.incrementAndGet();
}
});
// allow for consumer infos to perculate arround
Thread.sleep(2000);
for (int i = 0; i < MESSAGE_COUNT; i++) {
long done = System.currentTimeMillis() + (MESSAGE_COUNT * 6000);
int i = 0;
while (MESSAGE_COUNT > roundTripComplete.get() + remoteDLQCount.get() + replyToNonExistDest.get()
&& done > System.currentTimeMillis()) {
if ( i < MESSAGE_COUNT) {
String payload = "test msg " + i;
i++;
TextMessage msg = localSession.createTextMessage(payload);
msg.setJMSReplyTo(tempQueue);
requestProducer.send(msg);
LOG.info("*** Failing over for iteration: #{}", i);
((FailoverTransport) ((TransportFilter) ((TransportFilter) ((ActiveMQConnection) localConnection).getTransport()).getNext()).getNext())
LOG.info("Sent: " + msg.getJMSMessageID() +", Failing over");
((FailoverTransport) ((TransportFilter) ((TransportFilter)
((ActiveMQConnection) localConnection)
.getTransport()).getNext()).getNext())
.handleTransportFailure(new IOException("Forcing failover from test"));
TextMessage result = (TextMessage) requestConsumer.receive(10000);
assertNotNull(result);
LOG.info("*** Iteration #{} got response: {}", i, result.getText());
}
TextMessage result = (TextMessage)requestConsumer.receive(5000);
if (result != null) {
LOG.info("Got reply: " + result.getJMSMessageID() + ", " + result.getText());
roundTripComplete.incrementAndGet();
}
}
LOG.info("complete: " + roundTripComplete.get()
+ ", remoteDLQCount: " + remoteDLQCount.get()
+ ", replyToNonExistDest: " + replyToNonExistDest.get());
assertEquals("complete:" + roundTripComplete.get()
+ ", remoteDLQCount: " + remoteDLQCount.get()
+ ", replyToNonExistDest: " + replyToNonExistDest.get(),
MESSAGE_COUNT, roundTripComplete.get() + remoteDLQCount.get() + replyToNonExistDest.get() );
}
@Override
@ -132,21 +181,21 @@ public class NetworkFailoverTest extends TestCase {
remoteBroker = createRemoteBroker();
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
remoteBroker.setCacheTempDestinations(true);
remoteBroker.start();
localBroker = createLocalBroker();
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
localBroker.setCacheTempDestinations(true);
localBroker.start();
String localURI = "tcp://localhost:61616";
String remoteURI = "tcp://localhost:61617";
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("failover:(" + localURI + "," + remoteURI
+ ")?randomize=false&backup=true&trackMessages=true");
// ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("failover:("+localURI+","+remoteURI+")?randomize=false&backup=false&trackMessages=true");
localConnection = fac.createConnection();
localConnection.setClientID("local");
localConnection.start();
fac = new ActiveMQConnectionFactory("failover:(" + remoteURI + "," + localURI + ")?randomize=false&backup=true&trackMessages=true");
fac = new ActiveMQConnectionFactory("failover:("+remoteURI + ","+localURI+")?randomize=false&backup=false&trackMessages=true");
fac.setWatchTopicAdvisories(false);
remoteConnection = fac.createConnection();
remoteConnection.setClientID("remote");