ARTEMIS-2617 Lazy scan AMQP message data

This commit is contained in:
Francesco Nigro 2020-02-09 12:54:14 +01:00 committed by Clebert Suconic
parent 5897909dc9
commit d42267f05a
4 changed files with 369 additions and 17 deletions

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.algo;
import java.util.Objects;
/**
* Abstraction of {@code byte[] }<a href="https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm">Knuth-Morris-Pratt</a>'s needle to be used
* to perform pattern matching over indexed haystack of {@code byte}s.
*/
public final class KMPNeedle {
@FunctionalInterface
public interface IndexedByteSupplier<S> {
byte get(S source, int index);
}
private final int[] jumpTable;
private final byte[] needle;
private KMPNeedle(byte[] needle) {
Objects.requireNonNull(needle);
this.needle = needle;
this.jumpTable = createJumpTable(needle);
}
private static int[] createJumpTable(byte[] needle) {
final int[] jumpTable = new int[needle.length + 1];
int j = 0;
for (int i = 1; i < needle.length; i++) {
while (j > 0 && needle[j] != needle[i]) {
j = jumpTable[j];
}
if (needle[j] == needle[i]) {
j++;
}
jumpTable[i + 1] = j;
}
for (int i = 1; i < jumpTable.length; i++) {
if (jumpTable[i] != 0) {
return jumpTable;
}
}
// optimization over the original algorithm: it would save from accessing any jump table
return null;
}
/**
* https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm search algorithm:
*
* This version differ from the original algorithm, because allows to fail fast (and faster) if
* the remaining haystack to be processed is < of the remaining needle to be matched.
*/
public <H> int searchInto(IndexedByteSupplier<? super H> haystackReader, H haystack, int end, int start) {
assert end >= 0 && start >= 0 && end >= start;
final int length = end - start;
int j = 0;
final int needleLength = needle.length;
int remainingNeedle = needleLength;
for (int i = 0; i < length; i++) {
final int remainingHayStack = length - i;
if (remainingNeedle > remainingHayStack) {
return -1;
}
final int index = start + i;
final byte value = haystackReader.get(haystack, index);
while (j > 0 && needle[j] != value) {
j = jumpTable == null ? 0 : jumpTable[j];
remainingNeedle = needleLength - j;
}
if (needle[j] == value) {
j++;
remainingNeedle--;
assert remainingNeedle >= 0;
}
if (j == needleLength) {
final int startMatch = index - needleLength + 1;
return startMatch;
}
}
return -1;
}
public static KMPNeedle of(byte[] needle) {
return new KMPNeedle(needle);
}
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@ -43,6 +44,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.algo.KMPNeedle;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
@ -74,12 +76,49 @@ import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import org.jboss.logging.Logger;
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
/**
* See <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format">AMQP v1.0 message format</a>
* <pre>
*
* Bare Message
* |
* .---------------------+--------------------.
* | |
* +--------+-------------+-------------+------------+--------------+--------------+--------+
* | header | delivery- | message- | properties | application- | application- | footer |
* | | annotations | annotations | | properties | data | |
* +--------+-------------+-------------+------------+--------------+--------------+--------+
* | |
* '-------------------------------------------+--------------------------------------------'
* |
* Annotated Message
* </pre>
* <ul>
* <li>Zero or one header sections.
* <li>Zero or one delivery-annotation sections.
* <li>Zero or one message-annotation sections.
* <li>Zero or one properties sections.
* <li>Zero or one application-properties sections.
* <li>The body consists of one of the following three choices:
* <ul>
* <li>one or more data sections
* <li>one or more amqp-sequence sections
* <li>or a single amqp-value section.
* </ul>
* <li>Zero or one footer sections.
* </ul>
*/
public class AMQPMessage extends RefCountMessage {
private static final Logger logger = Logger.getLogger(AMQPMessage.class);
public static final SimpleString ADDRESS_PROPERTY = SimpleString.toSimpleString("_AMQ_AD");
// used to perform quick search
private static final Symbol[] SCHEDULED_DELIVERY_SYMBOLS = new Symbol[]{
AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY};
private static final KMPNeedle[] SCHEDULED_DELIVERY_NEEDLES = new KMPNeedle[]{
AMQPMessageSymbolSearch.kmpNeedleOf(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME),
AMQPMessageSymbolSearch.kmpNeedleOf(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY)};
public static final int DEFAULT_MESSAGE_FORMAT = 0;
public static final int DEFAULT_MESSAGE_PRIORITY = 4;
@ -89,7 +128,10 @@ public class AMQPMessage extends RefCountMessage {
// Buffer and state for the data backing this message.
private ReadableBuffer data;
private boolean messageDataScanned;
private static final byte NOT_SCANNED = 0;
private static final byte RELOAD_PERSISTENCE = 1;
private static final byte SCANNED = 2;
private byte messageDataScanned;
// Marks the message as needed to be re-encoded to update the backing buffer
private boolean modified;
@ -450,16 +492,25 @@ public class AMQPMessage extends RefCountMessage {
// re-encode should be done to update the backing data with the in memory elements.
private synchronized void ensureMessageDataScanned() {
if (!messageDataScanned) {
final byte state = messageDataScanned;
switch (state) {
case NOT_SCANNED:
scanMessageData();
break;
case RELOAD_PERSISTENCE:
lazyScanAfterReloadPersistence();
break;
case SCANNED:
// NO-OP
break;
default:
throw new IllegalStateException("invalid messageDataScanned state: expected within " +
Arrays.toString(new byte[]{NOT_SCANNED, SCANNED, RELOAD_PERSISTENCE}) +
" but " + messageDataScanned);
}
}
private synchronized void scanMessageData() {
this.messageDataScanned = true;
DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setBuffer(data.rewind());
private synchronized void resetMessageData() {
header = null;
messageAnnotations = null;
properties = null;
@ -474,6 +525,14 @@ public class AMQPMessage extends RefCountMessage {
propertiesPosition = VALUE_NOT_PRESENT;
applicationPropertiesPosition = VALUE_NOT_PRESENT;
remainingBodyPosition = VALUE_NOT_PRESENT;
}
private synchronized void scanMessageData() {
this.messageDataScanned = SCANNED;
DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setBuffer(data.rewind());
resetMessageData();
try {
while (data.hasRemaining()) {
@ -747,12 +806,16 @@ public class AMQPMessage extends RefCountMessage {
record.readBytes(recordArray);
data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(recordArray));
// Message state is now that the underlying buffer is loaded but the contents
// not yet scanned, once done the message is fully populated and ready for dispatch.
// Force a scan now and tidy the state variables to reflect where we are following
// this reload from the store.
// Message state is now that the underlying buffer is loaded, but the contents not yet scanned
resetMessageData();
modified = false;
messageDataScanned = RELOAD_PERSISTENCE;
}
private synchronized void lazyScanAfterReloadPersistence() {
assert messageDataScanned == RELOAD_PERSISTENCE;
scanMessageData();
messageDataScanned = true;
messageDataScanned = SCANNED;
modified = false;
// Message state should reflect that is came from persistent storage which
@ -798,7 +861,7 @@ public class AMQPMessage extends RefCountMessage {
private synchronized void encodeMessage() {
this.modified = false;
this.messageDataScanned = false;
this.messageDataScanned = NOT_SCANNED;
int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
EncoderImpl encoder = TLSEncode.getEncoder();
@ -1115,6 +1178,7 @@ public class AMQPMessage extends RefCountMessage {
@Override
public RoutingType getRoutingType() {
ensureMessageDataScanned();
Object routingType = getMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE);
if (routingType != null) {
@ -1183,8 +1247,39 @@ public class AMQPMessage extends RefCountMessage {
return this;
}
@Override
public boolean hasScheduledDeliveryTime() {
if (scheduledTime >= 0) {
return true;
}
return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS, SCHEDULED_DELIVERY_NEEDLES);
}
private boolean anyMessageAnnotations(Symbol[] symbols, KMPNeedle[] symbolNeedles) {
assert symbols.length == symbolNeedles.length;
final int count = symbols.length;
if (messageDataScanned == SCANNED) {
final MessageAnnotations messageAnnotations = this.messageAnnotations;
if (messageAnnotations == null) {
return false;
}
Map<Symbol, Object> map = messageAnnotations.getValue();
if (map == null) {
return false;
}
for (int i = 0; i < count; i++) {
if (map.containsKey(symbols[i])) {
return true;
}
}
return false;
}
return AMQPMessageSymbolSearch.anyMessageAnnotations(data, symbolNeedles);
}
@Override
public Long getScheduledDeliveryTime() {
ensureMessageDataScanned();
if (scheduledTime < 0) {
Object objscheduledTime = getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME);
Object objdelay = getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY);

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.broker;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.utils.algo.KMPNeedle;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Footer;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.TypeConstructor;
final class AMQPMessageSymbolSearch {
// used to quick search for MessageAnnotations
private static final IdentityHashMap<Class<?>, Boolean> MSG_BODY_TYPES;
static {
// we're including MessageAnnotations here because it will still cause termination
final List<Class<?>> classList = Arrays.asList(MessageAnnotations.class, Properties.class,
ApplicationProperties.class, Data.class,
AmqpSequence.class, AmqpValue.class, Footer.class);
MSG_BODY_TYPES = new IdentityHashMap<>(classList.size());
classList.forEach(clazz -> MSG_BODY_TYPES.put(clazz, Boolean.TRUE));
}
public static KMPNeedle kmpNeedleOf(Symbol symbol) {
return KMPNeedle.of(symbol.toString().getBytes(StandardCharsets.US_ASCII));
}
public static boolean anyMessageAnnotations(ReadableBuffer data, KMPNeedle[] needles) {
DecoderImpl decoder = TLSEncode.getDecoder();
final int position = data.position();
decoder.setBuffer(data.rewind());
try {
while (data.hasRemaining()) {
TypeConstructor<?> constructor = decoder.readConstructor();
final Class<?> typeClass = constructor.getTypeClass();
if (MSG_BODY_TYPES.containsKey(typeClass)) {
if (MessageAnnotations.class.equals(typeClass)) {
final int start = data.position();
constructor.skipValue();
final int end = data.position();
for (int i = 0, count = needles.length; i < count; i++) {
final int foundIndex = needles[i].searchInto(ReadableBuffer::get, data, end, start);
if (foundIndex != -1) {
return true;
}
}
}
return false;
}
constructor.skipValue();
}
return false;
} finally {
decoder.setBuffer(null);
data.position(position);
}
}
}

View File

@ -31,6 +31,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -156,6 +157,72 @@ public class AMQPMessageTest {
assertEquals(TEST_TO_ADDRESS, message.getAddress());
}
@Test
public void testHasScheduledDeliveryTimeReloadPersistence() {
final long scheduledTime = System.currentTimeMillis();
MessageImpl protonMessage = createProtonMessage();
MessageAnnotations annotations = protonMessage.getMessageAnnotations();
annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledTime);
ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage);
AMQPMessage message = new AMQPMessage(0);
try {
message.getProtonMessage();
fail("Should throw NPE due to not being initialized yet");
} catch (NullPointerException npe) {
}
// Now reload from encoded data
message.reloadPersistence(encoded, null);
assertTrue(message.hasScheduledDeliveryTime());
message.getHeader();
assertTrue(message.hasScheduledDeliveryTime());
}
@Test
public void testHasScheduledDeliveryDelayReloadPersistence() {
final long scheduledDelay = 100000;
MessageImpl protonMessage = createProtonMessage();
MessageAnnotations annotations = protonMessage.getMessageAnnotations();
annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY, scheduledDelay);
ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage);
AMQPMessage message = new AMQPMessage(0);
try {
message.getProtonMessage();
fail("Should throw NPE due to not being initialized yet");
} catch (NullPointerException npe) {
}
// Now reload from encoded data
message.reloadPersistence(encoded, null);
assertTrue(message.hasScheduledDeliveryTime());
message.getHeader();
assertTrue(message.hasScheduledDeliveryTime());
}
@Test
public void testNoScheduledDeliveryTimeOrDelayReloadPersistence() {
MessageImpl protonMessage = createProtonMessage();
ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage);
AMQPMessage message = new AMQPMessage(0);
try {
message.getProtonMessage();
fail("Should throw NPE due to not being initialized yet");
} catch (NullPointerException npe) {
}
// Now reload from encoded data
message.reloadPersistence(encoded, null);
assertFalse(message.hasScheduledDeliveryTime());
message.getHeader();
assertFalse(message.hasScheduledDeliveryTime());
}
//----- Test Memory Estimate access ---------------------------------------//
@Test
@ -2010,10 +2077,10 @@ public class AMQPMessageTest {
properties.setTo(TEST_TO_ADDRESS);
properties.setMessageId(UUID.randomUUID());
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
MessageAnnotations annotations = new MessageAnnotations(new LinkedHashMap<>());
annotations.getValue().put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY), TEST_MESSAGE_ANNOTATION_VALUE);
ApplicationProperties applicationProperties = new ApplicationProperties(new HashMap<>());
ApplicationProperties applicationProperties = new ApplicationProperties(new LinkedHashMap<>());
applicationProperties.getValue().put(TEST_APPLICATION_PROPERTY_KEY, TEST_APPLICATION_PROPERTY_VALUE);
AmqpValue body = new AmqpValue(TEST_STRING_BODY);