diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java index 97b476d386..cee846ff06 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; public abstract class MessagePacket extends PacketImpl implements MessagePacketI { @@ -34,6 +35,12 @@ public abstract class MessagePacket extends PacketImpl implements MessagePacketI return message; } + @Override + public MessagePacket replaceMessage(Message message) { + this.message = (ICoreMessage) message; + return this; + } + @Override public String getParentString() { return super.getParentString() + ", message=" + message; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacketI.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacketI.java index 161274dd43..a05fd307e5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacketI.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacketI.java @@ -22,4 +22,7 @@ import org.apache.activemq.artemis.api.core.Message; public interface MessagePacketI { Message getMessage(); + + // This is to be used by interceptors to replace the message during processing + MessagePacketI replaceMessage(Message message); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java index dc2c4582ef..68b68b5e28 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java @@ -23,7 +23,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; public class SessionReceiveLargeMessage extends PacketImpl implements MessagePacketI { - private final Message message; + private Message message; /** * Since we receive the message before the entire message was received, @@ -55,6 +55,12 @@ public class SessionReceiveLargeMessage extends PacketImpl implements MessagePac this.largeMessageSize = largeMessageSize; } + @Override + public MessagePacketI replaceMessage(Message message) { + this.message = message; + return this; + } + public Message getLargeMessage() { return message; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java index 869940ccea..b936cdc160 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java @@ -26,7 +26,7 @@ public class SessionSendLargeMessage extends PacketImpl implements MessagePacket /** * Used only if largeMessage */ - private final Message largeMessage; + private Message largeMessage; // Static -------------------------------------------------------- @@ -44,6 +44,12 @@ public class SessionSendLargeMessage extends PacketImpl implements MessagePacket return largeMessage; } + @Override + public MessagePacketI replaceMessage(Message message) { + this.largeMessage = message; + return this; + } + @Override public Message getMessage() { return largeMessage; diff --git a/artemis-protocols/artemis-hornetq-protocol/pom.xml b/artemis-protocols/artemis-hornetq-protocol/pom.xml index 35c4026e0f..29ea41b5cd 100644 --- a/artemis-protocols/artemis-hornetq-protocol/pom.xml +++ b/artemis-protocols/artemis-hornetq-protocol/pom.xml @@ -74,6 +74,11 @@ org.osgi osgi.cmpn + + junit + junit + test + diff --git a/artemis-protocols/artemis-hornetq-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/hornetq/PropertiesConversionTest.java b/artemis-protocols/artemis-hornetq-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/hornetq/PropertiesConversionTest.java new file mode 100644 index 0000000000..5cd810e797 --- /dev/null +++ b/artemis-protocols/artemis-hornetq-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/hornetq/PropertiesConversionTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.hornetq; + +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacket; +import org.junit.Assert; +import org.junit.Test; + +public class PropertiesConversionTest { + + class FakeMessagePacket extends MessagePacket { + + FakeMessagePacket(ICoreMessage message) { + super(PacketImpl.SESS_SEND, message); + } + + @Override + public int expectedEncodeSize() { + return 0; + } + + @Override + public void release() { + + } + } + + @Test + public void testParallelConversions() throws Throwable { + CoreMessage coreMessage = new CoreMessage(1, 1024); + for (int i = 0; i < 10; i++) { + coreMessage.putBooleanProperty(SimpleString.toSimpleString("key1"), true); + } + coreMessage.putStringProperty(new SimpleString("_HQ_ORIG_ADDRESS"), SimpleString.toSimpleString("hqOne")); + coreMessage.putStringProperty(new SimpleString("_AMQ_ORIG_QUEUE"), SimpleString.toSimpleString("amqOne")); + + int threads = 1000; + int conversions = 100; + Thread[] t = new Thread[threads]; + CyclicBarrier barrier = new CyclicBarrier(threads); + HQPropertiesConversionInterceptor hq = new HQPropertiesConversionInterceptor(true); + HQPropertiesConversionInterceptor amq = new HQPropertiesConversionInterceptor(true); + AtomicInteger errors = new AtomicInteger(0); + AtomicInteger counts = new AtomicInteger(0); + for (int i = 0; i < threads; i++) { + t[i] = new Thread() { + + @Override + public void run() { + try { + for (int i = 0; i < conversions; i++) { + counts.incrementAndGet(); + FakeMessagePacket packetSend = new FakeMessagePacket(coreMessage); + if (i == 0) { + barrier.await(); + } + hq.intercept(packetSend, null); + FakeMessagePacket packetRec = new FakeMessagePacket(coreMessage); + amq.intercept(packetRec, null); + if (i > conversions / 2) { + // I only validate half of the messages + // to give it a chance of Races and Exceptions + // that could happen from reusing the same message on these conversions + Assert.assertNotSame(packetRec.getMessage(), coreMessage); + Assert.assertNotSame(packetSend.getMessage(), coreMessage); + } + } + } catch (Throwable e) { + errors.incrementAndGet(); + e.printStackTrace(); + } + } + }; + t[i].start(); + } + + for (Thread thread : t) { + thread.join(); + } + + Assert.assertEquals(threads * conversions, counts.get()); + + Assert.assertEquals(0, errors.get()); + } +} diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java index 1f14ba8484..05f024fcd8 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java @@ -19,6 +19,8 @@ package org.apache.activemq.artemis.core.protocol.hornetq; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacketI; import org.apache.activemq.artemis.core.protocol.hornetq.util.HQPropertiesConverter; @@ -42,11 +44,20 @@ public class HQPropertiesConversionInterceptor implements Interceptor { } private void handleReceiveMessage(MessagePacketI messagePacket) { - if (replaceHQ) { - HQPropertiesConverter.replaceHQProperties(messagePacket.getMessage()); - } else { - HQPropertiesConverter.replaceAMQProperties(messagePacket.getMessage()); + Message copy = messagePacket.getMessage(); + + // there's no need to copy client messages, only the server ones are problematic + if (!(copy instanceof ClientMessageInternal)) { + copy = copy.copy(); + messagePacket.replaceMessage(copy); } + + if (replaceHQ) { + HQPropertiesConverter.replaceHQProperties(copy); + } else { + HQPropertiesConverter.replaceAMQProperties(copy); + } + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 153c583300..9c05df5c17 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -355,6 +355,8 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe @Override public Message copy() { + new Exception("hmmmmm").printStackTrace(); + System.exit(-1); SequentialFile newfile = storageManager.createFileForLargeMessage(messageID, durable); Message newMessage = new LargeServerMessageImpl(this, properties, newfile, messageID);