ARTEMIS-1513 fix AMQP + core bridge

This commit is contained in:
Justin Bertram 2017-11-20 09:30:52 -06:00 committed by Clebert Suconic
parent b27ed5de16
commit 40aa3f5821
4 changed files with 57 additions and 5 deletions

View File

@ -1131,4 +1131,12 @@ public class AMQPMessage extends RefCountMessage {
this.durable = true; // it's coming from the journal, so it's durable
parseHeaders();
}
@Override
public String toString() {
return "AMQPMessage [durable=" + isDurable() +
", messageID=" + getMessageID() +
", address=" + getAddress() +
"]";
}
}

View File

@ -511,7 +511,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
/* Hook for processing message before forwarding */
protected Message beforeForward(final Message message) {
protected Message beforeForward(final Message message, final SimpleString forwardingAddress) {
if (useDuplicateDetection) {
// We keep our own DuplicateID for the Bridge, so bouncing back and forth will work fine
byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID());
@ -519,6 +519,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
message.putExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, bytes);
}
if (forwardingAddress != null) {
// for AMQP messages this modification will be transient
message.setAddress(forwardingAddress);
}
if (transformer != null) {
final Message transformedMessage = transformer.transform(message);
if (transformedMessage != message) {
@ -568,8 +573,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
refs.put(ref.getMessage().getMessageID(), ref);
}
final Message message = beforeForward(ref.getMessage());
final SimpleString dest;
if (forwardingAddress != null) {
@ -579,6 +582,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
dest = ref.getMessage().getAddressSimpleString();
}
final Message message = beforeForward(ref.getMessage(), dest);
pendingAcks.countUp();
try {

View File

@ -150,7 +150,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
}
@Override
protected Message beforeForward(final Message message) {
protected Message beforeForward(final Message message, final SimpleString forwardingAddress) {
// We make a copy of the message, then we strip out the unwanted routing id headers and leave
// only
// the one pertinent for the address node - this is important since different queues on different
@ -182,7 +182,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
messageCopy.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, queueIds);
messageCopy = super.beforeForward(messageCopy);
messageCopy = super.beforeForward(messageCopy, null);
return messageCopy;
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
import javax.jms.Topic;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@ -112,6 +114,43 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
public void testCoreBridge() throws Exception {
server.getRemotingService().createAcceptor("acceptor", "vm://0").start();
server.getConfiguration().addConnectorConfiguration("connector", "vm://0");
server.deployBridge(new BridgeConfiguration()
.setName(getTestName())
.setQueueName(getQueueName())
.setForwardingAddress(getQueueName(1))
.setConfirmationWindowSize(10)
.setStaticConnectors(Arrays.asList("connector")));
sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getQueueName(1));
Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getConsumerCount());
assertEquals(0, queueView.getMessageCount());
queueView = getProxyToQueue(getQueueName(1));
assertEquals(1, queueView.getConsumerCount());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
receiver.close();
assertEquals(0, queueView.getMessageCount());
connection.close();
}
@Test(timeout = 60000)
public void testMessageDurableFalse() throws Exception {
sendMessages(getQueueName(), 1, false);