ARTEMIS-3577 Save Core msg re-encoding due to msg copy

This commit is contained in:
franz1981 2021-11-16 10:58:56 +01:00
parent ad4f6a133a
commit 185236f74d
1 changed files with 17 additions and 12 deletions

View File

@ -2649,7 +2649,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
return moveBetweenSnFQueues(queueSuffix, tx, ref);
return moveBetweenSnFQueues(queueSuffix, tx, ref, null);
}
});
}
@ -3387,9 +3387,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
final boolean expiry,
final boolean rejectDuplicate,
final long... queueIDs) throws Exception {
Message copyMessage = makeCopy(ref, expiry);
copyMessage.setAddress(toAddress);
Message copyMessage = makeCopy(ref, expiry, toAddress);
Object originalRoutingType = ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE);
if (originalRoutingType != null && originalRoutingType instanceof Byte) {
@ -3417,8 +3415,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@SuppressWarnings({"ArrayToString", "ArrayToStringConcatenation"})
private boolean moveBetweenSnFQueues(final SimpleString queueSuffix,
final Transaction tx,
final MessageReference ref) throws Exception {
Message copyMessage = makeCopy(ref, false, false);
final MessageReference ref,
final SimpleString newAddress) throws Exception {
Message copyMessage = makeCopy(ref, false, false, newAddress);
byte[] oldRouteToIDs = null;
String targetNodeID;
@ -3521,13 +3520,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return new Pair<>(targetNodeID, targetBinding);
}
private Message makeCopy(final MessageReference ref, final boolean expiry) throws Exception {
return makeCopy(ref, expiry, true);
private Message makeCopy(final MessageReference ref, final boolean expiry, final SimpleString newAddress) throws Exception {
return makeCopy(ref, expiry, true, newAddress);
}
private Message makeCopy(final MessageReference ref,
final boolean expiry,
final boolean copyOriginalHeaders) throws Exception {
final boolean copyOriginalHeaders,
final SimpleString newAddress) throws Exception {
if (ref == null) {
ActiveMQServerLogger.LOGGER.nullRefMessage();
throw new ActiveMQNullRefException("Reference to message is null");
@ -3547,6 +3547,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
Message copy = message.copy(newID, true);
if (newAddress != null) {
// setting it before checkLargeMessage:
// checkLargeMessage can cause msg encoding and setting it later invalidate it,
// forcing to be re-encoded later
copy.setAddress(newAddress);
}
if (copyOriginalHeaders) {
copy.referenceOriginalMessage(message, ref.getQueue().getName().toString());
}
@ -3706,9 +3713,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
tx = new TransactionImpl(storageManager);
}
Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED);
copyMessage.setAddress(address);
Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED, address);
RoutingStatus routingStatus = postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);