This closes #3023
This commit is contained in:
commit
579425c61e
|
@ -17,7 +17,6 @@
|
||||||
package org.apache.activemq.artemis.protocol.amqp.broker;
|
package org.apache.activemq.artemis.protocol.amqp.broker;
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -122,11 +121,49 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
||||||
|
|
||||||
protected static final int VALUE_NOT_PRESENT = -1;
|
protected static final int VALUE_NOT_PRESENT = -1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This has been made public just for testing purposes: it's not stable
|
||||||
|
* and developers shouldn't rely on this for developing purposes.
|
||||||
|
*/
|
||||||
|
public enum MessageDataScanningStatus {
|
||||||
|
NOT_SCANNED(0), RELOAD_PERSISTENCE(1), SCANNED(2);
|
||||||
|
|
||||||
|
private static final MessageDataScanningStatus[] STATES;
|
||||||
|
|
||||||
|
static {
|
||||||
|
// this prevent codes to be assigned with the wrong order
|
||||||
|
// and ensure valueOf to work fine
|
||||||
|
final MessageDataScanningStatus[] states = values();
|
||||||
|
STATES = new MessageDataScanningStatus[states.length];
|
||||||
|
for (final MessageDataScanningStatus state : states) {
|
||||||
|
final int code = state.code;
|
||||||
|
if (STATES[code] != null) {
|
||||||
|
throw new AssertionError("code already in use: " + code);
|
||||||
|
}
|
||||||
|
STATES[code] = state;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final byte code;
|
||||||
|
|
||||||
|
MessageDataScanningStatus(int code) {
|
||||||
|
assert code >= 0 && code <= Byte.MAX_VALUE;
|
||||||
|
this.code = (byte) code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static MessageDataScanningStatus valueOf(int code) {
|
||||||
|
checkCode(code);
|
||||||
|
return STATES[code];
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void checkCode(int code) {
|
||||||
|
if (code < 0 || code > (STATES.length - 1)) {
|
||||||
|
throw new IllegalArgumentException("invalid status code: " + code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
// Buffer and state for the data backing this message.
|
// Buffer and state for the data backing this message.
|
||||||
protected static final byte NOT_SCANNED = 0;
|
protected byte messageDataScanned = MessageDataScanningStatus.NOT_SCANNED.code;
|
||||||
protected static final byte RELOAD_PERSISTENCE = 1;
|
|
||||||
protected static final byte SCANNED = 2;
|
|
||||||
protected byte messageDataScanned;
|
|
||||||
|
|
||||||
// Marks the message as needed to be re-encoded to update the backing buffer
|
// Marks the message as needed to be re-encoded to update the backing buffer
|
||||||
protected boolean modified;
|
protected boolean modified;
|
||||||
|
@ -191,6 +228,15 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
||||||
this.coreMessageObjectPools = null;
|
this.coreMessageObjectPools = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Similarly to {@link MessageDataScanningStatus}, this method is made available only for testing
|
||||||
|
* purposes to check the message data scanning status.<br>
|
||||||
|
* Its access is not thread-safe and it shouldn't return {@code null}.
|
||||||
|
*/
|
||||||
|
public final MessageDataScanningStatus messageDataScanned() {
|
||||||
|
return MessageDataScanningStatus.valueOf(messageDataScanned);
|
||||||
|
}
|
||||||
|
|
||||||
/** This will return application properties without attempting to decode it.
|
/** This will return application properties without attempting to decode it.
|
||||||
* That means, if applicationProperties were never parsed before, this will return null, even if there is application properties.
|
* That means, if applicationProperties were never parsed before, this will return null, even if there is application properties.
|
||||||
* This was created as an internal method for testing, as we need to validate if the application properties are not decoded until needed. */
|
* This was created as an internal method for testing, as we need to validate if the application properties are not decoded until needed. */
|
||||||
|
@ -448,7 +494,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
||||||
// re-encode should be done to update the backing data with the in memory elements.
|
// re-encode should be done to update the backing data with the in memory elements.
|
||||||
|
|
||||||
protected synchronized void ensureMessageDataScanned() {
|
protected synchronized void ensureMessageDataScanned() {
|
||||||
final byte state = messageDataScanned;
|
final MessageDataScanningStatus state = MessageDataScanningStatus.valueOf(messageDataScanned);
|
||||||
switch (state) {
|
switch (state) {
|
||||||
case NOT_SCANNED:
|
case NOT_SCANNED:
|
||||||
scanMessageData();
|
scanMessageData();
|
||||||
|
@ -459,10 +505,6 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
||||||
case SCANNED:
|
case SCANNED:
|
||||||
// NO-OP
|
// NO-OP
|
||||||
break;
|
break;
|
||||||
default:
|
|
||||||
throw new IllegalStateException("invalid messageDataScanned state: expected within " +
|
|
||||||
Arrays.toString(new byte[]{NOT_SCANNED, SCANNED, RELOAD_PERSISTENCE}) +
|
|
||||||
" but " + messageDataScanned);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -509,7 +551,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
||||||
TLSEncode.getDecoder().setBuffer(new NettyReadable(buf));
|
TLSEncode.getDecoder().setBuffer(new NettyReadable(buf));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
messageDataScanned = SCANNED;
|
messageDataScanned = MessageDataScanningStatus.SCANNED.code;
|
||||||
|
|
||||||
headerPosition = buf.readInt();
|
headerPosition = buf.readInt();
|
||||||
encodedHeaderSize = buf.readInt();
|
encodedHeaderSize = buf.readInt();
|
||||||
|
@ -553,7 +595,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized void scanMessageData() {
|
protected synchronized void scanMessageData() {
|
||||||
this.messageDataScanned = SCANNED;
|
this.messageDataScanned = MessageDataScanningStatus.SCANNED.code;
|
||||||
DecoderImpl decoder = TLSEncode.getDecoder();
|
DecoderImpl decoder = TLSEncode.getDecoder();
|
||||||
decoder.setBuffer(getData().rewind());
|
decoder.setBuffer(getData().rewind());
|
||||||
|
|
||||||
|
@ -791,9 +833,9 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
||||||
public abstract void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools);
|
public abstract void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools);
|
||||||
|
|
||||||
protected synchronized void lazyScanAfterReloadPersistence() {
|
protected synchronized void lazyScanAfterReloadPersistence() {
|
||||||
assert messageDataScanned == RELOAD_PERSISTENCE;
|
assert messageDataScanned == MessageDataScanningStatus.RELOAD_PERSISTENCE.code;
|
||||||
scanMessageData();
|
scanMessageData();
|
||||||
messageDataScanned = SCANNED;
|
messageDataScanned = MessageDataScanningStatus.SCANNED.code;
|
||||||
modified = false;
|
modified = false;
|
||||||
|
|
||||||
// Message state should reflect that is came from persistent storage which
|
// Message state should reflect that is came from persistent storage which
|
||||||
|
@ -1164,7 +1206,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
||||||
private boolean anyMessageAnnotations(Symbol[] symbols, KMPNeedle[] symbolNeedles) {
|
private boolean anyMessageAnnotations(Symbol[] symbols, KMPNeedle[] symbolNeedles) {
|
||||||
assert symbols.length == symbolNeedles.length;
|
assert symbols.length == symbolNeedles.length;
|
||||||
final int count = symbols.length;
|
final int count = symbols.length;
|
||||||
if (messageDataScanned == SCANNED) {
|
if (messageDataScanned == MessageDataScanningStatus.SCANNED.code) {
|
||||||
final MessageAnnotations messageAnnotations = this.messageAnnotations;
|
final MessageAnnotations messageAnnotations = this.messageAnnotations;
|
||||||
if (messageAnnotations == null) {
|
if (messageAnnotations == null) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -162,14 +162,7 @@ public class AMQPStandardMessage extends AMQPMessage {
|
||||||
// Message state is now that the underlying buffer is loaded, but the contents not yet scanned
|
// Message state is now that the underlying buffer is loaded, but the contents not yet scanned
|
||||||
resetMessageData();
|
resetMessageData();
|
||||||
modified = false;
|
modified = false;
|
||||||
messageDataScanned = RELOAD_PERSISTENCE;
|
messageDataScanned = MessageDataScanningStatus.RELOAD_PERSISTENCE.code;
|
||||||
// can happen when moved to a durable location. We must re-encode here to
|
|
||||||
// avoid a subsequent redelivery from suddenly appearing with a durable header
|
|
||||||
// tag when the initial delivery did not.
|
|
||||||
if (!isDurable()) {
|
|
||||||
setDurable(true);
|
|
||||||
reencode();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -207,7 +200,7 @@ public class AMQPStandardMessage extends AMQPMessage {
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void encodeMessage() {
|
protected synchronized void encodeMessage() {
|
||||||
this.modified = false;
|
this.modified = false;
|
||||||
this.messageDataScanned = NOT_SCANNED;
|
this.messageDataScanned = MessageDataScanningStatus.NOT_SCANNED.code;
|
||||||
int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
|
int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
|
||||||
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
|
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
|
||||||
EncoderImpl encoder = TLSEncode.getEncoder();
|
EncoderImpl encoder = TLSEncode.getEncoder();
|
||||||
|
|
|
@ -172,11 +172,21 @@ public class AMQPMessageTest {
|
||||||
} catch (NullPointerException npe) {
|
} catch (NullPointerException npe) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.messageDataScanned());
|
||||||
|
|
||||||
// Now reload from encoded data
|
// Now reload from encoded data
|
||||||
message.reloadPersistence(encoded, null);
|
message.reloadPersistence(encoded, null);
|
||||||
|
|
||||||
|
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
|
||||||
|
|
||||||
assertTrue(message.hasScheduledDeliveryTime());
|
assertTrue(message.hasScheduledDeliveryTime());
|
||||||
|
|
||||||
|
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
|
||||||
|
|
||||||
message.getHeader();
|
message.getHeader();
|
||||||
|
|
||||||
|
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.messageDataScanned());
|
||||||
|
|
||||||
assertTrue(message.hasScheduledDeliveryTime());
|
assertTrue(message.hasScheduledDeliveryTime());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,11 +205,21 @@ public class AMQPMessageTest {
|
||||||
} catch (NullPointerException npe) {
|
} catch (NullPointerException npe) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.messageDataScanned());
|
||||||
|
|
||||||
// Now reload from encoded data
|
// Now reload from encoded data
|
||||||
message.reloadPersistence(encoded, null);
|
message.reloadPersistence(encoded, null);
|
||||||
|
|
||||||
|
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
|
||||||
|
|
||||||
assertTrue(message.hasScheduledDeliveryTime());
|
assertTrue(message.hasScheduledDeliveryTime());
|
||||||
|
|
||||||
|
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
|
||||||
|
|
||||||
message.getHeader();
|
message.getHeader();
|
||||||
|
|
||||||
|
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.messageDataScanned());
|
||||||
|
|
||||||
assertTrue(message.hasScheduledDeliveryTime());
|
assertTrue(message.hasScheduledDeliveryTime());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -215,11 +235,21 @@ public class AMQPMessageTest {
|
||||||
} catch (NullPointerException npe) {
|
} catch (NullPointerException npe) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.messageDataScanned());
|
||||||
|
|
||||||
// Now reload from encoded data
|
// Now reload from encoded data
|
||||||
message.reloadPersistence(encoded, null);
|
message.reloadPersistence(encoded, null);
|
||||||
|
|
||||||
|
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
|
||||||
|
|
||||||
assertFalse(message.hasScheduledDeliveryTime());
|
assertFalse(message.hasScheduledDeliveryTime());
|
||||||
|
|
||||||
|
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.messageDataScanned());
|
||||||
|
|
||||||
message.getHeader();
|
message.getHeader();
|
||||||
|
|
||||||
|
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.messageDataScanned());
|
||||||
|
|
||||||
assertFalse(message.hasScheduledDeliveryTime());
|
assertFalse(message.hasScheduledDeliveryTime());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,89 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.amqp.journal;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
|
||||||
|
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
|
||||||
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
|
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||||
|
import org.hamcrest.Matchers;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class AmqpJournalLoadingTest extends AmqpClientTestSupport {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void durableMessageDataNotScannedOnRestartTest() throws Exception {
|
||||||
|
sendMessages(getQueueName(), 1, true);
|
||||||
|
final Queue queueView = getProxyToQueue(getQueueName());
|
||||||
|
Wait.assertTrue("All messages should arrive", () -> queueView.getMessageCount() == 1);
|
||||||
|
|
||||||
|
server.stop();
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
final AMQPMessage amqpMessage;
|
||||||
|
|
||||||
|
final Queue afterRestartQueueView = getProxyToQueue(getQueueName());
|
||||||
|
|
||||||
|
Wait.assertTrue("All messages should arrive", () -> afterRestartQueueView.getMessageCount() == 1);
|
||||||
|
|
||||||
|
try (LinkedListIterator<MessageReference> iterator = afterRestartQueueView.iterator()) {
|
||||||
|
Assert.assertTrue(iterator.hasNext());
|
||||||
|
final MessageReference next = iterator.next();
|
||||||
|
Assert.assertNotNull(next);
|
||||||
|
Assert.assertFalse(iterator.hasNext());
|
||||||
|
final Message message = next.getMessage();
|
||||||
|
Assert.assertThat(message, Matchers.instanceOf(AMQPMessage.class));
|
||||||
|
amqpMessage = (AMQPMessage) message;
|
||||||
|
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, amqpMessage.messageDataScanned());
|
||||||
|
}
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver(getQueueName());
|
||||||
|
|
||||||
|
receiver.flow(1);
|
||||||
|
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(receive);
|
||||||
|
assertTrue(receive.isDurable());
|
||||||
|
|
||||||
|
assertEquals(1, afterRestartQueueView.getMessageCount());
|
||||||
|
|
||||||
|
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, amqpMessage.messageDataScanned());
|
||||||
|
|
||||||
|
receive.accept();
|
||||||
|
|
||||||
|
receiver.close();
|
||||||
|
|
||||||
|
Wait.assertEquals(0, () -> afterRestartQueueView.getMessageCount());
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue