diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index b10698bdcf..d2c91c8c21 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -1327,7 +1327,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor { setDuplexNetworkConnectorId(duplexNetworkConnectorId); } Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker); - Transport remoteBridgeTransport = new ResponseCorrelator(transport); + Transport remoteBridgeTransport = transport; + if (! (remoteBridgeTransport instanceof ResponseCorrelator)) { + // the vm transport case is already wrapped + remoteBridgeTransport = new ResponseCorrelator(remoteBridgeTransport); + } String duplexName = localTransport.toString(); if (duplexName.contains("#")) { duplexName = duplexName.substring(duplexName.lastIndexOf("#")); diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index b8024f551b..b6404d5a84 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -603,7 +603,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } else { if (isDuplex()) { if (LOG.isTraceEnabled()) { - LOG.trace(configuration.getBrokerName() + " duplex command type: " + command.getCommandId()); + LOG.trace(configuration.getBrokerName() + " duplex command type: " + command.getDataStructureType()); } if (command.isMessage()) { final ActiveMQMessage message = (ActiveMQMessage) command; @@ -976,6 +976,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message); } + if (isDuplex() && AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) { + try { + // never request b/c they are eventually acked async + remoteBroker.oneway(message); + } finally { + sub.decrementOutstandingResponses(); + } + return; + } + if (message.isPersistent() || configuration.isAlwaysSyncSend()) { // The message was not sent using async send, so we should only diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java index 204887f810..a1b8ee1dc1 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.network; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; @@ -220,6 +221,7 @@ public class BrokerNetworkWithStuckMessagesTest { for (int i = 0; i < receiveNumMessages; ++i) { Message message1 = receiveMessage(connection2, 20000); assertNotNull(message1); + LOG.info("on remote, got: " + message1.getMessageId()); connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE)); } @@ -261,6 +263,13 @@ public class BrokerNetworkWithStuckMessagesTest { connection2.send(connectionInfo2.createRemoveCommand()); // There should now be 5 messages stuck on the remote broker + assertTrue("correct stuck message count", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Object[] result = browseQueueWithJmx(remoteBroker); + return 5 == result.length; + } + })); messages = browseQueueWithJmx(remoteBroker); assertEquals(5, messages.length); @@ -303,6 +312,7 @@ public class BrokerNetworkWithStuckMessagesTest { int counter = 1; for (; counter < receiveNumMessages; counter++) { message1 = receiveMessage(connection1); + LOG.info("local consume of: " + (message1 != null ? message1.getMessageId() : " null")); connection1.send(createAck(consumerInfo1, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE)); } // Ensure that 5 messages were received