From 7e87cf26851c149c84d1fe8ef633f371aa167ffd Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Thu, 31 Jan 2013 14:41:35 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4285 - tidied up test to be tolerant of pitfalls of request/reply with temps over a network git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1440988 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/AbstractRegion.java | 2 +- .../activemq/broker/region/TempQueue.java | 6 ++ .../activemq/network/NetworkFailoverTest.java | 87 +++++++++++++++---- 3 files changed, 75 insertions(+), 20 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index a2ec83eba8..6f5c5414d6 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -589,7 +589,7 @@ public abstract class AbstractRegion implements Region { try { lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup(); } catch (Exception e) { - LOG.warn("failed to deliver consumerControl to destination: " + control.getDestination(), e); + LOG.warn("failed to deliver post consumerControl dispatch-wakeup, to destination: " + control.getDestination(), e); } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java index a9b354b8e8..aa0315dc7e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java @@ -85,6 +85,12 @@ public class TempQueue extends Queue{ @Override public void dispose(ConnectionContext context) throws IOException { + if (this.destinationStatistics.getMessages().getCount() > 0) { + LOG.info(getActiveMQDestination().getQualifiedName() + + " on dispose, purge of " + + this.destinationStatistics.getMessages().getCount() + " pending messages: " + messages); + // we may want to capture these message ids in an advisory + } try { purge(); } catch (Exception e) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java index 2c869a2492..a72d7f3ed3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java @@ -17,10 +17,12 @@ package org.apache.activemq.network; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -33,7 +35,10 @@ import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.DestinationDoesNotExistException; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.AbstractDeadLetterStrategy; +import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.transport.failover.FailoverTransport; @@ -56,8 +61,10 @@ public class NetworkFailoverTest extends TestCase { protected BrokerService remoteBroker; protected Session localSession; protected Session remoteSession; - protected ActiveMQQueue included = new ActiveMQQueue("include.test.foo"); - protected String consumerName = "durableSubs"; + protected ActiveMQQueue included=new ActiveMQQueue("include.test.foo"); + private AtomicInteger replyToNonExistDest = new AtomicInteger(0); + private AtomicInteger roundTripComplete = new AtomicInteger(0); + private AtomicInteger remoteDLQCount = new AtomicInteger(0); public void testRequestReply() throws Exception { final MessageProducer remoteProducer = remoteSession.createProducer(null); @@ -65,15 +72,25 @@ public class NetworkFailoverTest extends TestCase { remoteConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message msg) { + final TextMessage textMsg = (TextMessage)msg; try { - TextMessage textMsg = (TextMessage) msg; - String payload = "REPLY: " + textMsg.getText(); + String payload = "REPLY: " + textMsg.getText() + ", " + textMsg.getJMSMessageID(); Destination replyTo; replyTo = msg.getJMSReplyTo(); textMsg.clearBody(); textMsg.setText(payload); LOG.info("*** Sending response: {}", textMsg.getText()); remoteProducer.send(replyTo, textMsg); + LOG.info("replied with: " + textMsg.getJMSMessageID()); + + } catch (DestinationDoesNotExistException expected) { + // been removed but not yet recreated + replyToNonExistDest.incrementAndGet(); + try { + LOG.info("NED: " + textMsg.getJMSMessageID()); + } catch (JMSException e) { + e.printStackTrace(); + }; } catch (Exception e) { LOG.warn("*** Responder listener caught exception: ", e); e.printStackTrace(); @@ -86,20 +103,52 @@ public class NetworkFailoverTest extends TestCase { requestProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); MessageConsumer requestConsumer = localSession.createConsumer(tempQueue); + // track remote dlq for forward failures + MessageConsumer dlqconsumer = remoteSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME)); + dlqconsumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + try { + LOG.info("dlq " + message.getJMSMessageID()); + } catch (JMSException e) { + e.printStackTrace(); + } + remoteDLQCount.incrementAndGet(); + } + }); + // allow for consumer infos to perculate arround Thread.sleep(2000); - for (int i = 0; i < MESSAGE_COUNT; i++) { - String payload = "test msg " + i; - TextMessage msg = localSession.createTextMessage(payload); - msg.setJMSReplyTo(tempQueue); - requestProducer.send(msg); - LOG.info("*** Failing over for iteration: #{}", i); - ((FailoverTransport) ((TransportFilter) ((TransportFilter) ((ActiveMQConnection) localConnection).getTransport()).getNext()).getNext()) - .handleTransportFailure(new IOException("Forcing failover from test")); - TextMessage result = (TextMessage) requestConsumer.receive(10000); - assertNotNull(result); - LOG.info("*** Iteration #{} got response: {}", i, result.getText()); + long done = System.currentTimeMillis() + (MESSAGE_COUNT * 6000); + int i = 0; + while (MESSAGE_COUNT > roundTripComplete.get() + remoteDLQCount.get() + replyToNonExistDest.get() + && done > System.currentTimeMillis()) { + if ( i < MESSAGE_COUNT) { + String payload = "test msg " + i; + i++; + TextMessage msg = localSession.createTextMessage(payload); + msg.setJMSReplyTo(tempQueue); + requestProducer.send(msg); + LOG.info("Sent: " + msg.getJMSMessageID() +", Failing over"); + ((FailoverTransport) ((TransportFilter) ((TransportFilter) + ((ActiveMQConnection) localConnection) + .getTransport()).getNext()).getNext()) + .handleTransportFailure(new IOException("Forcing failover from test")); + } + TextMessage result = (TextMessage)requestConsumer.receive(5000); + if (result != null) { + LOG.info("Got reply: " + result.getJMSMessageID() + ", " + result.getText()); + roundTripComplete.incrementAndGet(); + } } + + LOG.info("complete: " + roundTripComplete.get() + + ", remoteDLQCount: " + remoteDLQCount.get() + + ", replyToNonExistDest: " + replyToNonExistDest.get()); + assertEquals("complete:" + roundTripComplete.get() + + ", remoteDLQCount: " + remoteDLQCount.get() + + ", replyToNonExistDest: " + replyToNonExistDest.get(), + MESSAGE_COUNT, roundTripComplete.get() + remoteDLQCount.get() + replyToNonExistDest.get() ); } @Override @@ -132,21 +181,21 @@ public class NetworkFailoverTest extends TestCase { remoteBroker = createRemoteBroker(); remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + remoteBroker.setCacheTempDestinations(true); remoteBroker.start(); localBroker = createLocalBroker(); localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + localBroker.setCacheTempDestinations(true); localBroker.start(); String localURI = "tcp://localhost:61616"; String remoteURI = "tcp://localhost:61617"; - ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("failover:(" + localURI + "," + remoteURI - + ")?randomize=false&backup=true&trackMessages=true"); - // ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI); + ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("failover:("+localURI+","+remoteURI+")?randomize=false&backup=false&trackMessages=true"); localConnection = fac.createConnection(); localConnection.setClientID("local"); localConnection.start(); - fac = new ActiveMQConnectionFactory("failover:(" + remoteURI + "," + localURI + ")?randomize=false&backup=true&trackMessages=true"); + fac = new ActiveMQConnectionFactory("failover:("+remoteURI + ","+localURI+")?randomize=false&backup=false&trackMessages=true"); fac.setWatchTopicAdvisories(false); remoteConnection = fac.createConnection(); remoteConnection.setClientID("remote");