From d42267f05a670292c4f8e20a70471a95ed8ff5bc Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Sun, 9 Feb 2020 12:54:14 +0100 Subject: [PATCH] ARTEMIS-2617 Lazy scan AMQP message data --- .../artemis/utils/algo/KMPNeedle.java | 103 +++++++++++++++ .../protocol/amqp/broker/AMQPMessage.java | 125 +++++++++++++++--- .../amqp/broker/AMQPMessageSymbolSearch.java | 87 ++++++++++++ .../protocol/amqp/broker/AMQPMessageTest.java | 71 +++++++++- 4 files changed, 369 insertions(+), 17 deletions(-) create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/algo/KMPNeedle.java create mode 100644 artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageSymbolSearch.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/algo/KMPNeedle.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/algo/KMPNeedle.java new file mode 100644 index 0000000000..22910c8b60 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/algo/KMPNeedle.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.algo; + +import java.util.Objects; + +/** + * Abstraction of {@code byte[] }Knuth-Morris-Pratt's needle to be used + * to perform pattern matching over indexed haystack of {@code byte}s. + */ +public final class KMPNeedle { + + @FunctionalInterface + public interface IndexedByteSupplier { + + byte get(S source, int index); + } + + private final int[] jumpTable; + private final byte[] needle; + + private KMPNeedle(byte[] needle) { + Objects.requireNonNull(needle); + this.needle = needle; + this.jumpTable = createJumpTable(needle); + } + + private static int[] createJumpTable(byte[] needle) { + final int[] jumpTable = new int[needle.length + 1]; + int j = 0; + for (int i = 1; i < needle.length; i++) { + while (j > 0 && needle[j] != needle[i]) { + j = jumpTable[j]; + } + if (needle[j] == needle[i]) { + j++; + } + jumpTable[i + 1] = j; + } + for (int i = 1; i < jumpTable.length; i++) { + if (jumpTable[i] != 0) { + return jumpTable; + } + } + // optimization over the original algorithm: it would save from accessing any jump table + return null; + } + + /** + * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm search algorithm: + * + * This version differ from the original algorithm, because allows to fail fast (and faster) if + * the remaining haystack to be processed is < of the remaining needle to be matched. + */ + public int searchInto(IndexedByteSupplier haystackReader, H haystack, int end, int start) { + assert end >= 0 && start >= 0 && end >= start; + final int length = end - start; + int j = 0; + final int needleLength = needle.length; + int remainingNeedle = needleLength; + for (int i = 0; i < length; i++) { + final int remainingHayStack = length - i; + if (remainingNeedle > remainingHayStack) { + return -1; + } + final int index = start + i; + final byte value = haystackReader.get(haystack, index); + while (j > 0 && needle[j] != value) { + j = jumpTable == null ? 0 : jumpTable[j]; + remainingNeedle = needleLength - j; + } + if (needle[j] == value) { + j++; + remainingNeedle--; + assert remainingNeedle >= 0; + } + if (j == needleLength) { + final int startMatch = index - needleLength + 1; + return startMatch; + } + } + return -1; + } + + public static KMPNeedle of(byte[] needle) { + return new KMPNeedle(needle); + } + +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index be758de230..fa96a5c9e5 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -43,6 +44,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.DataConstants; +import org.apache.activemq.artemis.utils.algo.KMPNeedle; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; @@ -74,12 +76,49 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import org.jboss.logging.Logger; -// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format +/** + * See AMQP v1.0 message format + *
+ *
+ *                                                      Bare Message
+ *                                                            |
+ *                                      .---------------------+--------------------.
+ *                                      |                                          |
+ * +--------+-------------+-------------+------------+--------------+--------------+--------+
+ * | header | delivery-   | message-    | properties | application- | application- | footer |
+ * |        | annotations | annotations |            | properties   | data         |        |
+ * +--------+-------------+-------------+------------+--------------+--------------+--------+
+ * |                                                                                        |
+ * '-------------------------------------------+--------------------------------------------'
+ *                                             |
+ *                                      Annotated Message
+ * 
+ *
    + *
  • Zero or one header sections. + *
  • Zero or one delivery-annotation sections. + *
  • Zero or one message-annotation sections. + *
  • Zero or one properties sections. + *
  • Zero or one application-properties sections. + *
  • The body consists of one of the following three choices: + *
      + *
    • one or more data sections + *
    • one or more amqp-sequence sections + *
    • or a single amqp-value section. + *
    + *
  • Zero or one footer sections. + *
+ */ public class AMQPMessage extends RefCountMessage { private static final Logger logger = Logger.getLogger(AMQPMessage.class); public static final SimpleString ADDRESS_PROPERTY = SimpleString.toSimpleString("_AMQ_AD"); + // used to perform quick search + private static final Symbol[] SCHEDULED_DELIVERY_SYMBOLS = new Symbol[]{ + AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY}; + private static final KMPNeedle[] SCHEDULED_DELIVERY_NEEDLES = new KMPNeedle[]{ + AMQPMessageSymbolSearch.kmpNeedleOf(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME), + AMQPMessageSymbolSearch.kmpNeedleOf(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY)}; public static final int DEFAULT_MESSAGE_FORMAT = 0; public static final int DEFAULT_MESSAGE_PRIORITY = 4; @@ -89,7 +128,10 @@ public class AMQPMessage extends RefCountMessage { // Buffer and state for the data backing this message. private ReadableBuffer data; - private boolean messageDataScanned; + private static final byte NOT_SCANNED = 0; + private static final byte RELOAD_PERSISTENCE = 1; + private static final byte SCANNED = 2; + private byte messageDataScanned; // Marks the message as needed to be re-encoded to update the backing buffer private boolean modified; @@ -450,16 +492,25 @@ public class AMQPMessage extends RefCountMessage { // re-encode should be done to update the backing data with the in memory elements. private synchronized void ensureMessageDataScanned() { - if (!messageDataScanned) { - scanMessageData(); + final byte state = messageDataScanned; + switch (state) { + case NOT_SCANNED: + scanMessageData(); + break; + case RELOAD_PERSISTENCE: + lazyScanAfterReloadPersistence(); + break; + case SCANNED: + // NO-OP + break; + default: + throw new IllegalStateException("invalid messageDataScanned state: expected within " + + Arrays.toString(new byte[]{NOT_SCANNED, SCANNED, RELOAD_PERSISTENCE}) + + " but " + messageDataScanned); } } - private synchronized void scanMessageData() { - this.messageDataScanned = true; - DecoderImpl decoder = TLSEncode.getDecoder(); - decoder.setBuffer(data.rewind()); - + private synchronized void resetMessageData() { header = null; messageAnnotations = null; properties = null; @@ -474,6 +525,14 @@ public class AMQPMessage extends RefCountMessage { propertiesPosition = VALUE_NOT_PRESENT; applicationPropertiesPosition = VALUE_NOT_PRESENT; remainingBodyPosition = VALUE_NOT_PRESENT; + } + + private synchronized void scanMessageData() { + this.messageDataScanned = SCANNED; + DecoderImpl decoder = TLSEncode.getDecoder(); + decoder.setBuffer(data.rewind()); + + resetMessageData(); try { while (data.hasRemaining()) { @@ -747,12 +806,16 @@ public class AMQPMessage extends RefCountMessage { record.readBytes(recordArray); data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(recordArray)); - // Message state is now that the underlying buffer is loaded but the contents - // not yet scanned, once done the message is fully populated and ready for dispatch. - // Force a scan now and tidy the state variables to reflect where we are following - // this reload from the store. + // Message state is now that the underlying buffer is loaded, but the contents not yet scanned + resetMessageData(); + modified = false; + messageDataScanned = RELOAD_PERSISTENCE; + } + + private synchronized void lazyScanAfterReloadPersistence() { + assert messageDataScanned == RELOAD_PERSISTENCE; scanMessageData(); - messageDataScanned = true; + messageDataScanned = SCANNED; modified = false; // Message state should reflect that is came from persistent storage which @@ -798,7 +861,7 @@ public class AMQPMessage extends RefCountMessage { private synchronized void encodeMessage() { this.modified = false; - this.messageDataScanned = false; + this.messageDataScanned = NOT_SCANNED; int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0); ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated); EncoderImpl encoder = TLSEncode.getEncoder(); @@ -1115,6 +1178,7 @@ public class AMQPMessage extends RefCountMessage { @Override public RoutingType getRoutingType() { + ensureMessageDataScanned(); Object routingType = getMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE); if (routingType != null) { @@ -1183,8 +1247,39 @@ public class AMQPMessage extends RefCountMessage { return this; } + @Override + public boolean hasScheduledDeliveryTime() { + if (scheduledTime >= 0) { + return true; + } + return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS, SCHEDULED_DELIVERY_NEEDLES); + } + + private boolean anyMessageAnnotations(Symbol[] symbols, KMPNeedle[] symbolNeedles) { + assert symbols.length == symbolNeedles.length; + final int count = symbols.length; + if (messageDataScanned == SCANNED) { + final MessageAnnotations messageAnnotations = this.messageAnnotations; + if (messageAnnotations == null) { + return false; + } + Map map = messageAnnotations.getValue(); + if (map == null) { + return false; + } + for (int i = 0; i < count; i++) { + if (map.containsKey(symbols[i])) { + return true; + } + } + return false; + } + return AMQPMessageSymbolSearch.anyMessageAnnotations(data, symbolNeedles); + } + @Override public Long getScheduledDeliveryTime() { + ensureMessageDataScanned(); if (scheduledTime < 0) { Object objscheduledTime = getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME); Object objdelay = getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageSymbolSearch.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageSymbolSearch.java new file mode 100644 index 0000000000..7b046a5089 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageSymbolSearch.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.broker; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.IdentityHashMap; +import java.util.List; + +import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; +import org.apache.activemq.artemis.utils.algo.KMPNeedle; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.AmqpSequence; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.Footer; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.codec.DecoderImpl; +import org.apache.qpid.proton.codec.ReadableBuffer; +import org.apache.qpid.proton.codec.TypeConstructor; + +final class AMQPMessageSymbolSearch { + + // used to quick search for MessageAnnotations + private static final IdentityHashMap, Boolean> MSG_BODY_TYPES; + + static { + // we're including MessageAnnotations here because it will still cause termination + final List> classList = Arrays.asList(MessageAnnotations.class, Properties.class, + ApplicationProperties.class, Data.class, + AmqpSequence.class, AmqpValue.class, Footer.class); + MSG_BODY_TYPES = new IdentityHashMap<>(classList.size()); + classList.forEach(clazz -> MSG_BODY_TYPES.put(clazz, Boolean.TRUE)); + } + + public static KMPNeedle kmpNeedleOf(Symbol symbol) { + return KMPNeedle.of(symbol.toString().getBytes(StandardCharsets.US_ASCII)); + } + + public static boolean anyMessageAnnotations(ReadableBuffer data, KMPNeedle[] needles) { + DecoderImpl decoder = TLSEncode.getDecoder(); + final int position = data.position(); + decoder.setBuffer(data.rewind()); + try { + while (data.hasRemaining()) { + TypeConstructor constructor = decoder.readConstructor(); + final Class typeClass = constructor.getTypeClass(); + if (MSG_BODY_TYPES.containsKey(typeClass)) { + if (MessageAnnotations.class.equals(typeClass)) { + final int start = data.position(); + constructor.skipValue(); + final int end = data.position(); + for (int i = 0, count = needles.length; i < count; i++) { + final int foundIndex = needles[i].searchInto(ReadableBuffer::get, data, end, start); + if (foundIndex != -1) { + return true; + } + } + } + return false; + } + constructor.skipValue(); + } + return false; + } finally { + decoder.setBuffer(null); + data.position(position); + } + } + +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java index 695bf0f943..0f68edfbdb 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java @@ -31,6 +31,7 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Date; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -156,6 +157,72 @@ public class AMQPMessageTest { assertEquals(TEST_TO_ADDRESS, message.getAddress()); } + @Test + public void testHasScheduledDeliveryTimeReloadPersistence() { + final long scheduledTime = System.currentTimeMillis(); + MessageImpl protonMessage = createProtonMessage(); + MessageAnnotations annotations = protonMessage.getMessageAnnotations(); + annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledTime); + ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage); + + AMQPMessage message = new AMQPMessage(0); + try { + message.getProtonMessage(); + fail("Should throw NPE due to not being initialized yet"); + } catch (NullPointerException npe) { + } + + // Now reload from encoded data + message.reloadPersistence(encoded, null); + + assertTrue(message.hasScheduledDeliveryTime()); + message.getHeader(); + assertTrue(message.hasScheduledDeliveryTime()); + } + + @Test + public void testHasScheduledDeliveryDelayReloadPersistence() { + final long scheduledDelay = 100000; + MessageImpl protonMessage = createProtonMessage(); + MessageAnnotations annotations = protonMessage.getMessageAnnotations(); + annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY, scheduledDelay); + ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage); + + AMQPMessage message = new AMQPMessage(0); + try { + message.getProtonMessage(); + fail("Should throw NPE due to not being initialized yet"); + } catch (NullPointerException npe) { + } + + // Now reload from encoded data + message.reloadPersistence(encoded, null); + + assertTrue(message.hasScheduledDeliveryTime()); + message.getHeader(); + assertTrue(message.hasScheduledDeliveryTime()); + } + + @Test + public void testNoScheduledDeliveryTimeOrDelayReloadPersistence() { + MessageImpl protonMessage = createProtonMessage(); + ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage); + + AMQPMessage message = new AMQPMessage(0); + try { + message.getProtonMessage(); + fail("Should throw NPE due to not being initialized yet"); + } catch (NullPointerException npe) { + } + + // Now reload from encoded data + message.reloadPersistence(encoded, null); + + assertFalse(message.hasScheduledDeliveryTime()); + message.getHeader(); + assertFalse(message.hasScheduledDeliveryTime()); + } + //----- Test Memory Estimate access ---------------------------------------// @Test @@ -2010,10 +2077,10 @@ public class AMQPMessageTest { properties.setTo(TEST_TO_ADDRESS); properties.setMessageId(UUID.randomUUID()); - MessageAnnotations annotations = new MessageAnnotations(new HashMap<>()); + MessageAnnotations annotations = new MessageAnnotations(new LinkedHashMap<>()); annotations.getValue().put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY), TEST_MESSAGE_ANNOTATION_VALUE); - ApplicationProperties applicationProperties = new ApplicationProperties(new HashMap<>()); + ApplicationProperties applicationProperties = new ApplicationProperties(new LinkedHashMap<>()); applicationProperties.getValue().put(TEST_APPLICATION_PROPERTY_KEY, TEST_APPLICATION_PROPERTY_VALUE); AmqpValue body = new AmqpValue(TEST_STRING_BODY);