diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 077d9ddc7f..a4c9686b67 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -122,11 +121,49 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. protected static final int VALUE_NOT_PRESENT = -1; + /** + * This has been made public just for testing purposes: it's not stable + * and developers shouldn't rely on this for developing purposes. + */ + public enum MessageDataScanningStatus { + NOT_SCANNED(0), RELOAD_PERSISTENCE(1), SCANNED(2); + + private static final MessageDataScanningStatus[] STATES; + + static { + // this prevent codes to be assigned with the wrong order + // and ensure valueOf to work fine + final MessageDataScanningStatus[] states = values(); + STATES = new MessageDataScanningStatus[states.length]; + for (final MessageDataScanningStatus state : states) { + final int code = state.code; + if (STATES[code] != null) { + throw new AssertionError("code already in use: " + code); + } + STATES[code] = state; + } + } + + final byte code; + + MessageDataScanningStatus(int code) { + assert code >= 0 && code <= Byte.MAX_VALUE; + this.code = (byte) code; + } + + static MessageDataScanningStatus valueOf(int code) { + checkCode(code); + return STATES[code]; + } + + private static void checkCode(int code) { + if (code < 0 || code > (STATES.length - 1)) { + throw new IllegalArgumentException("invalid status code: " + code); + } + } + } // Buffer and state for the data backing this message. - protected static final byte NOT_SCANNED = 0; - protected static final byte RELOAD_PERSISTENCE = 1; - protected static final byte SCANNED = 2; - protected byte messageDataScanned; + protected byte messageDataScanned = MessageDataScanningStatus.NOT_SCANNED.code; // Marks the message as needed to be re-encoded to update the backing buffer protected boolean modified; @@ -191,6 +228,15 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. this.coreMessageObjectPools = null; } + /** + * Similarly to {@link MessageDataScanningStatus}, this method is made available only for testing + * purposes to check the message data scanning status.
+ * Its access is not thread-safe and it shouldn't return {@code null}. + */ + public final MessageDataScanningStatus messageDataScanned() { + return MessageDataScanningStatus.valueOf(messageDataScanned); + } + /** This will return application properties without attempting to decode it. * That means, if applicationProperties were never parsed before, this will return null, even if there is application properties. * This was created as an internal method for testing, as we need to validate if the application properties are not decoded until needed. */ @@ -448,7 +494,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. // re-encode should be done to update the backing data with the in memory elements. protected synchronized void ensureMessageDataScanned() { - final byte state = messageDataScanned; + final MessageDataScanningStatus state = MessageDataScanningStatus.valueOf(messageDataScanned); switch (state) { case NOT_SCANNED: scanMessageData(); @@ -459,10 +505,6 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. case SCANNED: // NO-OP break; - default: - throw new IllegalStateException("invalid messageDataScanned state: expected within " + - Arrays.toString(new byte[]{NOT_SCANNED, SCANNED, RELOAD_PERSISTENCE}) + - " but " + messageDataScanned); } } @@ -509,7 +551,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. TLSEncode.getDecoder().setBuffer(new NettyReadable(buf)); try { - messageDataScanned = SCANNED; + messageDataScanned = MessageDataScanningStatus.SCANNED.code; headerPosition = buf.readInt(); encodedHeaderSize = buf.readInt(); @@ -553,7 +595,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. } protected synchronized void scanMessageData() { - this.messageDataScanned = SCANNED; + this.messageDataScanned = MessageDataScanningStatus.SCANNED.code; DecoderImpl decoder = TLSEncode.getDecoder(); decoder.setBuffer(getData().rewind()); @@ -791,9 +833,9 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. public abstract void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools); protected synchronized void lazyScanAfterReloadPersistence() { - assert messageDataScanned == RELOAD_PERSISTENCE; + assert messageDataScanned == MessageDataScanningStatus.RELOAD_PERSISTENCE.code; scanMessageData(); - messageDataScanned = SCANNED; + messageDataScanned = MessageDataScanningStatus.SCANNED.code; modified = false; // Message state should reflect that is came from persistent storage which @@ -1164,7 +1206,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. private boolean anyMessageAnnotations(Symbol[] symbols, KMPNeedle[] symbolNeedles) { assert symbols.length == symbolNeedles.length; final int count = symbols.length; - if (messageDataScanned == SCANNED) { + if (messageDataScanned == MessageDataScanningStatus.SCANNED.code) { final MessageAnnotations messageAnnotations = this.messageAnnotations; if (messageAnnotations == null) { return false; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java index 99d3be35c0..2a15adcf0f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java @@ -162,14 +162,7 @@ public class AMQPStandardMessage extends AMQPMessage { // Message state is now that the underlying buffer is loaded, but the contents not yet scanned resetMessageData(); modified = false; - messageDataScanned = RELOAD_PERSISTENCE; - // can happen when moved to a durable location. We must re-encode here to - // avoid a subsequent redelivery from suddenly appearing with a durable header - // tag when the initial delivery did not. - if (!isDurable()) { - setDurable(true); - reencode(); - } + messageDataScanned = MessageDataScanningStatus.RELOAD_PERSISTENCE.code; } @Override @@ -207,7 +200,7 @@ public class AMQPStandardMessage extends AMQPMessage { @Override protected synchronized void encodeMessage() { this.modified = false; - this.messageDataScanned = NOT_SCANNED; + this.messageDataScanned = MessageDataScanningStatus.NOT_SCANNED.code; int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0); ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated); EncoderImpl encoder = TLSEncode.getEncoder(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java index 882248188d..efe356c963 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java @@ -172,11 +172,21 @@ public class AMQPMessageTest { } catch (NullPointerException npe) { } + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.messageDataScanned()); + // Now reload from encoded data message.reloadPersistence(encoded, null); + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned()); + assertTrue(message.hasScheduledDeliveryTime()); + + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned()); + message.getHeader(); + + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.messageDataScanned()); + assertTrue(message.hasScheduledDeliveryTime()); } @@ -195,11 +205,21 @@ public class AMQPMessageTest { } catch (NullPointerException npe) { } + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.messageDataScanned()); + // Now reload from encoded data message.reloadPersistence(encoded, null); + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned()); + assertTrue(message.hasScheduledDeliveryTime()); + + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned()); + message.getHeader(); + + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.messageDataScanned()); + assertTrue(message.hasScheduledDeliveryTime()); } @@ -215,11 +235,21 @@ public class AMQPMessageTest { } catch (NullPointerException npe) { } + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.messageDataScanned()); + // Now reload from encoded data message.reloadPersistence(encoded, null); + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned()); + assertFalse(message.hasScheduledDeliveryTime()); + + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned()); + message.getHeader(); + + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.messageDataScanned()); + assertFalse(message.hasScheduledDeliveryTime()); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java new file mode 100644 index 0000000000..4f5315c0ab --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.journal; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Test; + +public class AmqpJournalLoadingTest extends AmqpClientTestSupport { + + @Test + public void durableMessageDataNotScannedOnRestartTest() throws Exception { + sendMessages(getQueueName(), 1, true); + final Queue queueView = getProxyToQueue(getQueueName()); + Wait.assertTrue("All messages should arrive", () -> queueView.getMessageCount() == 1); + + server.stop(); + server.start(); + + final AMQPMessage amqpMessage; + + final Queue afterRestartQueueView = getProxyToQueue(getQueueName()); + + Wait.assertTrue("All messages should arrive", () -> afterRestartQueueView.getMessageCount() == 1); + + try (LinkedListIterator iterator = afterRestartQueueView.iterator()) { + Assert.assertTrue(iterator.hasNext()); + final MessageReference next = iterator.next(); + Assert.assertNotNull(next); + Assert.assertFalse(iterator.hasNext()); + final Message message = next.getMessage(); + Assert.assertThat(message, Matchers.instanceOf(AMQPMessage.class)); + amqpMessage = (AMQPMessage) message; + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, amqpMessage.messageDataScanned()); + } + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getQueueName()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertTrue(receive.isDurable()); + + assertEquals(1, afterRestartQueueView.getMessageCount()); + + Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, amqpMessage.messageDataScanned()); + + receive.accept(); + + receiver.close(); + + Wait.assertEquals(0, () -> afterRestartQueueView.getMessageCount()); + + connection.close(); + } + +}