ARTEMIS-2361 Bridge should make a copy of messages

and some improvements into ARTEMIS-2355
This commit is contained in:
Clebert Suconic 2019-05-29 13:37:45 -04:00
parent 1ccb688eec
commit 4fa2e75cdc
4 changed files with 33 additions and 3 deletions

View File

@ -550,7 +550,15 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
/* Hook for processing message before forwarding */
protected Message beforeForward(final Message message, final SimpleString forwardingAddress) {
protected Message beforeForward(Message message, final SimpleString forwardingAddress) {
message = message.copy();
return beforeForwardingNoCopy(message, forwardingAddress);
}
/** ClusterConnectionBridge already makes a copy of the message.
* So I needed I hook where the message is not copied. */
protected Message beforeForwardingNoCopy(Message message, SimpleString forwardingAddress) {
if (useDuplicateDetection) {
// We keep our own DuplicateID for the Bridge, so bouncing back and forth will work fine
byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID());
@ -577,6 +585,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
break;
}
message.messageChanged();
if (transformer != null) {
final Message transformedMessage = transformer.transform(message);
if (transformedMessage != message) {

View File

@ -190,7 +190,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
messageCopy.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, queueIds);
messageCopy = super.beforeForward(messageCopy, forwardingAddress);
messageCopy = super.beforeForwardingNoCopy(messageCopy, forwardingAddress);
return messageCopy;
}

View File

@ -125,6 +125,8 @@ public class DivertImpl implements Divert {
if (transformer != null) {
copy = transformer.transform(copy);
}
copy.messageChanged();
} else {
copy = message;
}

View File

@ -806,6 +806,7 @@ public class BridgeTest extends ActiveMQTestBase {
final String testAddress = "testAddress";
final String queueName0 = "queue0";
final String secondQueue = "queue1";
final String forwardAddress = "forwardAddress";
final String queueName1 = "forwardQueue";
@ -827,6 +828,8 @@ public class BridgeTest extends ActiveMQTestBase {
CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
queueConfigs0.add(queueConfig0);
queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(secondQueue);
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
server0.start();
@ -882,7 +885,8 @@ public class BridgeTest extends ActiveMQTestBase {
tx.commit();
}
Thread.sleep(1000);
Thread.sleep(100);
ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
@ -909,6 +913,20 @@ public class BridgeTest extends ActiveMQTestBase {
Assert.assertNull(consumer1.receiveImmediate());
ClientConsumer otherConsumer = session0.createConsumer(secondQueue);
session0.start();
for (int i = 0; i < numMessages; i++) {
ClientMessage message = otherConsumer.receive(5000);
Assert.assertNotNull(message);
// This is validating the Bridge is not messing up with the original message
// and should make a copy of the message before sending it
Assert.assertEquals(2, message.getPropertyNames().size());
Assert.assertEquals(i, message.getIntProperty(propKey).intValue());
Assert.assertEquals(new SimpleString("monkey" + i), message.getSimpleStringProperty(selectorKey));
message.acknowledge();
}
consumer1.close();
session1.deleteQueue(queueName1);