This commit is contained in:
Clebert Suconic 2020-02-12 13:29:51 -05:00
commit 69779eed10
28 changed files with 536 additions and 99 deletions

View File

@ -33,7 +33,7 @@ ARTEMIS_INSTANCE_ETC_URI='${artemis.instance.etc.uri}'
# Java Opts # Java Opts
if [ -z "$JAVA_ARGS" ]; then if [ -z "$JAVA_ARGS" ]; then
JAVA_ARGS="${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -Xms512M -Xmx2G -Dhawtio.realm=activemq -Dhawtio.offline=true -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=${ARTEMIS_INSTANCE_ETC_URI}jolokia-access.xml" JAVA_ARGS="${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx2G -Dhawtio.realm=activemq -Dhawtio.offline=true -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=${ARTEMIS_INSTANCE_ETC_URI}jolokia-access.xml"
fi fi
# #

View File

@ -33,7 +33,7 @@ rem Cluster Properties: Used to pass arguments to ActiveMQ Artemis which can be
rem set ARTEMIS_CLUSTER_PROPS=-Dactivemq.remoting.default.port=61617 -Dactivemq.remoting.amqp.port=5673 -Dactivemq.remoting.stomp.port=61614 -Dactivemq.remoting.hornetq.port=5446 rem set ARTEMIS_CLUSTER_PROPS=-Dactivemq.remoting.default.port=61617 -Dactivemq.remoting.amqp.port=5673 -Dactivemq.remoting.stomp.port=61614 -Dactivemq.remoting.hornetq.port=5446
rem Java Opts rem Java Opts
IF "%JAVA_ARGS%"=="" (set JAVA_ARGS=${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -Xms512M -Xmx1024M -Xbootclasspath/a:%ARTEMIS_HOME%\lib\${logmanager};%ARTEMIS_HOME%\lib\${wildfly-common} -Djava.security.auth.login.config=%ARTEMIS_ETC_DIR%\login.config -Dhawtio.offline=true -Dhawtio.realm=activemq -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=%ARTEMIS_INSTANCE_ETC_URI%\jolokia-access.xml -Dartemis.instance=%ARTEMIS_INSTANCE%) IF "%JAVA_ARGS%"=="" (set JAVA_ARGS=${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx1024M -Xbootclasspath/a:%ARTEMIS_HOME%\lib\${logmanager};%ARTEMIS_HOME%\lib\${wildfly-common} -Djava.security.auth.login.config=%ARTEMIS_ETC_DIR%\login.config -Dhawtio.offline=true -Dhawtio.realm=activemq -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=%ARTEMIS_INSTANCE_ETC_URI%\jolokia-access.xml -Dartemis.instance=%ARTEMIS_INSTANCE%)
rem Logs Safepoints JVM pauses: Uncomment to enable them rem Logs Safepoints JVM pauses: Uncomment to enable them
rem In addition to the traditional GC logs you could enable some JVM flags to know any meaningful and "hidden" pause that could rem In addition to the traditional GC logs you could enable some JVM flags to know any meaningful and "hidden" pause that could

View File

@ -560,12 +560,16 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
public static final class ByteBufSimpleStringPool extends AbstractByteBufPool<SimpleString> { public static final class ByteBufSimpleStringPool extends AbstractByteBufPool<SimpleString> {
private static final int UUID_LENGTH = 36; public static final int DEFAULT_MAX_LENGTH = 36;
private final int maxLength; private final int maxLength;
public ByteBufSimpleStringPool() { public ByteBufSimpleStringPool() {
this.maxLength = UUID_LENGTH; this.maxLength = DEFAULT_MAX_LENGTH;
}
public ByteBufSimpleStringPool(final int capacity) {
this(capacity, DEFAULT_MAX_LENGTH);
} }
public ByteBufSimpleStringPool(final int capacity, final int maxCharsLength) { public ByteBufSimpleStringPool(final int capacity, final int maxCharsLength) {

View File

@ -19,12 +19,14 @@ package org.apache.activemq.artemis.core.persistence;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
public interface Persister<T extends Object> { public interface Persister<T extends Object, A> {
/** This is to be used to store the protocol-id on Messages. /**
* This is to be used to store the protocol-id on Messages.
* Messages are stored on their bare format. * Messages are stored on their bare format.
* The protocol manager will be responsible to code or decode messages. * The protocol manager will be responsible to code or decode messages.
* The caveat here is that the first short-sized bytes need to be this constant. */ * The caveat here is that the first short-sized bytes need to be this constant.
*/
default byte getID() { default byte getID() {
return (byte) 0; return (byte) 0;
} }
@ -33,6 +35,6 @@ public interface Persister<T extends Object> {
void encode(ActiveMQBuffer buffer, T record); void encode(ActiveMQBuffer buffer, T record);
T decode(ActiveMQBuffer buffer, T record); T decode(ActiveMQBuffer buffer, A arg);
} }

View File

@ -98,6 +98,7 @@ public final class UUID {
* @param data 16 byte UUID contents * @param data 16 byte UUID contents
*/ */
public UUID(final int type, final byte[] data) { public UUID(final int type, final byte[] data) {
assert data.length == 16;
mId = data; mId = data;
// Type is multiplexed with time_hi: // Type is multiplexed with time_hi:
mId[UUID.INDEX_TYPE] &= (byte) 0x0F; mId[UUID.INDEX_TYPE] &= (byte) 0x0F;
@ -108,6 +109,7 @@ public final class UUID {
} }
private UUID(final byte[] data) { private UUID(final byte[] data) {
assert data.length == 16;
mId = data; mId = data;
} }

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.algo;
import java.util.Objects;
/**
* Abstraction of {@code byte[] }<a href="https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm">Knuth-Morris-Pratt</a>'s needle to be used
* to perform pattern matching over indexed haystack of {@code byte}s.
*/
public final class KMPNeedle {
@FunctionalInterface
public interface IndexedByteSupplier<S> {
byte get(S source, int index);
}
private final int[] jumpTable;
private final byte[] needle;
private KMPNeedle(byte[] needle) {
Objects.requireNonNull(needle);
this.needle = needle;
this.jumpTable = createJumpTable(needle);
}
private static int[] createJumpTable(byte[] needle) {
final int[] jumpTable = new int[needle.length + 1];
int j = 0;
for (int i = 1; i < needle.length; i++) {
while (j > 0 && needle[j] != needle[i]) {
j = jumpTable[j];
}
if (needle[j] == needle[i]) {
j++;
}
jumpTable[i + 1] = j;
}
for (int i = 1; i < jumpTable.length; i++) {
if (jumpTable[i] != 0) {
return jumpTable;
}
}
// optimization over the original algorithm: it would save from accessing any jump table
return null;
}
/**
* https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm search algorithm:
*
* This version differ from the original algorithm, because allows to fail fast (and faster) if
* the remaining haystack to be processed is < of the remaining needle to be matched.
*/
public <H> int searchInto(IndexedByteSupplier<? super H> haystackReader, H haystack, int end, int start) {
assert end >= 0 && start >= 0 && end >= start;
final int length = end - start;
int j = 0;
final int needleLength = needle.length;
int remainingNeedle = needleLength;
for (int i = 0; i < length; i++) {
final int remainingHayStack = length - i;
if (remainingNeedle > remainingHayStack) {
return -1;
}
final int index = start + i;
final byte value = haystackReader.get(haystack, index);
while (j > 0 && needle[j] != value) {
j = jumpTable == null ? 0 : jumpTable[j];
remainingNeedle = needleLength - j;
}
if (needle[j] == value) {
j++;
remainingNeedle--;
assert remainingNeedle >= 0;
}
if (j == needleLength) {
final int startMatch = index - needleLength + 1;
return startMatch;
}
}
return -1;
}
public static KMPNeedle of(byte[] needle) {
return new KMPNeedle(needle);
}
}

View File

@ -446,17 +446,21 @@ public class TypedProperties {
} }
public synchronized void decode(final ByteBuf buffer, public synchronized void decode(final ByteBuf buffer,
final TypedPropertiesDecoderPools keyValuePools) { final TypedPropertiesDecoderPools keyValuePools,
boolean replaceExisting) {
byte b = buffer.readByte(); byte b = buffer.readByte();
if (b == DataConstants.NULL) { if (b == DataConstants.NULL) {
if (replaceExisting) {
properties = null; properties = null;
size = 0; size = 0;
}
} else { } else {
int numHeaders = buffer.readInt(); int numHeaders = buffer.readInt();
if (replaceExisting || properties == null) {
//optimize the case of no collisions to avoid any resize (it doubles the map size!!!) when load factor is reached //optimize the case of no collisions to avoid any resize (it doubles the map size!!!) when load factor is reached
properties = new HashMap<>(numHeaders, 1.0f); properties = new HashMap<>(numHeaders, 1.0f);
size = 0; }
size = properties.size();
for (int i = 0; i < numHeaders; i++) { for (int i = 0; i < numHeaders; i++) {
final SimpleString key = SimpleString.readSimpleString(buffer, keyValuePools == null ? null : keyValuePools.getPropertyKeysPool()); final SimpleString key = SimpleString.readSimpleString(buffer, keyValuePools == null ? null : keyValuePools.getPropertyKeysPool());
@ -529,6 +533,10 @@ public class TypedProperties {
} }
} }
public synchronized void decode(final ByteBuf buffer, final TypedPropertiesDecoderPools keyValuePools) {
decode(buffer, keyValuePools, true);
}
public void decode(final ByteBuf buffer) { public void decode(final ByteBuf buffer) {
decode(buffer, null); decode(buffer, null);
} }
@ -1029,12 +1037,16 @@ public class TypedProperties {
public static final class ByteBufStringValuePool extends AbstractByteBufPool<StringValue> { public static final class ByteBufStringValuePool extends AbstractByteBufPool<StringValue> {
private static final int UUID_LENGTH = 36; public static final int DEFAULT_MAX_LENGTH = 36;
private final int maxLength; private final int maxLength;
public ByteBufStringValuePool() { public ByteBufStringValuePool() {
this.maxLength = UUID_LENGTH; this.maxLength = DEFAULT_MAX_LENGTH;
}
public ByteBufStringValuePool(final int capacity) {
this(capacity, DEFAULT_MAX_LENGTH);
} }
public ByteBufStringValuePool(final int capacity, final int maxCharsLength) { public ByteBufStringValuePool(final int capacity, final int maxCharsLength) {
@ -1074,9 +1086,9 @@ public class TypedProperties {
this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(); this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool();
} }
public TypedPropertiesDecoderPools(int keyPoolCapacity, int valuePoolCapacity, int maxCharsLength) { public TypedPropertiesDecoderPools(int keyPoolCapacity, int valuePoolCapacity) {
this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(keyPoolCapacity, maxCharsLength); this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(keyPoolCapacity);
this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(valuePoolCapacity, maxCharsLength); this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(valuePoolCapacity);
} }
public SimpleString.ByteBufSimpleStringPool getPropertyKeysPool() { public SimpleString.ByteBufSimpleStringPool getPropertyKeysPool() {

View File

@ -394,7 +394,7 @@ public interface Message {
*/ */
Message setDurable(boolean durable); Message setDurable(boolean durable);
Persister<Message> getPersister(); Persister<Message, CoreMessageObjectPools> getPersister();
String getAddress(); String getAddress();
@ -454,7 +454,7 @@ public interface Message {
void persist(ActiveMQBuffer targetRecord); void persist(ActiveMQBuffer targetRecord);
void reloadPersistence(ActiveMQBuffer record); void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools);
default void releaseBuffer() { default void releaseBuffer() {
ByteBuf buffer = getBuffer(); ByteBuf buffer = getBuffer();

View File

@ -130,7 +130,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
} }
@Override @Override
public Persister<Message> getPersister() { public Persister<Message, CoreMessageObjectPools> getPersister() {
return CoreMessagePersister.getInstance(); return CoreMessagePersister.getInstance();
} }
@ -646,11 +646,15 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
} }
private void decode(boolean beforeAddress) { private void decode(boolean beforeAddress) {
decode(beforeAddress, coreMessageObjectPools);
}
private void decode(boolean beforeAddress, CoreMessageObjectPools pools) {
endOfBodyPosition = buffer.readInt(); endOfBodyPosition = buffer.readInt();
buffer.skipBytes(endOfBodyPosition - BUFFER_HEADER_SPACE); buffer.skipBytes(endOfBodyPosition - BUFFER_HEADER_SPACE);
decodeHeadersAndProperties(buffer, true); decodeHeadersAndProperties(buffer, true, pools);
buffer.readerIndex(0); buffer.readerIndex(0);
validBuffer = true; validBuffer = true;
@ -662,14 +666,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
} }
public void decodeHeadersAndProperties(final ByteBuf buffer) { public void decodeHeadersAndProperties(final ByteBuf buffer) {
decodeHeadersAndProperties(buffer, false); decodeHeadersAndProperties(buffer, false, coreMessageObjectPools);
} }
private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) { private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties, CoreMessageObjectPools pools) {
messageIDPosition = buffer.readerIndex(); messageIDPosition = buffer.readerIndex();
messageID = buffer.readLong(); messageID = buffer.readLong();
address = SimpleString.readNullableSimpleString(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressDecoderPool()); address = SimpleString.readNullableSimpleString(buffer, pools == null ? null : pools.getAddressDecoderPool());
if (buffer.readByte() == DataConstants.NOT_NULL) { if (buffer.readByte() == DataConstants.NOT_NULL) {
byte[] bytes = new byte[16]; byte[] bytes = new byte[16];
buffer.readBytes(bytes); buffer.readBytes(bytes);
@ -687,7 +691,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
propertiesLocation = buffer.readerIndex(); propertiesLocation = buffer.readerIndex();
} else { } else {
properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE); properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE);
properties.decode(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools()); properties.decode(buffer, pools == null ? null : pools.getPropertiesDecoderPools());
} }
} }
@ -1180,11 +1184,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
} }
@Override @Override
public void reloadPersistence(ActiveMQBuffer record) { public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
int size = record.readInt(); int size = record.readInt();
initBuffer(size); initBuffer(size);
buffer.setIndex(0, 0).writeBytes(record.byteBuf(), size); buffer.setIndex(0, 0).writeBytes(record.byteBuf(), size);
decode(false); decode(false, pools);
} }
@Override @Override

View File

@ -25,14 +25,30 @@ import java.util.function.Supplier;
public class CoreMessageObjectPools { public class CoreMessageObjectPools {
private Supplier<SimpleString.ByteBufSimpleStringPool> addressDecoderPool = Suppliers.memoize(SimpleString.ByteBufSimpleStringPool::new); private final Supplier<SimpleString.ByteBufSimpleStringPool> addressDecoderPool;
private Supplier<TypedProperties.TypedPropertiesDecoderPools> propertiesDecoderPools = Suppliers.memoize(TypedProperties.TypedPropertiesDecoderPools::new); private final Supplier<TypedProperties.TypedPropertiesDecoderPools> propertiesDecoderPools;
private Supplier<SimpleString.StringSimpleStringPool> groupIdStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new); private final Supplier<SimpleString.StringSimpleStringPool> groupIdStringSimpleStringPool;
private Supplier<SimpleString.StringSimpleStringPool> addressStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new); private final Supplier<SimpleString.StringSimpleStringPool> addressStringSimpleStringPool;
private Supplier<TypedProperties.TypedPropertiesStringSimpleStringPools> propertiesStringSimpleStringPools = Suppliers.memoize(TypedProperties.TypedPropertiesStringSimpleStringPools::new); private final Supplier<TypedProperties.TypedPropertiesStringSimpleStringPools> propertiesStringSimpleStringPools;
public CoreMessageObjectPools(int addressPoolCapacity,
int groupIdCapacity,
int propertyKeyCapacity,
int propertyValueCapacity) {
addressDecoderPool = Suppliers.memoize(() -> new SimpleString.ByteBufSimpleStringPool(addressPoolCapacity));
propertiesDecoderPools = Suppliers.memoize(() -> new TypedProperties.TypedPropertiesDecoderPools(propertyKeyCapacity, propertyValueCapacity));
groupIdStringSimpleStringPool = Suppliers.memoize(() -> new SimpleString.StringSimpleStringPool(groupIdCapacity));
addressStringSimpleStringPool = Suppliers.memoize(() -> new SimpleString.StringSimpleStringPool(addressPoolCapacity));
propertiesStringSimpleStringPools = Suppliers.memoize(() -> new TypedProperties.TypedPropertiesStringSimpleStringPools(propertyKeyCapacity, propertyValueCapacity));
}
public CoreMessageObjectPools() { public CoreMessageObjectPools() {
addressDecoderPool = Suppliers.memoize(SimpleString.ByteBufSimpleStringPool::new);
propertiesDecoderPools = Suppliers.memoize(TypedProperties.TypedPropertiesDecoderPools::new);
groupIdStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
addressStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
propertiesStringSimpleStringPools = Suppliers.memoize(TypedProperties.TypedPropertiesStringSimpleStringPools::new);
} }
public SimpleString.ByteBufSimpleStringPool getAddressDecoderPool() { public SimpleString.ByteBufSimpleStringPool getAddressDecoderPool() {

View File

@ -23,7 +23,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.DataConstants;
public class CoreMessagePersister implements Persister<Message> { public class CoreMessagePersister implements Persister<Message, CoreMessageObjectPools> {
public static final byte ID = 1; public static final byte ID = 1;
private static CoreMessagePersister theInstance; private static CoreMessagePersister theInstance;
@ -68,14 +68,18 @@ public class CoreMessagePersister implements Persister<Message> {
record.persist(buffer); record.persist(buffer);
} }
@Override @Override
public Message decode(ActiveMQBuffer buffer, Message record) { public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
// the caller must consume the first byte already, as that will be used to decide what persister (protocol) to use // the caller must consume the first byte already, as that will be used to decide what persister (protocol) to use
long id = buffer.readLong(); long id = buffer.readLong();
SimpleString address = buffer.readNullableSimpleString(); final SimpleString address;
record = new CoreMessage(); if (pool == null) {
record.reloadPersistence(buffer); address = buffer.readNullableSimpleString();
} else {
address = SimpleString.readNullableSimpleString(buffer.byteBuf(), pool.getAddressDecoderPool());
}
CoreMessage record = new CoreMessage();
record.reloadPersistence(buffer, pool);
record.setMessageID(id); record.setMessageID(id);
record.setAddress(address); record.setAddress(address);
return record; return record;

View File

@ -248,7 +248,7 @@ public class MessageInternalImpl implements MessageInternal {
} }
@Override @Override
public Persister<Message> getPersister() { public Persister<Message, CoreMessageObjectPools> getPersister() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@ -340,7 +340,7 @@ public class MessageInternalImpl implements MessageInternal {
} }
@Override @Override
public void reloadPersistence(ActiveMQBuffer record) { public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@ -22,7 +22,7 @@ import org.apache.activemq.artemis.core.persistence.Persister;
/** This is a facade between the new Persister and the former EncodingSupport. /** This is a facade between the new Persister and the former EncodingSupport.
* Methods using the old interface will use this as a facade to provide the previous semantic. */ * Methods using the old interface will use this as a facade to provide the previous semantic. */
public class EncoderPersister implements Persister<EncodingSupport> { public class EncoderPersister implements Persister<EncodingSupport, EncodingSupport> {
private static final EncoderPersister theInstance = new EncoderPersister(); private static final EncoderPersister theInstance = new EncoderPersister();

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
@ -43,6 +44,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.algo.KMPNeedle;
import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
@ -74,12 +76,49 @@ import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format /**
* See <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format">AMQP v1.0 message format</a>
* <pre>
*
* Bare Message
* |
* .---------------------+--------------------.
* | |
* +--------+-------------+-------------+------------+--------------+--------------+--------+
* | header | delivery- | message- | properties | application- | application- | footer |
* | | annotations | annotations | | properties | data | |
* +--------+-------------+-------------+------------+--------------+--------------+--------+
* | |
* '-------------------------------------------+--------------------------------------------'
* |
* Annotated Message
* </pre>
* <ul>
* <li>Zero or one header sections.
* <li>Zero or one delivery-annotation sections.
* <li>Zero or one message-annotation sections.
* <li>Zero or one properties sections.
* <li>Zero or one application-properties sections.
* <li>The body consists of one of the following three choices:
* <ul>
* <li>one or more data sections
* <li>one or more amqp-sequence sections
* <li>or a single amqp-value section.
* </ul>
* <li>Zero or one footer sections.
* </ul>
*/
public class AMQPMessage extends RefCountMessage { public class AMQPMessage extends RefCountMessage {
private static final Logger logger = Logger.getLogger(AMQPMessage.class); private static final Logger logger = Logger.getLogger(AMQPMessage.class);
public static final SimpleString ADDRESS_PROPERTY = SimpleString.toSimpleString("_AMQ_AD"); public static final SimpleString ADDRESS_PROPERTY = SimpleString.toSimpleString("_AMQ_AD");
// used to perform quick search
private static final Symbol[] SCHEDULED_DELIVERY_SYMBOLS = new Symbol[]{
AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY};
private static final KMPNeedle[] SCHEDULED_DELIVERY_NEEDLES = new KMPNeedle[]{
AMQPMessageSymbolSearch.kmpNeedleOf(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME),
AMQPMessageSymbolSearch.kmpNeedleOf(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY)};
public static final int DEFAULT_MESSAGE_FORMAT = 0; public static final int DEFAULT_MESSAGE_FORMAT = 0;
public static final int DEFAULT_MESSAGE_PRIORITY = 4; public static final int DEFAULT_MESSAGE_PRIORITY = 4;
@ -89,7 +128,10 @@ public class AMQPMessage extends RefCountMessage {
// Buffer and state for the data backing this message. // Buffer and state for the data backing this message.
private ReadableBuffer data; private ReadableBuffer data;
private boolean messageDataScanned; private static final byte NOT_SCANNED = 0;
private static final byte RELOAD_PERSISTENCE = 1;
private static final byte SCANNED = 2;
private byte messageDataScanned;
// Marks the message as needed to be re-encoded to update the backing buffer // Marks the message as needed to be re-encoded to update the backing buffer
private boolean modified; private boolean modified;
@ -450,16 +492,25 @@ public class AMQPMessage extends RefCountMessage {
// re-encode should be done to update the backing data with the in memory elements. // re-encode should be done to update the backing data with the in memory elements.
private synchronized void ensureMessageDataScanned() { private synchronized void ensureMessageDataScanned() {
if (!messageDataScanned) { final byte state = messageDataScanned;
switch (state) {
case NOT_SCANNED:
scanMessageData(); scanMessageData();
break;
case RELOAD_PERSISTENCE:
lazyScanAfterReloadPersistence();
break;
case SCANNED:
// NO-OP
break;
default:
throw new IllegalStateException("invalid messageDataScanned state: expected within " +
Arrays.toString(new byte[]{NOT_SCANNED, SCANNED, RELOAD_PERSISTENCE}) +
" but " + messageDataScanned);
} }
} }
private synchronized void scanMessageData() { private synchronized void resetMessageData() {
this.messageDataScanned = true;
DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setBuffer(data.rewind());
header = null; header = null;
messageAnnotations = null; messageAnnotations = null;
properties = null; properties = null;
@ -474,6 +525,14 @@ public class AMQPMessage extends RefCountMessage {
propertiesPosition = VALUE_NOT_PRESENT; propertiesPosition = VALUE_NOT_PRESENT;
applicationPropertiesPosition = VALUE_NOT_PRESENT; applicationPropertiesPosition = VALUE_NOT_PRESENT;
remainingBodyPosition = VALUE_NOT_PRESENT; remainingBodyPosition = VALUE_NOT_PRESENT;
}
private synchronized void scanMessageData() {
this.messageDataScanned = SCANNED;
DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setBuffer(data.rewind());
resetMessageData();
try { try {
while (data.hasRemaining()) { while (data.hasRemaining()) {
@ -741,18 +800,22 @@ public class AMQPMessage extends RefCountMessage {
} }
@Override @Override
public void reloadPersistence(ActiveMQBuffer record) { public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
int size = record.readInt(); int size = record.readInt();
byte[] recordArray = new byte[size]; byte[] recordArray = new byte[size];
record.readBytes(recordArray); record.readBytes(recordArray);
data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(recordArray)); data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(recordArray));
// Message state is now that the underlying buffer is loaded but the contents // Message state is now that the underlying buffer is loaded, but the contents not yet scanned
// not yet scanned, once done the message is fully populated and ready for dispatch. resetMessageData();
// Force a scan now and tidy the state variables to reflect where we are following modified = false;
// this reload from the store. messageDataScanned = RELOAD_PERSISTENCE;
}
private synchronized void lazyScanAfterReloadPersistence() {
assert messageDataScanned == RELOAD_PERSISTENCE;
scanMessageData(); scanMessageData();
messageDataScanned = true; messageDataScanned = SCANNED;
modified = false; modified = false;
// Message state should reflect that is came from persistent storage which // Message state should reflect that is came from persistent storage which
@ -771,7 +834,7 @@ public class AMQPMessage extends RefCountMessage {
} }
@Override @Override
public Persister<org.apache.activemq.artemis.api.core.Message> getPersister() { public Persister<org.apache.activemq.artemis.api.core.Message, CoreMessageObjectPools> getPersister() {
return AMQPMessagePersisterV2.getInstance(); return AMQPMessagePersisterV2.getInstance();
} }
@ -798,7 +861,7 @@ public class AMQPMessage extends RefCountMessage {
private synchronized void encodeMessage() { private synchronized void encodeMessage() {
this.modified = false; this.modified = false;
this.messageDataScanned = false; this.messageDataScanned = NOT_SCANNED;
int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0); int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated); ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
EncoderImpl encoder = TLSEncode.getEncoder(); EncoderImpl encoder = TLSEncode.getEncoder();
@ -1115,6 +1178,7 @@ public class AMQPMessage extends RefCountMessage {
@Override @Override
public RoutingType getRoutingType() { public RoutingType getRoutingType() {
ensureMessageDataScanned();
Object routingType = getMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE); Object routingType = getMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE);
if (routingType != null) { if (routingType != null) {
@ -1183,8 +1247,39 @@ public class AMQPMessage extends RefCountMessage {
return this; return this;
} }
@Override
public boolean hasScheduledDeliveryTime() {
if (scheduledTime >= 0) {
return true;
}
return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS, SCHEDULED_DELIVERY_NEEDLES);
}
private boolean anyMessageAnnotations(Symbol[] symbols, KMPNeedle[] symbolNeedles) {
assert symbols.length == symbolNeedles.length;
final int count = symbols.length;
if (messageDataScanned == SCANNED) {
final MessageAnnotations messageAnnotations = this.messageAnnotations;
if (messageAnnotations == null) {
return false;
}
Map<Symbol, Object> map = messageAnnotations.getValue();
if (map == null) {
return false;
}
for (int i = 0; i < count; i++) {
if (map.containsKey(symbols[i])) {
return true;
}
}
return false;
}
return AMQPMessageSymbolSearch.anyMessageAnnotations(data, symbolNeedles);
}
@Override @Override
public Long getScheduledDeliveryTime() { public Long getScheduledDeliveryTime() {
ensureMessageDataScanned();
if (scheduledTime < 0) { if (scheduledTime < 0) {
Object objscheduledTime = getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME); Object objscheduledTime = getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME);
Object objdelay = getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY); Object objdelay = getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY);

View File

@ -20,6 +20,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.DataConstants;
@ -62,12 +63,17 @@ public class AMQPMessagePersister extends MessagePersister {
} }
@Override @Override
public Message decode(ActiveMQBuffer buffer, Message record) { public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
long id = buffer.readLong(); long id = buffer.readLong();
long format = buffer.readLong(); long format = buffer.readLong();
SimpleString address = buffer.readNullableSimpleString(); final SimpleString address;
record = new AMQPMessage(format); if (pool == null) {
record.reloadPersistence(buffer); address = buffer.readNullableSimpleString();
} else {
address = SimpleString.readNullableSimpleString(buffer.byteBuf(), pool.getAddressDecoderPool());
}
AMQPMessage record = new AMQPMessage(format);
record.reloadPersistence(buffer, pool);
record.setMessageID(id); record.setMessageID(id);
if (address != null) { if (address != null) {
record.setAddress(address); record.setAddress(address);

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.activemq.artemis.utils.collections.TypedProperties;
@ -68,16 +69,24 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister {
} }
} }
@Override @Override
public Message decode(ActiveMQBuffer buffer, Message record) { public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
AMQPMessage message = (AMQPMessage)super.decode(buffer, record); AMQPMessage message = (AMQPMessage) super.decode(buffer, pool);
int size = buffer.readInt(); int size = buffer.readInt();
if (size != 0) { if (size != 0) {
TypedProperties properties = new TypedProperties(Message.INTERNAL_PROPERTY_NAMES_PREDICATE); // message::setAddress could have populated extra properties
properties.decode(buffer.byteBuf()); // hence, we can safely replace the value on the properties
message.setExtraProperties(properties); // if it has been encoded differently in the rest of the buffer
TypedProperties existingExtraProperties = message.getExtraProperties();
TypedProperties extraProperties = existingExtraProperties;
if (existingExtraProperties == null) {
extraProperties = new TypedProperties(Message.INTERNAL_PROPERTY_NAMES_PREDICATE);
}
extraProperties.decode(buffer.byteBuf(), pool != null ? pool.getPropertiesDecoderPools() : null, existingExtraProperties == null);
if (extraProperties != existingExtraProperties) {
message.setExtraProperties(extraProperties);
}
} }
return message; return message;
} }

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.broker;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.utils.algo.KMPNeedle;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Footer;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.TypeConstructor;
final class AMQPMessageSymbolSearch {
// used to quick search for MessageAnnotations
private static final IdentityHashMap<Class<?>, Boolean> MSG_BODY_TYPES;
static {
// we're including MessageAnnotations here because it will still cause termination
final List<Class<?>> classList = Arrays.asList(MessageAnnotations.class, Properties.class,
ApplicationProperties.class, Data.class,
AmqpSequence.class, AmqpValue.class, Footer.class);
MSG_BODY_TYPES = new IdentityHashMap<>(classList.size());
classList.forEach(clazz -> MSG_BODY_TYPES.put(clazz, Boolean.TRUE));
}
public static KMPNeedle kmpNeedleOf(Symbol symbol) {
return KMPNeedle.of(symbol.toString().getBytes(StandardCharsets.US_ASCII));
}
public static boolean anyMessageAnnotations(ReadableBuffer data, KMPNeedle[] needles) {
DecoderImpl decoder = TLSEncode.getDecoder();
final int position = data.position();
decoder.setBuffer(data.rewind());
try {
while (data.hasRemaining()) {
TypeConstructor<?> constructor = decoder.readConstructor();
final Class<?> typeClass = constructor.getTypeClass();
if (MSG_BODY_TYPES.containsKey(typeClass)) {
if (MessageAnnotations.class.equals(typeClass)) {
final int start = data.position();
constructor.skipValue();
final int end = data.position();
for (int i = 0, count = needles.length; i < count; i++) {
final int foundIndex = needles[i].searchInto(ReadableBuffer::get, data, end, start);
if (foundIndex != -1) {
return true;
}
}
}
return false;
}
constructor.skipValue();
}
return false;
} finally {
decoder.setBuffer(null);
data.position(position);
}
}
}

View File

@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
@ -39,7 +40,7 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME}; private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME};
@Override @Override
public Persister<Message>[] getPersister() { public Persister<Message, CoreMessageObjectPools>[] getPersister() {
Persister[] persisters = new Persister[]{AMQPMessagePersister.getInstance(), AMQPMessagePersisterV2.getInstance()}; Persister[] persisters = new Persister[]{AMQPMessagePersister.getInstance(), AMQPMessagePersisterV2.getInstance()};
return persisters; return persisters;

View File

@ -31,6 +31,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
@ -147,7 +148,7 @@ public class AMQPMessageTest {
final long persistedSize = (long) encoded.readableBytes(); final long persistedSize = (long) encoded.readableBytes();
// Now reload from encoded data // Now reload from encoded data
message.reloadPersistence(encoded); message.reloadPersistence(encoded, null);
assertEquals(persistedSize, message.getPersistSize()); assertEquals(persistedSize, message.getPersistSize());
assertEquals(persistedSize - Integer.BYTES, message.getPersistentSize()); assertEquals(persistedSize - Integer.BYTES, message.getPersistentSize());
@ -156,6 +157,72 @@ public class AMQPMessageTest {
assertEquals(TEST_TO_ADDRESS, message.getAddress()); assertEquals(TEST_TO_ADDRESS, message.getAddress());
} }
@Test
public void testHasScheduledDeliveryTimeReloadPersistence() {
final long scheduledTime = System.currentTimeMillis();
MessageImpl protonMessage = createProtonMessage();
MessageAnnotations annotations = protonMessage.getMessageAnnotations();
annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledTime);
ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage);
AMQPMessage message = new AMQPMessage(0);
try {
message.getProtonMessage();
fail("Should throw NPE due to not being initialized yet");
} catch (NullPointerException npe) {
}
// Now reload from encoded data
message.reloadPersistence(encoded, null);
assertTrue(message.hasScheduledDeliveryTime());
message.getHeader();
assertTrue(message.hasScheduledDeliveryTime());
}
@Test
public void testHasScheduledDeliveryDelayReloadPersistence() {
final long scheduledDelay = 100000;
MessageImpl protonMessage = createProtonMessage();
MessageAnnotations annotations = protonMessage.getMessageAnnotations();
annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY, scheduledDelay);
ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage);
AMQPMessage message = new AMQPMessage(0);
try {
message.getProtonMessage();
fail("Should throw NPE due to not being initialized yet");
} catch (NullPointerException npe) {
}
// Now reload from encoded data
message.reloadPersistence(encoded, null);
assertTrue(message.hasScheduledDeliveryTime());
message.getHeader();
assertTrue(message.hasScheduledDeliveryTime());
}
@Test
public void testNoScheduledDeliveryTimeOrDelayReloadPersistence() {
MessageImpl protonMessage = createProtonMessage();
ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage);
AMQPMessage message = new AMQPMessage(0);
try {
message.getProtonMessage();
fail("Should throw NPE due to not being initialized yet");
} catch (NullPointerException npe) {
}
// Now reload from encoded data
message.reloadPersistence(encoded, null);
assertFalse(message.hasScheduledDeliveryTime());
message.getHeader();
assertFalse(message.hasScheduledDeliveryTime());
}
//----- Test Memory Estimate access ---------------------------------------// //----- Test Memory Estimate access ---------------------------------------//
@Test @Test
@ -2010,10 +2077,10 @@ public class AMQPMessageTest {
properties.setTo(TEST_TO_ADDRESS); properties.setTo(TEST_TO_ADDRESS);
properties.setMessageId(UUID.randomUUID()); properties.setMessageId(UUID.randomUUID());
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>()); MessageAnnotations annotations = new MessageAnnotations(new LinkedHashMap<>());
annotations.getValue().put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY), TEST_MESSAGE_ANNOTATION_VALUE); annotations.getValue().put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY), TEST_MESSAGE_ANNOTATION_VALUE);
ApplicationProperties applicationProperties = new ApplicationProperties(new HashMap<>()); ApplicationProperties applicationProperties = new ApplicationProperties(new LinkedHashMap<>());
applicationProperties.getValue().put(TEST_APPLICATION_PROPERTY_KEY, TEST_APPLICATION_PROPERTY_VALUE); applicationProperties.getValue().put(TEST_APPLICATION_PROPERTY_KEY, TEST_APPLICATION_PROPERTY_VALUE);
AmqpValue body = new AmqpValue(TEST_STRING_BODY); AmqpValue body = new AmqpValue(TEST_STRING_BODY);

View File

@ -140,7 +140,7 @@ public class OpenwireMessage implements Message {
} }
@Override @Override
public Persister<Message> getPersister() { public Persister<Message, CoreMessageObjectPools> getPersister() {
return null; return null;
} }
@ -205,7 +205,7 @@ public class OpenwireMessage implements Message {
} }
@Override @Override
public void reloadPersistence(ActiveMQBuffer record) { public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
} }

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.activemq.artemis.core.persistence.impl.journal; package org.apache.activemq.artemis.core.persistence.impl.journal;
import static org.apache.activemq.artemis.api.core.SimpleString.ByteBufSimpleStringPool.DEFAULT_MAX_LENGTH;
import static org.apache.activemq.artemis.api.core.SimpleString.ByteBufSimpleStringPool.DEFAULT_POOL_CAPACITY;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
@ -56,6 +58,7 @@ import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
@ -858,6 +861,19 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
} }
final MutableLong recordNumber = new MutableLong(); final MutableLong recordNumber = new MutableLong();
final CoreMessageObjectPools pools;
if (totalSize > 0) {
final int addresses = (int)Math.max(
DEFAULT_POOL_CAPACITY,
queueInfos == null ? 0 :
queueInfos.values().stream()
.map(QueueBindingInfo::getAddress)
.filter(addr -> addr.length() <= DEFAULT_MAX_LENGTH)
.count() * 2);
pools = new CoreMessageObjectPools(addresses, DEFAULT_POOL_CAPACITY, 128, 128);
} else {
pools = null;
}
// This will free up memory sooner while reading the records // This will free up memory sooner while reading the records
records.clear(record -> { records.clear(record -> {
try { try {
@ -904,7 +920,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
case JournalRecordIds.ADD_MESSAGE_PROTOCOL: { case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
Message message = MessagePersister.getInstance().decode(buff, null); Message message = MessagePersister.getInstance().decode(buff, pools);
messages.put(record.id, message); messages.put(record.id, message);
@ -1716,6 +1732,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
final Set<Pair<Long, Long>> pendingLargeMessages, final Set<Pair<Long, Long>> pendingLargeMessages,
JournalLoader journalLoader) throws Exception { JournalLoader journalLoader) throws Exception {
// recover prepared transactions // recover prepared transactions
CoreMessageObjectPools pools = null;
for (PreparedTransactionInfo preparedTransaction : preparedTransactions) { for (PreparedTransactionInfo preparedTransaction : preparedTransactions) {
XidEncoding encodingXid = new XidEncoding(preparedTransaction.getExtraData()); XidEncoding encodingXid = new XidEncoding(preparedTransaction.getExtraData());
@ -1749,7 +1767,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
break; break;
} }
case JournalRecordIds.ADD_MESSAGE_PROTOCOL: { case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
Message message = MessagePersister.getInstance().decode(buff, null); if (pools == null) {
pools = new CoreMessageObjectPools();
}
Message message = MessagePersister.getInstance().decode(buff, pools);
messages.put(record.id, message); messages.put(record.id, message);

View File

@ -22,7 +22,7 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.LargeServerMessage;
public class LargeMessagePersister implements Persister<LargeServerMessage> { public class LargeMessagePersister implements Persister<LargeServerMessage, LargeServerMessage> {
private static final LargeMessagePersister theInstance = new LargeMessagePersister(); private static final LargeMessagePersister theInstance = new LargeMessagePersister();

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister; import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -37,7 +38,7 @@ public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<I
private static final String MODULE_NAME = "artemis-server"; private static final String MODULE_NAME = "artemis-server";
@Override @Override
public Persister<Message>[] getPersister() { public Persister<Message, CoreMessageObjectPools>[] getPersister() {
return new Persister[]{CoreMessagePersister.getInstance()}; return new Persister[]{CoreMessagePersister.getInstance()};
} }

View File

@ -21,11 +21,12 @@ import java.util.ServiceLoader;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister; import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
public class MessagePersister implements Persister<Message> { public class MessagePersister implements Persister<Message, CoreMessageObjectPools> {
private static final Logger logger = Logger.getLogger(MessagePersister.class); private static final Logger logger = Logger.getLogger(MessagePersister.class);
@ -33,7 +34,7 @@ public class MessagePersister implements Persister<Message> {
/** This will be used for reading messages */ /** This will be used for reading messages */
private static final int MAX_PERSISTERS = 3; private static final int MAX_PERSISTERS = 3;
private static final Persister<Message>[] persisters = new Persister[MAX_PERSISTERS]; private static final Persister<Message, CoreMessageObjectPools>[] persisters = new Persister[MAX_PERSISTERS];
static { static {
CoreMessagePersister persister = CoreMessagePersister.getInstance(); CoreMessagePersister persister = CoreMessagePersister.getInstance();
@ -46,7 +47,7 @@ public class MessagePersister implements Persister<Message> {
} }
public static void registerProtocol(ProtocolManagerFactory manager) { public static void registerProtocol(ProtocolManagerFactory manager) {
Persister<Message>[] messagePersisters = manager.getPersister(); Persister<Message, CoreMessageObjectPools>[] messagePersisters = manager.getPersister();
if (messagePersisters == null || messagePersisters.length == 0) { if (messagePersisters == null || messagePersisters.length == 0) {
logger.debug("Cannot find persister for " + manager); logger.debug("Cannot find persister for " + manager);
} else { } else {
@ -69,7 +70,7 @@ public class MessagePersister implements Persister<Message> {
return persisters[id - 1]; return persisters[id - 1];
} }
public static void registerPersister(Persister<Message> persister) { public static void registerPersister(Persister<Message, CoreMessageObjectPools> persister) {
if (persister != null) { if (persister != null) {
assert persister.getID() <= MAX_PERSISTERS : "You must update MessagePersister::MAX_PERSISTERS to a higher number"; assert persister.getID() <= MAX_PERSISTERS : "You must update MessagePersister::MAX_PERSISTERS to a higher number";
persisters[persister.getID() - 1] = persister; persisters[persister.getID() - 1] = persister;
@ -97,12 +98,12 @@ public class MessagePersister implements Persister<Message> {
} }
@Override @Override
public Message decode(ActiveMQBuffer buffer, Message record) { public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pools) {
byte protocol = buffer.readByte(); byte protocol = buffer.readByte();
Persister<Message> persister = getPersister(protocol); Persister<Message, CoreMessageObjectPools> persister = getPersister(protocol);
if (persister == null) { if (persister == null) {
throw new NullPointerException("couldn't find factory for type=" + protocol); throw new NullPointerException("couldn't find factory for type=" + protocol);
} }
return persister.decode(buffer, record); return persister.decode(buffer, pools);
} }
} }

View File

@ -21,12 +21,13 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
public interface ProtocolManagerFactory<P extends BaseInterceptor> { public interface ProtocolManagerFactory<P extends BaseInterceptor> {
default Persister<Message>[] getPersister() { default Persister<Message, CoreMessageObjectPools>[] getPersister() {
return new Persister[]{}; return new Persister[]{};
} }

View File

@ -323,12 +323,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
} }
@Override @Override
public void reloadPersistence(ActiveMQBuffer record) { public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
} }
@Override @Override
public Persister<Message> getPersister() { public Persister<Message, CoreMessageObjectPools> getPersister() {
return null; return null;
} }

View File

@ -371,12 +371,12 @@ public class AcknowledgeTest extends ActiveMQTestBase {
} }
@Override @Override
public Persister<Message> getPersister() { public Persister<Message, CoreMessageObjectPools> getPersister() {
return null; return null;
} }
@Override @Override
public void reloadPersistence(ActiveMQBuffer record) { public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
} }

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister; import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
@ -294,7 +295,7 @@ public class SharedNothingReplicationTest extends ActiveMQTestBase {
return conf; return conf;
} }
static class SlowMessagePersister extends CoreMessagePersister implements Persister<Message> { static class SlowMessagePersister extends CoreMessagePersister implements Persister<Message, CoreMessageObjectPools> {
boolean used = false; boolean used = false;
@ -343,8 +344,8 @@ public class SharedNothingReplicationTest extends ActiveMQTestBase {
} }
@Override @Override
public Message decode(ActiveMQBuffer buffer, Message record) { public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
return persister.decode(buffer, record); return persister.decode(buffer, pool);
} }
} }