diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile index b50cf2841b..2ce58f2a6d 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile @@ -33,7 +33,7 @@ ARTEMIS_INSTANCE_ETC_URI='${artemis.instance.etc.uri}' # Java Opts if [ -z "$JAVA_ARGS" ]; then - JAVA_ARGS="${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -Xms512M -Xmx2G -Dhawtio.realm=activemq -Dhawtio.offline=true -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=${ARTEMIS_INSTANCE_ETC_URI}jolokia-access.xml" + JAVA_ARGS="${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx2G -Dhawtio.realm=activemq -Dhawtio.offline=true -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=${ARTEMIS_INSTANCE_ETC_URI}jolokia-access.xml" fi # diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd index 6778ebae74..f653b278b9 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd @@ -33,7 +33,7 @@ rem Cluster Properties: Used to pass arguments to ActiveMQ Artemis which can be rem set ARTEMIS_CLUSTER_PROPS=-Dactivemq.remoting.default.port=61617 -Dactivemq.remoting.amqp.port=5673 -Dactivemq.remoting.stomp.port=61614 -Dactivemq.remoting.hornetq.port=5446 rem Java Opts -IF "%JAVA_ARGS%"=="" (set JAVA_ARGS=${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -Xms512M -Xmx1024M -Xbootclasspath/a:%ARTEMIS_HOME%\lib\${logmanager};%ARTEMIS_HOME%\lib\${wildfly-common} -Djava.security.auth.login.config=%ARTEMIS_ETC_DIR%\login.config -Dhawtio.offline=true -Dhawtio.realm=activemq -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=%ARTEMIS_INSTANCE_ETC_URI%\jolokia-access.xml -Dartemis.instance=%ARTEMIS_INSTANCE%) +IF "%JAVA_ARGS%"=="" (set JAVA_ARGS=${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx1024M -Xbootclasspath/a:%ARTEMIS_HOME%\lib\${logmanager};%ARTEMIS_HOME%\lib\${wildfly-common} -Djava.security.auth.login.config=%ARTEMIS_ETC_DIR%\login.config -Dhawtio.offline=true -Dhawtio.realm=activemq -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=%ARTEMIS_INSTANCE_ETC_URI%\jolokia-access.xml -Dartemis.instance=%ARTEMIS_INSTANCE%) rem Logs Safepoints JVM pauses: Uncomment to enable them rem In addition to the traditional GC logs you could enable some JVM flags to know any meaningful and "hidden" pause that could diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java index 7767fdd9ad..91d884b375 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java @@ -560,12 +560,16 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl public static final class ByteBufSimpleStringPool extends AbstractByteBufPool { - private static final int UUID_LENGTH = 36; + public static final int DEFAULT_MAX_LENGTH = 36; private final int maxLength; public ByteBufSimpleStringPool() { - this.maxLength = UUID_LENGTH; + this.maxLength = DEFAULT_MAX_LENGTH; + } + + public ByteBufSimpleStringPool(final int capacity) { + this(capacity, DEFAULT_MAX_LENGTH); } public ByteBufSimpleStringPool(final int capacity, final int maxCharsLength) { diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java index 124dfcff13..bf8df3f1a6 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java @@ -19,20 +19,22 @@ package org.apache.activemq.artemis.core.persistence; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -public interface Persister { +public interface Persister { - /** This is to be used to store the protocol-id on Messages. - * Messages are stored on their bare format. - * The protocol manager will be responsible to code or decode messages. - * The caveat here is that the first short-sized bytes need to be this constant. */ + /** + * This is to be used to store the protocol-id on Messages. + * Messages are stored on their bare format. + * The protocol manager will be responsible to code or decode messages. + * The caveat here is that the first short-sized bytes need to be this constant. + */ default byte getID() { - return (byte)0; + return (byte) 0; } int getEncodeSize(T record); void encode(ActiveMQBuffer buffer, T record); - T decode(ActiveMQBuffer buffer, T record); + T decode(ActiveMQBuffer buffer, A arg); } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java index 7d8e984c0d..64ff4320dc 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java @@ -98,6 +98,7 @@ public final class UUID { * @param data 16 byte UUID contents */ public UUID(final int type, final byte[] data) { + assert data.length == 16; mId = data; // Type is multiplexed with time_hi: mId[UUID.INDEX_TYPE] &= (byte) 0x0F; @@ -108,6 +109,7 @@ public final class UUID { } private UUID(final byte[] data) { + assert data.length == 16; mId = data; } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/algo/KMPNeedle.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/algo/KMPNeedle.java new file mode 100644 index 0000000000..22910c8b60 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/algo/KMPNeedle.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.algo; + +import java.util.Objects; + +/** + * Abstraction of {@code byte[] }Knuth-Morris-Pratt's needle to be used + * to perform pattern matching over indexed haystack of {@code byte}s. + */ +public final class KMPNeedle { + + @FunctionalInterface + public interface IndexedByteSupplier { + + byte get(S source, int index); + } + + private final int[] jumpTable; + private final byte[] needle; + + private KMPNeedle(byte[] needle) { + Objects.requireNonNull(needle); + this.needle = needle; + this.jumpTable = createJumpTable(needle); + } + + private static int[] createJumpTable(byte[] needle) { + final int[] jumpTable = new int[needle.length + 1]; + int j = 0; + for (int i = 1; i < needle.length; i++) { + while (j > 0 && needle[j] != needle[i]) { + j = jumpTable[j]; + } + if (needle[j] == needle[i]) { + j++; + } + jumpTable[i + 1] = j; + } + for (int i = 1; i < jumpTable.length; i++) { + if (jumpTable[i] != 0) { + return jumpTable; + } + } + // optimization over the original algorithm: it would save from accessing any jump table + return null; + } + + /** + * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm search algorithm: + * + * This version differ from the original algorithm, because allows to fail fast (and faster) if + * the remaining haystack to be processed is < of the remaining needle to be matched. + */ + public int searchInto(IndexedByteSupplier haystackReader, H haystack, int end, int start) { + assert end >= 0 && start >= 0 && end >= start; + final int length = end - start; + int j = 0; + final int needleLength = needle.length; + int remainingNeedle = needleLength; + for (int i = 0; i < length; i++) { + final int remainingHayStack = length - i; + if (remainingNeedle > remainingHayStack) { + return -1; + } + final int index = start + i; + final byte value = haystackReader.get(haystack, index); + while (j > 0 && needle[j] != value) { + j = jumpTable == null ? 0 : jumpTable[j]; + remainingNeedle = needleLength - j; + } + if (needle[j] == value) { + j++; + remainingNeedle--; + assert remainingNeedle >= 0; + } + if (j == needleLength) { + final int startMatch = index - needleLength + 1; + return startMatch; + } + } + return -1; + } + + public static KMPNeedle of(byte[] needle) { + return new KMPNeedle(needle); + } + +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java index 223da17bf9..df81c289c0 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java @@ -446,17 +446,21 @@ public class TypedProperties { } public synchronized void decode(final ByteBuf buffer, - final TypedPropertiesDecoderPools keyValuePools) { + final TypedPropertiesDecoderPools keyValuePools, + boolean replaceExisting) { byte b = buffer.readByte(); if (b == DataConstants.NULL) { - properties = null; - size = 0; + if (replaceExisting) { + properties = null; + size = 0; + } } else { int numHeaders = buffer.readInt(); - - //optimize the case of no collisions to avoid any resize (it doubles the map size!!!) when load factor is reached - properties = new HashMap<>(numHeaders, 1.0f); - size = 0; + if (replaceExisting || properties == null) { + //optimize the case of no collisions to avoid any resize (it doubles the map size!!!) when load factor is reached + properties = new HashMap<>(numHeaders, 1.0f); + } + size = properties.size(); for (int i = 0; i < numHeaders; i++) { final SimpleString key = SimpleString.readSimpleString(buffer, keyValuePools == null ? null : keyValuePools.getPropertyKeysPool()); @@ -529,6 +533,10 @@ public class TypedProperties { } } + public synchronized void decode(final ByteBuf buffer, final TypedPropertiesDecoderPools keyValuePools) { + decode(buffer, keyValuePools, true); + } + public void decode(final ByteBuf buffer) { decode(buffer, null); } @@ -1029,12 +1037,16 @@ public class TypedProperties { public static final class ByteBufStringValuePool extends AbstractByteBufPool { - private static final int UUID_LENGTH = 36; + public static final int DEFAULT_MAX_LENGTH = 36; private final int maxLength; public ByteBufStringValuePool() { - this.maxLength = UUID_LENGTH; + this.maxLength = DEFAULT_MAX_LENGTH; + } + + public ByteBufStringValuePool(final int capacity) { + this(capacity, DEFAULT_MAX_LENGTH); } public ByteBufStringValuePool(final int capacity, final int maxCharsLength) { @@ -1074,9 +1086,9 @@ public class TypedProperties { this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(); } - public TypedPropertiesDecoderPools(int keyPoolCapacity, int valuePoolCapacity, int maxCharsLength) { - this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(keyPoolCapacity, maxCharsLength); - this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(valuePoolCapacity, maxCharsLength); + public TypedPropertiesDecoderPools(int keyPoolCapacity, int valuePoolCapacity) { + this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(keyPoolCapacity); + this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(valuePoolCapacity); } public SimpleString.ByteBufSimpleStringPool getPropertyKeysPool() { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 1de81500fc..09c2d39da9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -394,7 +394,7 @@ public interface Message { */ Message setDurable(boolean durable); - Persister getPersister(); + Persister getPersister(); String getAddress(); @@ -454,7 +454,7 @@ public interface Message { void persist(ActiveMQBuffer targetRecord); - void reloadPersistence(ActiveMQBuffer record); + void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools); default void releaseBuffer() { ByteBuf buffer = getBuffer(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 905055022d..b61b27e111 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -130,7 +130,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } @Override - public Persister getPersister() { + public Persister getPersister() { return CoreMessagePersister.getInstance(); } @@ -646,11 +646,15 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } private void decode(boolean beforeAddress) { + decode(beforeAddress, coreMessageObjectPools); + } + + private void decode(boolean beforeAddress, CoreMessageObjectPools pools) { endOfBodyPosition = buffer.readInt(); buffer.skipBytes(endOfBodyPosition - BUFFER_HEADER_SPACE); - decodeHeadersAndProperties(buffer, true); + decodeHeadersAndProperties(buffer, true, pools); buffer.readerIndex(0); validBuffer = true; @@ -662,14 +666,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } public void decodeHeadersAndProperties(final ByteBuf buffer) { - decodeHeadersAndProperties(buffer, false); + decodeHeadersAndProperties(buffer, false, coreMessageObjectPools); } - private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) { + private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties, CoreMessageObjectPools pools) { messageIDPosition = buffer.readerIndex(); messageID = buffer.readLong(); - address = SimpleString.readNullableSimpleString(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressDecoderPool()); + address = SimpleString.readNullableSimpleString(buffer, pools == null ? null : pools.getAddressDecoderPool()); if (buffer.readByte() == DataConstants.NOT_NULL) { byte[] bytes = new byte[16]; buffer.readBytes(bytes); @@ -687,7 +691,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { propertiesLocation = buffer.readerIndex(); } else { properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE); - properties.decode(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools()); + properties.decode(buffer, pools == null ? null : pools.getPropertiesDecoderPools()); } } @@ -1180,11 +1184,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } @Override - public void reloadPersistence(ActiveMQBuffer record) { + public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) { int size = record.readInt(); initBuffer(size); buffer.setIndex(0, 0).writeBytes(record.byteBuf(), size); - decode(false); + decode(false, pools); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java index 4c56eac346..62ee5ed9a1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java @@ -25,14 +25,30 @@ import java.util.function.Supplier; public class CoreMessageObjectPools { - private Supplier addressDecoderPool = Suppliers.memoize(SimpleString.ByteBufSimpleStringPool::new); - private Supplier propertiesDecoderPools = Suppliers.memoize(TypedProperties.TypedPropertiesDecoderPools::new); + private final Supplier addressDecoderPool; + private final Supplier propertiesDecoderPools; - private Supplier groupIdStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new); - private Supplier addressStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new); - private Supplier propertiesStringSimpleStringPools = Suppliers.memoize(TypedProperties.TypedPropertiesStringSimpleStringPools::new); + private final Supplier groupIdStringSimpleStringPool; + private final Supplier addressStringSimpleStringPool; + private final Supplier propertiesStringSimpleStringPools; + + public CoreMessageObjectPools(int addressPoolCapacity, + int groupIdCapacity, + int propertyKeyCapacity, + int propertyValueCapacity) { + addressDecoderPool = Suppliers.memoize(() -> new SimpleString.ByteBufSimpleStringPool(addressPoolCapacity)); + propertiesDecoderPools = Suppliers.memoize(() -> new TypedProperties.TypedPropertiesDecoderPools(propertyKeyCapacity, propertyValueCapacity)); + groupIdStringSimpleStringPool = Suppliers.memoize(() -> new SimpleString.StringSimpleStringPool(groupIdCapacity)); + addressStringSimpleStringPool = Suppliers.memoize(() -> new SimpleString.StringSimpleStringPool(addressPoolCapacity)); + propertiesStringSimpleStringPools = Suppliers.memoize(() -> new TypedProperties.TypedPropertiesStringSimpleStringPools(propertyKeyCapacity, propertyValueCapacity)); + } public CoreMessageObjectPools() { + addressDecoderPool = Suppliers.memoize(SimpleString.ByteBufSimpleStringPool::new); + propertiesDecoderPools = Suppliers.memoize(TypedProperties.TypedPropertiesDecoderPools::new); + groupIdStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new); + addressStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new); + propertiesStringSimpleStringPools = Suppliers.memoize(TypedProperties.TypedPropertiesStringSimpleStringPools::new); } public SimpleString.ByteBufSimpleStringPool getAddressDecoderPool() { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java index cbd565d3a8..3861d67b2c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java @@ -23,7 +23,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.utils.DataConstants; -public class CoreMessagePersister implements Persister { +public class CoreMessagePersister implements Persister { public static final byte ID = 1; private static CoreMessagePersister theInstance; @@ -68,14 +68,18 @@ public class CoreMessagePersister implements Persister { record.persist(buffer); } - @Override - public Message decode(ActiveMQBuffer buffer, Message record) { + public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) { // the caller must consume the first byte already, as that will be used to decide what persister (protocol) to use long id = buffer.readLong(); - SimpleString address = buffer.readNullableSimpleString(); - record = new CoreMessage(); - record.reloadPersistence(buffer); + final SimpleString address; + if (pool == null) { + address = buffer.readNullableSimpleString(); + } else { + address = SimpleString.readNullableSimpleString(buffer.byteBuf(), pool.getAddressDecoderPool()); + } + CoreMessage record = new CoreMessage(); + record.reloadPersistence(buffer, pool); record.setMessageID(id); record.setAddress(address); return record; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java index e1dcf97042..0f809ad56e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java @@ -248,7 +248,7 @@ public class MessageInternalImpl implements MessageInternal { } @Override - public Persister getPersister() { + public Persister getPersister() { throw new UnsupportedOperationException(); } @@ -340,7 +340,7 @@ public class MessageInternalImpl implements MessageInternal { } @Override - public void reloadPersistence(ActiveMQBuffer record) { + public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) { throw new UnsupportedOperationException(); } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java index 8fc2a5aaa2..1e734d31cc 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java @@ -22,7 +22,7 @@ import org.apache.activemq.artemis.core.persistence.Persister; /** This is a facade between the new Persister and the former EncodingSupport. * Methods using the old interface will use this as a facade to provide the previous semantic. */ -public class EncoderPersister implements Persister { +public class EncoderPersister implements Persister { private static final EncoderPersister theInstance = new EncoderPersister(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index fcfe10eaa3..fa96a5c9e5 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -43,6 +44,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.DataConstants; +import org.apache.activemq.artemis.utils.algo.KMPNeedle; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; @@ -74,12 +76,49 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import org.jboss.logging.Logger; -// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format +/** + * See AMQP v1.0 message format + *
+ *
+ *                                                      Bare Message
+ *                                                            |
+ *                                      .---------------------+--------------------.
+ *                                      |                                          |
+ * +--------+-------------+-------------+------------+--------------+--------------+--------+
+ * | header | delivery-   | message-    | properties | application- | application- | footer |
+ * |        | annotations | annotations |            | properties   | data         |        |
+ * +--------+-------------+-------------+------------+--------------+--------------+--------+
+ * |                                                                                        |
+ * '-------------------------------------------+--------------------------------------------'
+ *                                             |
+ *                                      Annotated Message
+ * 
+ *
    + *
  • Zero or one header sections. + *
  • Zero or one delivery-annotation sections. + *
  • Zero or one message-annotation sections. + *
  • Zero or one properties sections. + *
  • Zero or one application-properties sections. + *
  • The body consists of one of the following three choices: + *
      + *
    • one or more data sections + *
    • one or more amqp-sequence sections + *
    • or a single amqp-value section. + *
    + *
  • Zero or one footer sections. + *
