This closes #3370
This commit is contained in:
commit
4a91bcfd58
|
@ -389,6 +389,16 @@ public class ByteUtil {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* This ensure a more exact resizing then {@link ByteBuf#ensureWritable(int)}, if needed.<br>
|
||||
* It won't try to trim a large enough buffer.
|
||||
*/
|
||||
public static void ensureExactWritable(ByteBuf buffer, int minWritableBytes) {
|
||||
if (buffer.maxFastWritableBytes() < minWritableBytes) {
|
||||
buffer.capacity(buffer.writerIndex() + minWritableBytes);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns {@code true} if the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s},
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.activemq.artemis.utils.AbstractByteBufPool;
|
|||
import org.apache.activemq.artemis.utils.ByteUtil;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
import static org.apache.activemq.artemis.utils.ByteUtil.ensureExactWritable;
|
||||
import static org.apache.activemq.artemis.utils.DataConstants.BOOLEAN;
|
||||
import static org.apache.activemq.artemis.utils.DataConstants.BYTE;
|
||||
import static org.apache.activemq.artemis.utils.DataConstants.BYTES;
|
||||
|
@ -546,11 +547,18 @@ public class TypedProperties {
|
|||
decode(buffer, null);
|
||||
}
|
||||
|
||||
|
||||
public synchronized void encode(final ByteBuf buffer) {
|
||||
public synchronized int encode(final ByteBuf buffer) {
|
||||
final int encodedSize;
|
||||
// it's a trick to not pay the cost of buffer.writeIndex without assertions enabled
|
||||
int writerIndex = 0;
|
||||
assert (writerIndex = buffer.writerIndex()) >= 0 : "Always true";
|
||||
if (properties == null || size == 0) {
|
||||
encodedSize = DataConstants.SIZE_BYTE;
|
||||
ensureExactWritable(buffer, encodedSize);
|
||||
buffer.writeByte(DataConstants.NULL);
|
||||
} else {
|
||||
encodedSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + size;
|
||||
ensureExactWritable(buffer, encodedSize);
|
||||
buffer.writeByte(DataConstants.NOT_NULL);
|
||||
|
||||
buffer.writeInt(properties.size());
|
||||
|
@ -563,6 +571,8 @@ public class TypedProperties {
|
|||
value.write(buffer);
|
||||
});
|
||||
}
|
||||
assert buffer.writerIndex() == (writerIndex + encodedSize) : "Bad encode size estimation";
|
||||
return encodedSize;
|
||||
}
|
||||
|
||||
public synchronized int getEncodeSize() {
|
||||
|
|
|
@ -24,6 +24,8 @@ import java.util.Arrays;
|
|||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.junit.Assert;
|
||||
|
@ -37,7 +39,7 @@ import static org.junit.Assert.fail;
|
|||
|
||||
public class ByteUtilTest {
|
||||
|
||||
private static Logger log = Logger.getLogger(ByteUtilTest.class);
|
||||
private static final Logger log = Logger.getLogger(ByteUtilTest.class);
|
||||
|
||||
@Test
|
||||
public void testBytesToString() {
|
||||
|
@ -409,5 +411,29 @@ public class ByteUtilTest {
|
|||
assertArrayEquals(assertContent, convertedContent);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void shouldEnsureExactWritableFailToEnlargeWrappedByteBuf() {
|
||||
byte[] wrapped = new byte[32];
|
||||
ByteBuf buffer = Unpooled.wrappedBuffer(wrapped);
|
||||
buffer.writerIndex(wrapped.length);
|
||||
ByteUtil.ensureExactWritable(buffer, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldEnsureExactWritableNotEnlargeBufferWithEnoughSpace() {
|
||||
byte[] wrapped = new byte[32];
|
||||
ByteBuf buffer = Unpooled.wrappedBuffer(wrapped);
|
||||
buffer.writerIndex(wrapped.length - 1);
|
||||
ByteUtil.ensureExactWritable(buffer, 1);
|
||||
Assert.assertSame(wrapped, buffer.array());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldEnsureExactWritableEnlargeBufferWithoutEnoughSpace() {
|
||||
ByteBuf buffer = Unpooled.buffer(32);
|
||||
buffer.writerIndex(32);
|
||||
ByteUtil.ensureExactWritable(buffer, 1);
|
||||
Assert.assertEquals(33, buffer.capacity());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -470,12 +470,13 @@ public interface Message {
|
|||
// only valid probably on AMQP
|
||||
}
|
||||
|
||||
default void referenceOriginalMessage(final Message original, String originalQueue) {
|
||||
default void referenceOriginalMessage(final Message original, final SimpleString originalQueue) {
|
||||
setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
|
||||
setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
|
||||
setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddressSimpleString());
|
||||
setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
|
||||
if (original.getRoutingType() != null) {
|
||||
setBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE, original.getRoutingType().getType());
|
||||
final RoutingType routingType = original.getRoutingType();
|
||||
if (routingType != null) {
|
||||
setBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE, routingType.getType());
|
||||
}
|
||||
|
||||
// reset expiry
|
||||
|
|
|
@ -60,13 +60,15 @@ import org.apache.activemq.artemis.utils.UUID;
|
|||
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import static org.apache.activemq.artemis.utils.ByteUtil.ensureExactWritable;
|
||||
|
||||
/** Note: you shouldn't change properties using multi-threads. Change your properties before you can send it to multiple
|
||||
* consumers */
|
||||
public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
||||
|
||||
public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
|
||||
|
||||
private volatile int memoryEstimate = -1;
|
||||
protected volatile int memoryEstimate = -1;
|
||||
|
||||
private static final Logger logger = Logger.getLogger(CoreMessage.class);
|
||||
|
||||
|
@ -81,8 +83,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
|
||||
protected volatile ResetLimitWrappedActiveMQBuffer writableBuffer;
|
||||
|
||||
Object body;
|
||||
|
||||
protected int endOfBodyPosition = -1;
|
||||
|
||||
protected int messageIDPosition = -1;
|
||||
|
@ -445,7 +445,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
// with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
|
||||
// many subscriptions and bridging to other nodes in a cluster
|
||||
synchronized (other) {
|
||||
this.body = other.body;
|
||||
this.endOfBodyPosition = other.endOfBodyPosition;
|
||||
internalSetMessageID(other.messageID);
|
||||
this.address = other.address;
|
||||
|
@ -641,6 +640,15 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
@Override
|
||||
public int getMemoryEstimate() {
|
||||
if (memoryEstimate == -1) {
|
||||
if (buffer != null && !isLargeMessage()) {
|
||||
if (!validBuffer) {
|
||||
// this can happen if a message is modified
|
||||
// eg clustered messages get additional routing information
|
||||
// that need to be correctly accounted in memory
|
||||
checkEncode();
|
||||
}
|
||||
}
|
||||
final TypedProperties properties = this.properties;
|
||||
memoryEstimate = memoryOffset +
|
||||
(buffer != null ? buffer.capacity() : 0) +
|
||||
(properties != null ? properties.getMemoryOffset() : 0);
|
||||
|
@ -733,11 +741,9 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
endOfBodyPosition = BUFFER_HEADER_SPACE + DataConstants.SIZE_INT;
|
||||
}
|
||||
|
||||
buffer.setIndex(0, 0);
|
||||
buffer.writeInt(endOfBodyPosition);
|
||||
|
||||
buffer.setInt(0, endOfBodyPosition);
|
||||
// The end of body position
|
||||
buffer.writerIndex(endOfBodyPosition - BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
|
||||
buffer.setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
|
||||
|
||||
encodeHeadersAndProperties(buffer);
|
||||
|
||||
|
@ -748,21 +754,42 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
|
||||
public void encodeHeadersAndProperties(final ByteBuf buffer) {
|
||||
final TypedProperties properties = getProperties();
|
||||
messageIDPosition = buffer.writerIndex();
|
||||
buffer.writeLong(messageID);
|
||||
SimpleString.writeNullableSimpleString(buffer, address);
|
||||
if (userID == null) {
|
||||
buffer.writeByte(DataConstants.NULL);
|
||||
} else {
|
||||
buffer.writeByte(DataConstants.NOT_NULL);
|
||||
buffer.writeBytes(userID.asBytes());
|
||||
final int initialWriterIndex = buffer.writerIndex();
|
||||
messageIDPosition = initialWriterIndex;
|
||||
final UUID userID = this.userID;
|
||||
final int userIDEncodedSize = userID == null ? Byte.BYTES : Byte.BYTES + userID.asBytes().length;
|
||||
final SimpleString address = this.address;
|
||||
final int addressEncodedBytes = SimpleString.sizeofNullableString(address);
|
||||
final int headersSize =
|
||||
Long.BYTES + // messageID
|
||||
addressEncodedBytes + // address
|
||||
userIDEncodedSize + // userID
|
||||
Byte.BYTES + // type
|
||||
Byte.BYTES + // durable
|
||||
Long.BYTES + // expiration
|
||||
Long.BYTES + // timestamp
|
||||
Byte.BYTES; // priority
|
||||
synchronized (properties) {
|
||||
final int propertiesEncodeSize = properties.getEncodeSize();
|
||||
final int totalEncodedSize = headersSize + propertiesEncodeSize;
|
||||
ensureExactWritable(buffer, totalEncodedSize);
|
||||
buffer.writeLong(messageID);
|
||||
SimpleString.writeNullableSimpleString(buffer, address);
|
||||
if (userID == null) {
|
||||
buffer.writeByte(DataConstants.NULL);
|
||||
} else {
|
||||
buffer.writeByte(DataConstants.NOT_NULL);
|
||||
buffer.writeBytes(userID.asBytes());
|
||||
}
|
||||
buffer.writeByte(type);
|
||||
buffer.writeBoolean(durable);
|
||||
buffer.writeLong(expiration);
|
||||
buffer.writeLong(timestamp);
|
||||
buffer.writeByte(priority);
|
||||
assert buffer.writerIndex() == initialWriterIndex + headersSize : "Bad Headers encode size estimation";
|
||||
final int realPropertiesEncodeSize = properties.encode(buffer);
|
||||
assert realPropertiesEncodeSize == propertiesEncodeSize : "TypedProperties has a wrong encode size estimation or is being modified concurrently";
|
||||
}
|
||||
buffer.writeByte(type);
|
||||
buffer.writeBoolean(durable);
|
||||
buffer.writeLong(expiration);
|
||||
buffer.writeLong(timestamp);
|
||||
buffer.writeByte(priority);
|
||||
properties.encode(buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -38,6 +38,9 @@ import org.junit.Before;
|
|||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
||||
public class CoreMessageTest {
|
||||
|
||||
public static final SimpleString ADDRESS = new SimpleString("this.local.address");
|
||||
|
@ -359,6 +362,34 @@ public class CoreMessageTest {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMemoryEstimateChangedAfterModifiedAlreadyEncodedCopy() {
|
||||
final CoreMessage msg = new CoreMessage(1, 44);
|
||||
msg.getEncodeSize();
|
||||
final int memoryEstimate = msg.getMemoryEstimate();
|
||||
final CoreMessage copy = (CoreMessage) msg.copy(2);
|
||||
copy.getEncodeSize();
|
||||
copy.putBytesProperty(Message.HDR_ROUTE_TO_IDS, new byte[Long.BYTES]);
|
||||
final int increasedMemoryFootprint = copy.getMemoryEstimate() - memoryEstimate;
|
||||
final int increasedPropertyFootprint = copy.getProperties().getMemoryOffset() - msg.getProperties().getMemoryOffset();
|
||||
assertThat("memory estimation isn't accounting for the additional encoded property",
|
||||
increasedMemoryFootprint, greaterThan(increasedPropertyFootprint));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMessageBufferCapacityMatchEncodedSizeAfterModifiedCopy() {
|
||||
final CoreMessage msg = new CoreMessage(1, 4155);
|
||||
msg.setAddress("a");
|
||||
msg.putBytesProperty("_", new byte[4096]);
|
||||
final CoreMessage copy = (CoreMessage) msg.copy(2);
|
||||
Assert.assertEquals(msg.getEncodeSize(), copy.getBuffer().capacity());
|
||||
copy.setAddress("b");
|
||||
copy.setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, "a");
|
||||
copy.setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, msg.getAddressSimpleString());
|
||||
copy.setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, msg.getMessageID());
|
||||
Assert.assertEquals(copy.getEncodeSize(), copy.getBuffer().capacity());
|
||||
}
|
||||
|
||||
private void printVariable(String body, String encode) {
|
||||
System.out.println("// body = \"" + body + "\";");
|
||||
System.out.println("private final String STRING_ENCODE = \"" + encode + "\";");
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.message.LargeBodyReader;
|
||||
|
@ -643,7 +644,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
|
|||
}
|
||||
|
||||
@Override
|
||||
public void referenceOriginalMessage(final Message original, String originalQueue) {
|
||||
public void referenceOriginalMessage(final Message original, final SimpleString originalQueue) {
|
||||
|
||||
super.referenceOriginalMessage(original, originalQueue);
|
||||
|
||||
|
|
|
@ -499,9 +499,9 @@ public class TestConversions extends Assert {
|
|||
for (int i = 0; i < 100; i++) {
|
||||
encodedMessage.setMessageID(333L);
|
||||
if (i % 3 == 0) {
|
||||
encodedMessage.referenceOriginalMessage(encodedMessage, "SOME-OTHER-QUEUE-DOES-NOT-MATTER-WHAT");
|
||||
encodedMessage.referenceOriginalMessage(encodedMessage, SimpleString.toSimpleString("SOME-OTHER-QUEUE-DOES-NOT-MATTER-WHAT"));
|
||||
} else {
|
||||
encodedMessage.referenceOriginalMessage(encodedMessage, "XXX");
|
||||
encodedMessage.referenceOriginalMessage(encodedMessage, SimpleString.toSimpleString("XXX"));
|
||||
}
|
||||
encodedMessage.putStringProperty("another " + i, "value " + i);
|
||||
encodedMessage.messageChanged();
|
||||
|
|
|
@ -37,6 +37,8 @@ import org.jboss.logging.Logger;
|
|||
|
||||
public class LargeBody {
|
||||
|
||||
static final int MEMORY_OFFSET = 56;
|
||||
|
||||
private static final Logger logger = Logger.getLogger(LargeBody.class);
|
||||
|
||||
private long bodySize = -1;
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.message.LargeBodyReader;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
|
@ -33,6 +34,11 @@ import org.jboss.logging.Logger;
|
|||
|
||||
public final class LargeServerMessageImpl extends CoreMessage implements CoreLargeServerMessage {
|
||||
|
||||
// Given that LargeBody is never null it needs to be accounted on this instance footprint.
|
||||
// This value has been computed using https://github.com/openjdk/jol
|
||||
// with HotSpot 64-bit COOPS 8-byte align
|
||||
private static final int MEMORY_OFFSET = 112 + LargeBody.MEMORY_OFFSET;
|
||||
|
||||
@Override
|
||||
public Message toMessage() {
|
||||
return this;
|
||||
|
@ -73,9 +79,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
|
|||
|
||||
private final StorageManager storageManager;
|
||||
|
||||
// We cache this
|
||||
private volatile int memoryEstimate = -1;
|
||||
|
||||
public LargeServerMessageImpl(final StorageManager storageManager) {
|
||||
largeBody = new LargeBody(this, storageManager);
|
||||
this.storageManager = storageManager;
|
||||
|
@ -243,8 +246,12 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
|
|||
public int getMemoryEstimate() {
|
||||
synchronized (largeBody) {
|
||||
if (memoryEstimate == -1) {
|
||||
// The body won't be on memory (aways on-file), so we don't consider this for paging
|
||||
memoryEstimate = getHeadersAndPropertiesEncodeSize() + DataConstants.SIZE_INT + getEncodeSize() + (16 + 4) * 2 + 1;
|
||||
// The body won't be on memory (always on-file), so we don't consider this for paging
|
||||
memoryEstimate = MEMORY_OFFSET +
|
||||
getHeadersAndPropertiesEncodeSize() +
|
||||
DataConstants.SIZE_INT +
|
||||
getEncodeSize() +
|
||||
(16 + 4) * 2 + 1;
|
||||
}
|
||||
|
||||
return memoryEstimate;
|
||||
|
@ -259,7 +266,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
|
|||
}
|
||||
|
||||
@Override
|
||||
public void referenceOriginalMessage(final Message original, String originalQueue) {
|
||||
public void referenceOriginalMessage(final Message original, final SimpleString originalQueue) {
|
||||
|
||||
super.referenceOriginalMessage(original, originalQueue);
|
||||
|
||||
|
|
|
@ -105,7 +105,7 @@ public class DivertImpl implements Divert {
|
|||
copy = message.copy(id);
|
||||
|
||||
// This will set the original MessageId, and the original address
|
||||
copy.referenceOriginalMessage(message, this.getUniqueName().toString());
|
||||
copy.referenceOriginalMessage(message, this.getUniqueName());
|
||||
|
||||
copy.setAddress(forwardAddress);
|
||||
|
||||
|
|
|
@ -2649,7 +2649,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
|
||||
@Override
|
||||
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
|
||||
return moveBetweenSnFQueues(queueSuffix, tx, ref);
|
||||
return moveBetweenSnFQueues(queueSuffix, tx, ref, null);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -2709,11 +2709,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
}
|
||||
}
|
||||
|
||||
if (targetQueue != null) {
|
||||
move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false, targetQueue.longValue());
|
||||
} else {
|
||||
move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false);
|
||||
}
|
||||
move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false, targetQueue);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -3386,22 +3382,19 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
final MessageReference ref,
|
||||
final boolean expiry,
|
||||
final boolean rejectDuplicate,
|
||||
final long... queueIDs) throws Exception {
|
||||
Message copyMessage = makeCopy(ref, expiry);
|
||||
|
||||
copyMessage.setAddress(toAddress);
|
||||
final Long queueID) throws Exception {
|
||||
Message copyMessage = makeCopy(ref, expiry, toAddress);
|
||||
|
||||
Object originalRoutingType = ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE);
|
||||
if (originalRoutingType != null && originalRoutingType instanceof Byte) {
|
||||
copyMessage.setRoutingType(RoutingType.getType((Byte) originalRoutingType));
|
||||
}
|
||||
|
||||
if (queueIDs != null && queueIDs.length > 0) {
|
||||
ByteBuffer buffer = ByteBuffer.allocate(8 * queueIDs.length);
|
||||
for (long id : queueIDs) {
|
||||
buffer.putLong(id);
|
||||
}
|
||||
copyMessage.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array());
|
||||
if (queueID != null) {
|
||||
final byte[] encodedBuffer = new byte[Long.BYTES];
|
||||
ByteBuffer buffer = ByteBuffer.wrap(encodedBuffer);
|
||||
buffer.putLong(0, queueID);
|
||||
copyMessage.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), encodedBuffer);
|
||||
}
|
||||
|
||||
postOffice.route(copyMessage, tx, false, rejectDuplicate);
|
||||
|
@ -3417,8 +3410,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
@SuppressWarnings({"ArrayToString", "ArrayToStringConcatenation"})
|
||||
private boolean moveBetweenSnFQueues(final SimpleString queueSuffix,
|
||||
final Transaction tx,
|
||||
final MessageReference ref) throws Exception {
|
||||
Message copyMessage = makeCopy(ref, false, false);
|
||||
final MessageReference ref,
|
||||
final SimpleString newAddress) throws Exception {
|
||||
Message copyMessage = makeCopy(ref, false, false, newAddress);
|
||||
|
||||
byte[] oldRouteToIDs = null;
|
||||
String targetNodeID;
|
||||
|
@ -3521,13 +3515,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
return new Pair<>(targetNodeID, targetBinding);
|
||||
}
|
||||
|
||||
private Message makeCopy(final MessageReference ref, final boolean expiry) throws Exception {
|
||||
return makeCopy(ref, expiry, true);
|
||||
private Message makeCopy(final MessageReference ref, final boolean expiry, final SimpleString newAddress) throws Exception {
|
||||
return makeCopy(ref, expiry, true, newAddress);
|
||||
}
|
||||
|
||||
private Message makeCopy(final MessageReference ref,
|
||||
final boolean expiry,
|
||||
final boolean copyOriginalHeaders) throws Exception {
|
||||
final boolean copyOriginalHeaders,
|
||||
final SimpleString newAddress) throws Exception {
|
||||
if (ref == null) {
|
||||
ActiveMQServerLogger.LOGGER.nullRefMessage();
|
||||
throw new ActiveMQNullRefException("Reference to message is null");
|
||||
|
@ -3547,8 +3542,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
|
||||
Message copy = message.copy(newID, true);
|
||||
|
||||
if (newAddress != null) {
|
||||
// setting it before checkLargeMessage:
|
||||
// checkLargeMessage can cause msg encoding and setting it later invalidate it,
|
||||
// forcing to be re-encoded later
|
||||
copy.setAddress(newAddress);
|
||||
}
|
||||
|
||||
if (copyOriginalHeaders) {
|
||||
copy.referenceOriginalMessage(message, ref.getQueue().getName().toString());
|
||||
copy.referenceOriginalMessage(message, ref.getQueue().getName());
|
||||
}
|
||||
|
||||
copy.setExpiration(0);
|
||||
|
@ -3577,7 +3579,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
|
||||
acknowledge(tx, ref, AckReason.EXPIRED, null);
|
||||
} else {
|
||||
move(expiryAddress, tx, ref, true, false);
|
||||
move(expiryAddress, tx, ref, true, false, null);
|
||||
}
|
||||
} else {
|
||||
if (!printErrorExpiring) {
|
||||
|
@ -3706,9 +3708,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
tx = new TransactionImpl(storageManager);
|
||||
}
|
||||
|
||||
Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED);
|
||||
|
||||
copyMessage.setAddress(address);
|
||||
Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED, address);
|
||||
|
||||
RoutingStatus routingStatus = postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);
|
||||
|
||||
|
|
Loading…
Reference in New Issue