ARTEMIS-3578 Save SimpleString duplication and long[] allocation while moving Core messages
This commit is contained in:
parent
185236f74d
commit
7e6373d4df
|
@ -470,12 +470,13 @@ public interface Message {
|
|||
// only valid probably on AMQP
|
||||
}
|
||||
|
||||
default void referenceOriginalMessage(final Message original, String originalQueue) {
|
||||
default void referenceOriginalMessage(final Message original, final SimpleString originalQueue) {
|
||||
setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
|
||||
setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
|
||||
setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddressSimpleString());
|
||||
setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
|
||||
if (original.getRoutingType() != null) {
|
||||
setBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE, original.getRoutingType().getType());
|
||||
final RoutingType routingType = original.getRoutingType();
|
||||
if (routingType != null) {
|
||||
setBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE, routingType.getType());
|
||||
}
|
||||
|
||||
// reset expiry
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.message.LargeBodyReader;
|
||||
|
@ -643,7 +644,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
|
|||
}
|
||||
|
||||
@Override
|
||||
public void referenceOriginalMessage(final Message original, String originalQueue) {
|
||||
public void referenceOriginalMessage(final Message original, final SimpleString originalQueue) {
|
||||
|
||||
super.referenceOriginalMessage(original, originalQueue);
|
||||
|
||||
|
|
|
@ -499,9 +499,9 @@ public class TestConversions extends Assert {
|
|||
for (int i = 0; i < 100; i++) {
|
||||
encodedMessage.setMessageID(333L);
|
||||
if (i % 3 == 0) {
|
||||
encodedMessage.referenceOriginalMessage(encodedMessage, "SOME-OTHER-QUEUE-DOES-NOT-MATTER-WHAT");
|
||||
encodedMessage.referenceOriginalMessage(encodedMessage, SimpleString.toSimpleString("SOME-OTHER-QUEUE-DOES-NOT-MATTER-WHAT"));
|
||||
} else {
|
||||
encodedMessage.referenceOriginalMessage(encodedMessage, "XXX");
|
||||
encodedMessage.referenceOriginalMessage(encodedMessage, SimpleString.toSimpleString("XXX"));
|
||||
}
|
||||
encodedMessage.putStringProperty("another " + i, "value " + i);
|
||||
encodedMessage.messageChanged();
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.message.LargeBodyReader;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
|
@ -265,7 +266,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
|
|||
}
|
||||
|
||||
@Override
|
||||
public void referenceOriginalMessage(final Message original, String originalQueue) {
|
||||
public void referenceOriginalMessage(final Message original, final SimpleString originalQueue) {
|
||||
|
||||
super.referenceOriginalMessage(original, originalQueue);
|
||||
|
||||
|
|
|
@ -105,7 +105,7 @@ public class DivertImpl implements Divert {
|
|||
copy = message.copy(id);
|
||||
|
||||
// This will set the original MessageId, and the original address
|
||||
copy.referenceOriginalMessage(message, this.getUniqueName().toString());
|
||||
copy.referenceOriginalMessage(message, this.getUniqueName());
|
||||
|
||||
copy.setAddress(forwardAddress);
|
||||
|
||||
|
|
|
@ -2709,11 +2709,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
}
|
||||
}
|
||||
|
||||
if (targetQueue != null) {
|
||||
move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false, targetQueue.longValue());
|
||||
} else {
|
||||
move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false);
|
||||
}
|
||||
move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false, targetQueue);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -3386,7 +3382,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
final MessageReference ref,
|
||||
final boolean expiry,
|
||||
final boolean rejectDuplicate,
|
||||
final long... queueIDs) throws Exception {
|
||||
final Long queueID) throws Exception {
|
||||
Message copyMessage = makeCopy(ref, expiry, toAddress);
|
||||
|
||||
Object originalRoutingType = ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE);
|
||||
|
@ -3394,12 +3390,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
copyMessage.setRoutingType(RoutingType.getType((Byte) originalRoutingType));
|
||||
}
|
||||
|
||||
if (queueIDs != null && queueIDs.length > 0) {
|
||||
ByteBuffer buffer = ByteBuffer.allocate(8 * queueIDs.length);
|
||||
for (long id : queueIDs) {
|
||||
buffer.putLong(id);
|
||||
}
|
||||
copyMessage.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array());
|
||||
if (queueID != null) {
|
||||
final byte[] encodedBuffer = new byte[Long.BYTES];
|
||||
ByteBuffer buffer = ByteBuffer.wrap(encodedBuffer);
|
||||
buffer.putLong(0, queueID);
|
||||
copyMessage.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), encodedBuffer);
|
||||
}
|
||||
|
||||
postOffice.route(copyMessage, tx, false, rejectDuplicate);
|
||||
|
@ -3555,7 +3550,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
}
|
||||
|
||||
if (copyOriginalHeaders) {
|
||||
copy.referenceOriginalMessage(message, ref.getQueue().getName().toString());
|
||||
copy.referenceOriginalMessage(message, ref.getQueue().getName());
|
||||
}
|
||||
|
||||
copy.setExpiration(0);
|
||||
|
@ -3584,7 +3579,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
|
||||
acknowledge(tx, ref, AckReason.EXPIRED, null);
|
||||
} else {
|
||||
move(expiryAddress, tx, ref, true, false);
|
||||
move(expiryAddress, tx, ref, true, false, null);
|
||||
}
|
||||
} else {
|
||||
if (!printErrorExpiring) {
|
||||
|
|
Loading…
Reference in New Issue