diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index d8b67ea2e3..a03ff10508 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -1131,4 +1131,12 @@ public class AMQPMessage extends RefCountMessage { this.durable = true; // it's coming from the journal, so it's durable parseHeaders(); } + + @Override + public String toString() { + return "AMQPMessage [durable=" + isDurable() + + ", messageID=" + getMessageID() + + ", address=" + getAddress() + + "]"; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 39a2b2de46..3aa82a1bf8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -511,7 +511,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } /* Hook for processing message before forwarding */ - protected Message beforeForward(final Message message) { + protected Message beforeForward(final Message message, final SimpleString forwardingAddress) { if (useDuplicateDetection) { // We keep our own DuplicateID for the Bridge, so bouncing back and forth will work fine byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID()); @@ -519,6 +519,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled message.putExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, bytes); } + if (forwardingAddress != null) { + // for AMQP messages this modification will be transient + message.setAddress(forwardingAddress); + } + if (transformer != null) { final Message transformedMessage = transformer.transform(message); if (transformedMessage != message) { @@ -568,8 +573,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled refs.put(ref.getMessage().getMessageID(), ref); } - final Message message = beforeForward(ref.getMessage()); - final SimpleString dest; if (forwardingAddress != null) { @@ -579,6 +582,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled dest = ref.getMessage().getAddressSimpleString(); } + final Message message = beforeForward(ref.getMessage(), dest); + pendingAcks.countUp(); try { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java index cf24d0fac5..5f70d2828c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java @@ -150,7 +150,7 @@ public class ClusterConnectionBridge extends BridgeImpl { } @Override - protected Message beforeForward(final Message message) { + protected Message beforeForward(final Message message, final SimpleString forwardingAddress) { // We make a copy of the message, then we strip out the unwanted routing id headers and leave // only // the one pertinent for the address node - this is important since different queues on different @@ -182,7 +182,7 @@ public class ClusterConnectionBridge extends BridgeImpl { messageCopy.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, queueIds); - messageCopy = super.beforeForward(messageCopy); + messageCopy = super.beforeForward(messageCopy, null); return messageCopy; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index 24ca845172..a35541cdb2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.amqp; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; @@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory; import javax.jms.Topic; import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -112,6 +114,43 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { connection.close(); } + @Test(timeout = 60000) + public void testCoreBridge() throws Exception { + server.getRemotingService().createAcceptor("acceptor", "vm://0").start(); + server.getConfiguration().addConnectorConfiguration("connector", "vm://0"); + server.deployBridge(new BridgeConfiguration() + .setName(getTestName()) + .setQueueName(getQueueName()) + .setForwardingAddress(getQueueName(1)) + .setConfirmationWindowSize(10) + .setStaticConnectors(Arrays.asList("connector"))); + sendMessages(getQueueName(), 1); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getQueueName(1)); + + Queue queueView = getProxyToQueue(getQueueName()); + assertEquals(1, queueView.getConsumerCount()); + assertEquals(0, queueView.getMessageCount()); + + queueView = getProxyToQueue(getQueueName(1)); + assertEquals(1, queueView.getConsumerCount()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + receiver.close(); + + assertEquals(0, queueView.getMessageCount()); + + connection.close(); + } + @Test(timeout = 60000) public void testMessageDurableFalse() throws Exception { sendMessages(getQueueName(), 1, false);