+ */ public class AMQPMessage extends RefCountMessage { private static final Logger logger = Logger.getLogger(AMQPMessage.class); public static final SimpleString ADDRESS_PROPERTY = SimpleString.toSimpleString("_AMQ_AD"); + // used to perform quick search + private static final Symbol[] SCHEDULED_DELIVERY_SYMBOLS = new Symbol[]{ + AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY}; + private static final KMPNeedle[] SCHEDULED_DELIVERY_NEEDLES = new KMPNeedle[]{ + AMQPMessageSymbolSearch.kmpNeedleOf(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME), + AMQPMessageSymbolSearch.kmpNeedleOf(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY)}; public static final int DEFAULT_MESSAGE_FORMAT = 0; public static final int DEFAULT_MESSAGE_PRIORITY = 4; @@ -89,7 +128,10 @@ public class AMQPMessage extends RefCountMessage { // Buffer and state for the data backing this message. private ReadableBuffer data; - private boolean messageDataScanned; + private static final byte NOT_SCANNED = 0; + private static final byte RELOAD_PERSISTENCE = 1; + private static final byte SCANNED = 2; + private byte messageDataScanned; // Marks the message as needed to be re-encoded to update the backing buffer private boolean modified; @@ -450,16 +492,25 @@ public class AMQPMessage extends RefCountMessage { // re-encode should be done to update the backing data with the in memory elements. private synchronized void ensureMessageDataScanned() { - if (!messageDataScanned) { - scanMessageData(); + final byte state = messageDataScanned; + switch (state) { + case NOT_SCANNED: + scanMessageData(); + break; + case RELOAD_PERSISTENCE: + lazyScanAfterReloadPersistence(); + break; + case SCANNED: + // NO-OP + break; + default: + throw new IllegalStateException("invalid messageDataScanned state: expected within " + + Arrays.toString(new byte[]{NOT_SCANNED, SCANNED, RELOAD_PERSISTENCE}) + + " but " + messageDataScanned); } } - private synchronized void scanMessageData() { - this.messageDataScanned = true; - DecoderImpl decoder = TLSEncode.getDecoder(); - decoder.setBuffer(data.rewind()); - + private synchronized void resetMessageData() { header = null; messageAnnotations = null; properties = null; @@ -474,6 +525,14 @@ public class AMQPMessage extends RefCountMessage { propertiesPosition = VALUE_NOT_PRESENT; applicationPropertiesPosition = VALUE_NOT_PRESENT; remainingBodyPosition = VALUE_NOT_PRESENT; + } + + private synchronized void scanMessageData() { + this.messageDataScanned = SCANNED; + DecoderImpl decoder = TLSEncode.getDecoder(); + decoder.setBuffer(data.rewind()); + + resetMessageData(); try { while (data.hasRemaining()) { @@ -741,18 +800,22 @@ public class AMQPMessage extends RefCountMessage { } @Override - public void reloadPersistence(ActiveMQBuffer record) { + public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) { int size = record.readInt(); byte[] recordArray = new byte[size]; record.readBytes(recordArray); data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(recordArray)); - // Message state is now that the underlying buffer is loaded but the contents - // not yet scanned, once done the message is fully populated and ready for dispatch. - // Force a scan now and tidy the state variables to reflect where we are following - // this reload from the store. + // Message state is now that the underlying buffer is loaded, but the contents not yet scanned + resetMessageData(); + modified = false; + messageDataScanned = RELOAD_PERSISTENCE; + } + + private synchronized void lazyScanAfterReloadPersistence() { + assert messageDataScanned == RELOAD_PERSISTENCE; scanMessageData(); - messageDataScanned = true; + messageDataScanned = SCANNED; modified = false; // Message state should reflect that is came from persistent storage which @@ -771,7 +834,7 @@ public class AMQPMessage extends RefCountMessage { } @Override - public Persister getPersister() { + public Persister getPersister() { return AMQPMessagePersisterV2.getInstance(); } @@ -798,7 +861,7 @@ public class AMQPMessage extends RefCountMessage { private synchronized void encodeMessage() { this.modified = false; - this.messageDataScanned = false; + this.messageDataScanned = NOT_SCANNED; int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0); ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated); EncoderImpl encoder = TLSEncode.getEncoder(); @@ -1115,6 +1178,7 @@ public class AMQPMessage extends RefCountMessage { @Override public RoutingType getRoutingType() { + ensureMessageDataScanned(); Object routingType = getMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE); if (routingType != null) { @@ -1183,8 +1247,39 @@ public class AMQPMessage extends RefCountMessage { return this; } + @Override + public boolean hasScheduledDeliveryTime() { + if (scheduledTime >= 0) { + return true; + } + return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS, SCHEDULED_DELIVERY_NEEDLES); + } + + private boolean anyMessageAnnotations(Symbol[] symbols, KMPNeedle[] symbolNeedles) { + assert symbols.length == symbolNeedles.length; + final int count = symbols.length; + if (messageDataScanned == SCANNED) { + final MessageAnnotations messageAnnotations = this.messageAnnotations; + if (messageAnnotations == null) { + return false; + } + Map map = messageAnnotations.getValue(); + if (map == null) { + return false; + } + for (int i = 0; i < count; i++) { + if (map.containsKey(symbols[i])) { + return true; + } + } + return false; + } + return AMQPMessageSymbolSearch.anyMessageAnnotations(data, symbolNeedles); + } + @Override public Long getScheduledDeliveryTime() { + ensureMessageDataScanned(); if (scheduledTime < 0) { Object objscheduledTime = getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME); Object objdelay = getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java index c6881245dc..9ab084278b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java @@ -20,6 +20,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; import org.apache.activemq.artemis.utils.DataConstants; @@ -62,12 +63,17 @@ public class AMQPMessagePersister extends MessagePersister { } @Override - public Message decode(ActiveMQBuffer buffer, Message record) { + public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) { long id = buffer.readLong(); long format = buffer.readLong(); - SimpleString address = buffer.readNullableSimpleString(); - record = new AMQPMessage(format); - record.reloadPersistence(buffer); + final SimpleString address; + if (pool == null) { + address = buffer.readNullableSimpleString(); + } else { + address = SimpleString.readNullableSimpleString(buffer.byteBuf(), pool.getAddressDecoderPool()); + } + AMQPMessage record = new AMQPMessage(format); + record.reloadPersistence(buffer, pool); record.setMessageID(id); if (address != null) { record.setAddress(address); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java index c2686949cf..d263fe9a4f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.collections.TypedProperties; @@ -68,16 +69,24 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister { } } - @Override - public Message decode(ActiveMQBuffer buffer, Message record) { - AMQPMessage message = (AMQPMessage)super.decode(buffer, record); + public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) { + AMQPMessage message = (AMQPMessage) super.decode(buffer, pool); int size = buffer.readInt(); if (size != 0) { - TypedProperties properties = new TypedProperties(Message.INTERNAL_PROPERTY_NAMES_PREDICATE); - properties.decode(buffer.byteBuf()); - message.setExtraProperties(properties); + // message::setAddress could have populated extra properties + // hence, we can safely replace the value on the properties + // if it has been encoded differently in the rest of the buffer + TypedProperties existingExtraProperties = message.getExtraProperties(); + TypedProperties extraProperties = existingExtraProperties; + if (existingExtraProperties == null) { + extraProperties = new TypedProperties(Message.INTERNAL_PROPERTY_NAMES_PREDICATE); + } + extraProperties.decode(buffer.byteBuf(), pool != null ? pool.getPropertiesDecoderPools() : null, existingExtraProperties == null); + if (extraProperties != existingExtraProperties) { + message.setExtraProperties(extraProperties); + } } return message; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageSymbolSearch.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageSymbolSearch.java new file mode 100644 index 0000000000..7b046a5089 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageSymbolSearch.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.broker; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.IdentityHashMap; +import java.util.List; + +import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; +import org.apache.activemq.artemis.utils.algo.KMPNeedle; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.AmqpSequence; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.Footer; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.codec.DecoderImpl; +import org.apache.qpid.proton.codec.ReadableBuffer; +import org.apache.qpid.proton.codec.TypeConstructor; + +final class AMQPMessageSymbolSearch { + + // used to quick search for MessageAnnotations + private static final IdentityHashMap, Boolean> MSG_BODY_TYPES; + + static { + // we're including MessageAnnotations here because it will still cause termination + final List> classList = Arrays.asList(MessageAnnotations.class, Properties.class, + ApplicationProperties.class, Data.class, + AmqpSequence.class, AmqpValue.class, Footer.class); + MSG_BODY_TYPES = new IdentityHashMap<>(classList.size()); + classList.forEach(clazz -> MSG_BODY_TYPES.put(clazz, Boolean.TRUE)); + } + + public static KMPNeedle kmpNeedleOf(Symbol symbol) { + return KMPNeedle.of(symbol.toString().getBytes(StandardCharsets.US_ASCII)); + } + + public static boolean anyMessageAnnotations(ReadableBuffer data, KMPNeedle[] needles) { + DecoderImpl decoder = TLSEncode.getDecoder(); + final int position = data.position(); + decoder.setBuffer(data.rewind()); + try { + while (data.hasRemaining()) { + TypeConstructor constructor = decoder.readConstructor(); + final Class typeClass = constructor.getTypeClass(); + if (MSG_BODY_TYPES.containsKey(typeClass)) { + if (MessageAnnotations.class.equals(typeClass)) { + final int start = data.position(); + constructor.skipValue(); + final int end = data.position(); + for (int i = 0, count = needles.length; i < count; i++) { + final int foundIndex = needles[i].searchInto(ReadableBuffer::get, data, end, start); + if (foundIndex != -1) { + return true; + } + } + } + return false; + } + constructor.skipValue(); + } + return false; + } finally { + decoder.setBuffer(null); + data.position(position); + } + } + +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java index cd21e460fb..7470d60888 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory; @@ -39,7 +40,7 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME}; @Override - public Persister[] getPersister() { + public Persister[] getPersister() { Persister[] persisters = new Persister[]{AMQPMessagePersister.getInstance(), AMQPMessagePersisterV2.getInstance()}; return persisters; diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java index 88414b4788..0f68edfbdb 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java @@ -31,6 +31,7 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Date; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -147,7 +148,7 @@ public class AMQPMessageTest { final long persistedSize = (long) encoded.readableBytes(); // Now reload from encoded data - message.reloadPersistence(encoded); + message.reloadPersistence(encoded, null); assertEquals(persistedSize, message.getPersistSize()); assertEquals(persistedSize - Integer.BYTES, message.getPersistentSize()); @@ -156,6 +157,72 @@ public class AMQPMessageTest { assertEquals(TEST_TO_ADDRESS, message.getAddress()); } + @Test + public void testHasScheduledDeliveryTimeReloadPersistence() { + final long scheduledTime = System.currentTimeMillis(); + MessageImpl protonMessage = createProtonMessage(); + MessageAnnotations annotations = protonMessage.getMessageAnnotations(); + annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledTime); + ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage); + + AMQPMessage message = new AMQPMessage(0); + try { + message.getProtonMessage(); + fail("Should throw NPE due to not being initialized yet"); + } catch (NullPointerException npe) { + } + + // Now reload from encoded data + message.reloadPersistence(encoded, null); + + assertTrue(message.hasScheduledDeliveryTime()); + message.getHeader(); + assertTrue(message.hasScheduledDeliveryTime()); + } + + @Test + public void testHasScheduledDeliveryDelayReloadPersistence() { + final long scheduledDelay = 100000; + MessageImpl protonMessage = createProtonMessage(); + MessageAnnotations annotations = protonMessage.getMessageAnnotations(); + annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY, scheduledDelay); + ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage); + + AMQPMessage message = new AMQPMessage(0); + try { + message.getProtonMessage(); + fail("Should throw NPE due to not being initialized yet"); + } catch (NullPointerException npe) { + } + + // Now reload from encoded data + message.reloadPersistence(encoded, null); + + assertTrue(message.hasScheduledDeliveryTime()); + message.getHeader(); + assertTrue(message.hasScheduledDeliveryTime()); + } + + @Test + public void testNoScheduledDeliveryTimeOrDelayReloadPersistence() { + MessageImpl protonMessage = createProtonMessage(); + ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage); + + AMQPMessage message = new AMQPMessage(0); + try { + message.getProtonMessage(); + fail("Should throw NPE due to not being initialized yet"); + } catch (NullPointerException npe) { + } + + // Now reload from encoded data + message.reloadPersistence(encoded, null); + + assertFalse(message.hasScheduledDeliveryTime()); + message.getHeader(); + assertFalse(message.hasScheduledDeliveryTime()); + } + //----- Test Memory Estimate access ---------------------------------------// @Test @@ -2010,10 +2077,10 @@ public class AMQPMessageTest { properties.setTo(TEST_TO_ADDRESS); properties.setMessageId(UUID.randomUUID()); - MessageAnnotations annotations = new MessageAnnotations(new HashMap<>()); + MessageAnnotations annotations = new MessageAnnotations(new LinkedHashMap<>()); annotations.getValue().put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY), TEST_MESSAGE_ANNOTATION_VALUE); - ApplicationProperties applicationProperties = new ApplicationProperties(new HashMap<>()); + ApplicationProperties applicationProperties = new ApplicationProperties(new LinkedHashMap<>()); applicationProperties.getValue().put(TEST_APPLICATION_PROPERTY_KEY, TEST_APPLICATION_PROPERTY_VALUE); AmqpValue body = new AmqpValue(TEST_STRING_BODY); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java index 45e89530ce..9644b706cc 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java @@ -140,7 +140,7 @@ public class OpenwireMessage implements Message { } @Override - public Persister getPersister() { + public Persister getPersister() { return null; } @@ -205,7 +205,7 @@ public class OpenwireMessage implements Message { } @Override - public void reloadPersistence(ActiveMQBuffer record) { + public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) { } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index effb7e1cbe..296b221f8f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.persistence.impl.journal; +import static org.apache.activemq.artemis.api.core.SimpleString.ByteBufSimpleStringPool.DEFAULT_MAX_LENGTH; +import static org.apache.activemq.artemis.api.core.SimpleString.ByteBufSimpleStringPool.DEFAULT_POOL_CAPACITY; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID; @@ -56,6 +58,7 @@ import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; @@ -858,6 +861,19 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp } final MutableLong recordNumber = new MutableLong(); + final CoreMessageObjectPools pools; + if (totalSize > 0) { + final int addresses = (int)Math.max( + DEFAULT_POOL_CAPACITY, + queueInfos == null ? 0 : + queueInfos.values().stream() + .map(QueueBindingInfo::getAddress) + .filter(addr -> addr.length() <= DEFAULT_MAX_LENGTH) + .count() * 2); + pools = new CoreMessageObjectPools(addresses, DEFAULT_POOL_CAPACITY, 128, 128); + } else { + pools = null; + } // This will free up memory sooner while reading the records records.clear(record -> { try { @@ -904,7 +920,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp case JournalRecordIds.ADD_MESSAGE_PROTOCOL: { - Message message = MessagePersister.getInstance().decode(buff, null); + Message message = MessagePersister.getInstance().decode(buff, pools); messages.put(record.id, message); @@ -1716,6 +1732,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp final Set> pendingLargeMessages, JournalLoader journalLoader) throws Exception { // recover prepared transactions + CoreMessageObjectPools pools = null; + for (PreparedTransactionInfo preparedTransaction : preparedTransactions) { XidEncoding encodingXid = new XidEncoding(preparedTransaction.getExtraData()); @@ -1749,7 +1767,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp break; } case JournalRecordIds.ADD_MESSAGE_PROTOCOL: { - Message message = MessagePersister.getInstance().decode(buff, null); + if (pools == null) { + pools = new CoreMessageObjectPools(); + } + Message message = MessagePersister.getInstance().decode(buff, pools); messages.put(record.id, message); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java index b715f976f3..4e02f68c0b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java @@ -22,7 +22,7 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.server.LargeServerMessage; -public class LargeMessagePersister implements Persister { +public class LargeMessagePersister implements Persister { private static final LargeMessagePersister theInstance = new LargeMessagePersister(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java index 75909246e0..6d5e3520c7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -37,7 +38,7 @@ public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory[] getPersister() { + public Persister[] getPersister() { return new Persister[]{CoreMessagePersister.getInstance()}; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java index ad1317f012..2fddc564f0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java @@ -21,11 +21,12 @@ import java.util.ServiceLoader; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister; import org.apache.activemq.artemis.core.persistence.Persister; import org.jboss.logging.Logger; -public class MessagePersister implements Persister { +public class MessagePersister implements Persister { private static final Logger logger = Logger.getLogger(MessagePersister.class); @@ -33,7 +34,7 @@ public class MessagePersister implements Persister { /** This will be used for reading messages */ private static final int MAX_PERSISTERS = 3; - private static final Persister[] persisters = new Persister[MAX_PERSISTERS]; + private static final Persister[] persisters = new Persister[MAX_PERSISTERS]; static { CoreMessagePersister persister = CoreMessagePersister.getInstance(); @@ -46,7 +47,7 @@ public class MessagePersister implements Persister { } public static void registerProtocol(ProtocolManagerFactory manager) { - Persister[] messagePersisters = manager.getPersister(); + Persister[] messagePersisters = manager.getPersister(); if (messagePersisters == null || messagePersisters.length == 0) { logger.debug("Cannot find persister for " + manager); } else { @@ -69,7 +70,7 @@ public class MessagePersister implements Persister { return persisters[id - 1]; } - public static void registerPersister(Persister persister) { + public static void registerPersister(Persister persister) { if (persister != null) { assert persister.getID() <= MAX_PERSISTERS : "You must update MessagePersister::MAX_PERSISTERS to a higher number"; persisters[persister.getID() - 1] = persister; @@ -97,12 +98,12 @@ public class MessagePersister implements Persister { } @Override - public Message decode(ActiveMQBuffer buffer, Message record) { + public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pools) { byte protocol = buffer.readByte(); - Persister persister = getPersister(protocol); + Persister persister = getPersister(protocol); if (persister == null) { throw new NullPointerException("couldn't find factory for type=" + protocol); } - return persister.decode(buffer, record); + return persister.decode(buffer, pools); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java index 4ab34ebf56..77c66f977f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java @@ -21,12 +21,13 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.server.ActiveMQServer; public interface ProtocolManagerFactory

{ - default Persister[] getPersister() { + default Persister[] getPersister() { return new Persister[]{}; } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index b3ae24007a..d0ed6f8ae2 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -323,12 +323,12 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public void reloadPersistence(ActiveMQBuffer record) { + public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) { } @Override - public Persister getPersister() { + public Persister getPersister() { return null; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java index 511d4765b8..dc23bc935a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java @@ -371,12 +371,12 @@ public class AcknowledgeTest extends ActiveMQTestBase { } @Override - public Persister getPersister() { + public Persister getPersister() { return null; } @Override - public void reloadPersistence(ActiveMQBuffer record) { + public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) { } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java index 7cd4b05de9..f05552b0eb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; @@ -294,7 +295,7 @@ public class SharedNothingReplicationTest extends ActiveMQTestBase { return conf; } - static class SlowMessagePersister extends CoreMessagePersister implements Persister { + static class SlowMessagePersister extends CoreMessagePersister implements Persister { boolean used = false; @@ -343,8 +344,8 @@ public class SharedNothingReplicationTest extends ActiveMQTestBase { } @Override - public Message decode(ActiveMQBuffer buffer, Message record) { - return persister.decode(buffer, record); + public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) { + return persister.decode(buffer, pool); } }