ARTEMIS-2617 use core pools to reduce GC on journal loading

This commit is contained in:
Francesco Nigro 2020-02-07 13:47:02 +01:00 committed by Clebert Suconic
parent 0b8c33bb0f
commit 5897909dc9
26 changed files with 167 additions and 82 deletions

View File

@ -33,7 +33,7 @@ ARTEMIS_INSTANCE_ETC_URI='${artemis.instance.etc.uri}'
# Java Opts
if [ -z "$JAVA_ARGS" ]; then
JAVA_ARGS="${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -Xms512M -Xmx2G -Dhawtio.realm=activemq -Dhawtio.offline=true -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=${ARTEMIS_INSTANCE_ETC_URI}jolokia-access.xml"
JAVA_ARGS="${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx2G -Dhawtio.realm=activemq -Dhawtio.offline=true -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=${ARTEMIS_INSTANCE_ETC_URI}jolokia-access.xml"
fi
#

View File

@ -33,7 +33,7 @@ rem Cluster Properties: Used to pass arguments to ActiveMQ Artemis which can be
rem set ARTEMIS_CLUSTER_PROPS=-Dactivemq.remoting.default.port=61617 -Dactivemq.remoting.amqp.port=5673 -Dactivemq.remoting.stomp.port=61614 -Dactivemq.remoting.hornetq.port=5446
rem Java Opts
IF "%JAVA_ARGS%"=="" (set JAVA_ARGS=${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -Xms512M -Xmx1024M -Xbootclasspath/a:%ARTEMIS_HOME%\lib\${logmanager};%ARTEMIS_HOME%\lib\${wildfly-common} -Djava.security.auth.login.config=%ARTEMIS_ETC_DIR%\login.config -Dhawtio.offline=true -Dhawtio.realm=activemq -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=%ARTEMIS_INSTANCE_ETC_URI%\jolokia-access.xml -Dartemis.instance=%ARTEMIS_INSTANCE%)
IF "%JAVA_ARGS%"=="" (set JAVA_ARGS=${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx1024M -Xbootclasspath/a:%ARTEMIS_HOME%\lib\${logmanager};%ARTEMIS_HOME%\lib\${wildfly-common} -Djava.security.auth.login.config=%ARTEMIS_ETC_DIR%\login.config -Dhawtio.offline=true -Dhawtio.realm=activemq -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=%ARTEMIS_INSTANCE_ETC_URI%\jolokia-access.xml -Dartemis.instance=%ARTEMIS_INSTANCE%)
rem Logs Safepoints JVM pauses: Uncomment to enable them
rem In addition to the traditional GC logs you could enable some JVM flags to know any meaningful and "hidden" pause that could

View File

@ -560,12 +560,16 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
public static final class ByteBufSimpleStringPool extends AbstractByteBufPool<SimpleString> {
private static final int UUID_LENGTH = 36;
public static final int DEFAULT_MAX_LENGTH = 36;
private final int maxLength;
public ByteBufSimpleStringPool() {
this.maxLength = UUID_LENGTH;
this.maxLength = DEFAULT_MAX_LENGTH;
}
public ByteBufSimpleStringPool(final int capacity) {
this(capacity, DEFAULT_MAX_LENGTH);
}
public ByteBufSimpleStringPool(final int capacity, final int maxCharsLength) {

View File

@ -19,20 +19,22 @@ package org.apache.activemq.artemis.core.persistence;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
public interface Persister<T extends Object> {
public interface Persister<T extends Object, A> {
/** This is to be used to store the protocol-id on Messages.
* Messages are stored on their bare format.
* The protocol manager will be responsible to code or decode messages.
* The caveat here is that the first short-sized bytes need to be this constant. */
/**
* This is to be used to store the protocol-id on Messages.
* Messages are stored on their bare format.
* The protocol manager will be responsible to code or decode messages.
* The caveat here is that the first short-sized bytes need to be this constant.
*/
default byte getID() {
return (byte)0;
return (byte) 0;
}
int getEncodeSize(T record);
void encode(ActiveMQBuffer buffer, T record);
T decode(ActiveMQBuffer buffer, T record);
T decode(ActiveMQBuffer buffer, A arg);
}

View File

@ -98,6 +98,7 @@ public final class UUID {
* @param data 16 byte UUID contents
*/
public UUID(final int type, final byte[] data) {
assert data.length == 16;
mId = data;
// Type is multiplexed with time_hi:
mId[UUID.INDEX_TYPE] &= (byte) 0x0F;
@ -108,6 +109,7 @@ public final class UUID {
}
private UUID(final byte[] data) {
assert data.length == 16;
mId = data;
}

View File

@ -446,17 +446,21 @@ public class TypedProperties {
}
public synchronized void decode(final ByteBuf buffer,
final TypedPropertiesDecoderPools keyValuePools) {
final TypedPropertiesDecoderPools keyValuePools,
boolean replaceExisting) {
byte b = buffer.readByte();
if (b == DataConstants.NULL) {
properties = null;
size = 0;
if (replaceExisting) {
properties = null;
size = 0;
}
} else {
int numHeaders = buffer.readInt();
//optimize the case of no collisions to avoid any resize (it doubles the map size!!!) when load factor is reached
properties = new HashMap<>(numHeaders, 1.0f);
size = 0;
if (replaceExisting || properties == null) {
//optimize the case of no collisions to avoid any resize (it doubles the map size!!!) when load factor is reached
properties = new HashMap<>(numHeaders, 1.0f);
}
size = properties.size();
for (int i = 0; i < numHeaders; i++) {
final SimpleString key = SimpleString.readSimpleString(buffer, keyValuePools == null ? null : keyValuePools.getPropertyKeysPool());
@ -529,6 +533,10 @@ public class TypedProperties {
}
}
public synchronized void decode(final ByteBuf buffer, final TypedPropertiesDecoderPools keyValuePools) {
decode(buffer, keyValuePools, true);
}
public void decode(final ByteBuf buffer) {
decode(buffer, null);
}
@ -1029,12 +1037,16 @@ public class TypedProperties {
public static final class ByteBufStringValuePool extends AbstractByteBufPool<StringValue> {
private static final int UUID_LENGTH = 36;
public static final int DEFAULT_MAX_LENGTH = 36;
private final int maxLength;
public ByteBufStringValuePool() {
this.maxLength = UUID_LENGTH;
this.maxLength = DEFAULT_MAX_LENGTH;
}
public ByteBufStringValuePool(final int capacity) {
this(capacity, DEFAULT_MAX_LENGTH);
}
public ByteBufStringValuePool(final int capacity, final int maxCharsLength) {
@ -1074,9 +1086,9 @@ public class TypedProperties {
this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool();
}
public TypedPropertiesDecoderPools(int keyPoolCapacity, int valuePoolCapacity, int maxCharsLength) {
this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(keyPoolCapacity, maxCharsLength);
this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(valuePoolCapacity, maxCharsLength);
public TypedPropertiesDecoderPools(int keyPoolCapacity, int valuePoolCapacity) {
this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(keyPoolCapacity);
this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(valuePoolCapacity);
}
public SimpleString.ByteBufSimpleStringPool getPropertyKeysPool() {

View File

@ -394,7 +394,7 @@ public interface Message {
*/
Message setDurable(boolean durable);
Persister<Message> getPersister();
Persister<Message, CoreMessageObjectPools> getPersister();
String getAddress();
@ -454,7 +454,7 @@ public interface Message {
void persist(ActiveMQBuffer targetRecord);
void reloadPersistence(ActiveMQBuffer record);
void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools);
default void releaseBuffer() {
ByteBuf buffer = getBuffer();

View File

@ -130,7 +130,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
@Override
public Persister<Message> getPersister() {
public Persister<Message, CoreMessageObjectPools> getPersister() {
return CoreMessagePersister.getInstance();
}
@ -646,11 +646,15 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
private void decode(boolean beforeAddress) {
decode(beforeAddress, coreMessageObjectPools);
}
private void decode(boolean beforeAddress, CoreMessageObjectPools pools) {
endOfBodyPosition = buffer.readInt();
buffer.skipBytes(endOfBodyPosition - BUFFER_HEADER_SPACE);
decodeHeadersAndProperties(buffer, true);
decodeHeadersAndProperties(buffer, true, pools);
buffer.readerIndex(0);
validBuffer = true;
@ -662,14 +666,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
public void decodeHeadersAndProperties(final ByteBuf buffer) {
decodeHeadersAndProperties(buffer, false);
decodeHeadersAndProperties(buffer, false, coreMessageObjectPools);
}
private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) {
private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties, CoreMessageObjectPools pools) {
messageIDPosition = buffer.readerIndex();
messageID = buffer.readLong();
address = SimpleString.readNullableSimpleString(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressDecoderPool());
address = SimpleString.readNullableSimpleString(buffer, pools == null ? null : pools.getAddressDecoderPool());
if (buffer.readByte() == DataConstants.NOT_NULL) {
byte[] bytes = new byte[16];
buffer.readBytes(bytes);
@ -687,7 +691,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
propertiesLocation = buffer.readerIndex();
} else {
properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE);
properties.decode(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools());
properties.decode(buffer, pools == null ? null : pools.getPropertiesDecoderPools());
}
}
@ -1180,11 +1184,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
@Override
public void reloadPersistence(ActiveMQBuffer record) {
public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
int size = record.readInt();
initBuffer(size);
buffer.setIndex(0, 0).writeBytes(record.byteBuf(), size);
decode(false);
decode(false, pools);
}
@Override

View File

@ -25,14 +25,30 @@ import java.util.function.Supplier;
public class CoreMessageObjectPools {
private Supplier<SimpleString.ByteBufSimpleStringPool> addressDecoderPool = Suppliers.memoize(SimpleString.ByteBufSimpleStringPool::new);
private Supplier<TypedProperties.TypedPropertiesDecoderPools> propertiesDecoderPools = Suppliers.memoize(TypedProperties.TypedPropertiesDecoderPools::new);
private final Supplier<SimpleString.ByteBufSimpleStringPool> addressDecoderPool;
private final Supplier<TypedProperties.TypedPropertiesDecoderPools> propertiesDecoderPools;
private Supplier<SimpleString.StringSimpleStringPool> groupIdStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
private Supplier<SimpleString.StringSimpleStringPool> addressStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
private Supplier<TypedProperties.TypedPropertiesStringSimpleStringPools> propertiesStringSimpleStringPools = Suppliers.memoize(TypedProperties.TypedPropertiesStringSimpleStringPools::new);
private final Supplier<SimpleString.StringSimpleStringPool> groupIdStringSimpleStringPool;
private final Supplier<SimpleString.StringSimpleStringPool> addressStringSimpleStringPool;
private final Supplier<TypedProperties.TypedPropertiesStringSimpleStringPools> propertiesStringSimpleStringPools;
public CoreMessageObjectPools(int addressPoolCapacity,
int groupIdCapacity,
int propertyKeyCapacity,
int propertyValueCapacity) {
addressDecoderPool = Suppliers.memoize(() -> new SimpleString.ByteBufSimpleStringPool(addressPoolCapacity));
propertiesDecoderPools = Suppliers.memoize(() -> new TypedProperties.TypedPropertiesDecoderPools(propertyKeyCapacity, propertyValueCapacity));
groupIdStringSimpleStringPool = Suppliers.memoize(() -> new SimpleString.StringSimpleStringPool(groupIdCapacity));
addressStringSimpleStringPool = Suppliers.memoize(() -> new SimpleString.StringSimpleStringPool(addressPoolCapacity));
propertiesStringSimpleStringPools = Suppliers.memoize(() -> new TypedProperties.TypedPropertiesStringSimpleStringPools(propertyKeyCapacity, propertyValueCapacity));
}
public CoreMessageObjectPools() {
addressDecoderPool = Suppliers.memoize(SimpleString.ByteBufSimpleStringPool::new);
propertiesDecoderPools = Suppliers.memoize(TypedProperties.TypedPropertiesDecoderPools::new);
groupIdStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
addressStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
propertiesStringSimpleStringPools = Suppliers.memoize(TypedProperties.TypedPropertiesStringSimpleStringPools::new);
}
public SimpleString.ByteBufSimpleStringPool getAddressDecoderPool() {

View File

@ -23,7 +23,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.utils.DataConstants;
public class CoreMessagePersister implements Persister<Message> {
public class CoreMessagePersister implements Persister<Message, CoreMessageObjectPools> {
public static final byte ID = 1;
private static CoreMessagePersister theInstance;
@ -68,14 +68,18 @@ public class CoreMessagePersister implements Persister<Message> {
record.persist(buffer);
}
@Override
public Message decode(ActiveMQBuffer buffer, Message record) {
public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
// the caller must consume the first byte already, as that will be used to decide what persister (protocol) to use
long id = buffer.readLong();
SimpleString address = buffer.readNullableSimpleString();
record = new CoreMessage();
record.reloadPersistence(buffer);
final SimpleString address;
if (pool == null) {
address = buffer.readNullableSimpleString();
} else {
address = SimpleString.readNullableSimpleString(buffer.byteBuf(), pool.getAddressDecoderPool());
}
CoreMessage record = new CoreMessage();
record.reloadPersistence(buffer, pool);
record.setMessageID(id);
record.setAddress(address);
return record;

View File

@ -248,7 +248,7 @@ public class MessageInternalImpl implements MessageInternal {
}
@Override
public Persister<Message> getPersister() {
public Persister<Message, CoreMessageObjectPools> getPersister() {
throw new UnsupportedOperationException();
}
@ -340,7 +340,7 @@ public class MessageInternalImpl implements MessageInternal {
}
@Override
public void reloadPersistence(ActiveMQBuffer record) {
public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
throw new UnsupportedOperationException();
}

View File

@ -22,7 +22,7 @@ import org.apache.activemq.artemis.core.persistence.Persister;
/** This is a facade between the new Persister and the former EncodingSupport.
* Methods using the old interface will use this as a facade to provide the previous semantic. */
public class EncoderPersister implements Persister<EncodingSupport> {
public class EncoderPersister implements Persister<EncodingSupport, EncodingSupport> {
private static final EncoderPersister theInstance = new EncoderPersister();

View File

@ -741,7 +741,7 @@ public class AMQPMessage extends RefCountMessage {
}
@Override
public void reloadPersistence(ActiveMQBuffer record) {
public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
int size = record.readInt();
byte[] recordArray = new byte[size];
record.readBytes(recordArray);
@ -771,7 +771,7 @@ public class AMQPMessage extends RefCountMessage {
}
@Override
public Persister<org.apache.activemq.artemis.api.core.Message> getPersister() {
public Persister<org.apache.activemq.artemis.api.core.Message, CoreMessageObjectPools> getPersister() {
return AMQPMessagePersisterV2.getInstance();
}

View File

@ -20,6 +20,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.utils.DataConstants;
@ -62,12 +63,17 @@ public class AMQPMessagePersister extends MessagePersister {
}
@Override
public Message decode(ActiveMQBuffer buffer, Message record) {
public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
long id = buffer.readLong();
long format = buffer.readLong();
SimpleString address = buffer.readNullableSimpleString();
record = new AMQPMessage(format);
record.reloadPersistence(buffer);
final SimpleString address;
if (pool == null) {
address = buffer.readNullableSimpleString();
} else {
address = SimpleString.readNullableSimpleString(buffer.byteBuf(), pool.getAddressDecoderPool());
}
AMQPMessage record = new AMQPMessage(format);
record.reloadPersistence(buffer, pool);
record.setMessageID(id);
if (address != null) {
record.setAddress(address);

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
@ -68,16 +69,24 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister {
}
}
@Override
public Message decode(ActiveMQBuffer buffer, Message record) {
AMQPMessage message = (AMQPMessage)super.decode(buffer, record);
public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
AMQPMessage message = (AMQPMessage) super.decode(buffer, pool);
int size = buffer.readInt();
if (size != 0) {
TypedProperties properties = new TypedProperties(Message.INTERNAL_PROPERTY_NAMES_PREDICATE);
properties.decode(buffer.byteBuf());
message.setExtraProperties(properties);
// message::setAddress could have populated extra properties
// hence, we can safely replace the value on the properties
// if it has been encoded differently in the rest of the buffer
TypedProperties existingExtraProperties = message.getExtraProperties();
TypedProperties extraProperties = existingExtraProperties;
if (existingExtraProperties == null) {
extraProperties = new TypedProperties(Message.INTERNAL_PROPERTY_NAMES_PREDICATE);
}
extraProperties.decode(buffer.byteBuf(), pool != null ? pool.getPropertiesDecoderPools() : null, existingExtraProperties == null);
if (extraProperties != existingExtraProperties) {
message.setExtraProperties(extraProperties);
}
}
return message;
}

View File

@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
@ -39,7 +40,7 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME};
@Override
public Persister<Message>[] getPersister() {
public Persister<Message, CoreMessageObjectPools>[] getPersister() {
Persister[] persisters = new Persister[]{AMQPMessagePersister.getInstance(), AMQPMessagePersisterV2.getInstance()};
return persisters;

View File

@ -147,7 +147,7 @@ public class AMQPMessageTest {
final long persistedSize = (long) encoded.readableBytes();
// Now reload from encoded data
message.reloadPersistence(encoded);
message.reloadPersistence(encoded, null);
assertEquals(persistedSize, message.getPersistSize());
assertEquals(persistedSize - Integer.BYTES, message.getPersistentSize());

View File

@ -140,7 +140,7 @@ public class OpenwireMessage implements Message {
}
@Override
public Persister<Message> getPersister() {
public Persister<Message, CoreMessageObjectPools> getPersister() {
return null;
}
@ -205,7 +205,7 @@ public class OpenwireMessage implements Message {
}
@Override
public void reloadPersistence(ActiveMQBuffer record) {
public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import static org.apache.activemq.artemis.api.core.SimpleString.ByteBufSimpleStringPool.DEFAULT_MAX_LENGTH;
import static org.apache.activemq.artemis.api.core.SimpleString.ByteBufSimpleStringPool.DEFAULT_POOL_CAPACITY;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
@ -56,6 +58,7 @@ import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
@ -858,6 +861,19 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
final MutableLong recordNumber = new MutableLong();
final CoreMessageObjectPools pools;
if (totalSize > 0) {
final int addresses = (int)Math.max(
DEFAULT_POOL_CAPACITY,
queueInfos == null ? 0 :
queueInfos.values().stream()
.map(QueueBindingInfo::getAddress)
.filter(addr -> addr.length() <= DEFAULT_MAX_LENGTH)
.count() * 2);
pools = new CoreMessageObjectPools(addresses, DEFAULT_POOL_CAPACITY, 128, 128);
} else {
pools = null;
}
// This will free up memory sooner while reading the records
records.clear(record -> {
try {
@ -904,7 +920,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
Message message = MessagePersister.getInstance().decode(buff, null);
Message message = MessagePersister.getInstance().decode(buff, pools);
messages.put(record.id, message);
@ -1716,6 +1732,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
final Set<Pair<Long, Long>> pendingLargeMessages,
JournalLoader journalLoader) throws Exception {
// recover prepared transactions
CoreMessageObjectPools pools = null;
for (PreparedTransactionInfo preparedTransaction : preparedTransactions) {
XidEncoding encodingXid = new XidEncoding(preparedTransaction.getExtraData());
@ -1749,7 +1767,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
break;
}
case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
Message message = MessagePersister.getInstance().decode(buff, null);
if (pools == null) {
pools = new CoreMessageObjectPools();
}
Message message = MessagePersister.getInstance().decode(buff, pools);
messages.put(record.id, message);

View File

@ -22,7 +22,7 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
public class LargeMessagePersister implements Persister<LargeServerMessage> {
public class LargeMessagePersister implements Persister<LargeServerMessage, LargeServerMessage> {
private static final LargeMessagePersister theInstance = new LargeMessagePersister();

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -37,7 +38,7 @@ public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<I
private static final String MODULE_NAME = "artemis-server";
@Override
public Persister<Message>[] getPersister() {
public Persister<Message, CoreMessageObjectPools>[] getPersister() {
return new Persister[]{CoreMessagePersister.getInstance()};
}

View File

@ -21,11 +21,12 @@ import java.util.ServiceLoader;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.jboss.logging.Logger;
public class MessagePersister implements Persister<Message> {
public class MessagePersister implements Persister<Message, CoreMessageObjectPools> {
private static final Logger logger = Logger.getLogger(MessagePersister.class);
@ -33,7 +34,7 @@ public class MessagePersister implements Persister<Message> {
/** This will be used for reading messages */
private static final int MAX_PERSISTERS = 3;
private static final Persister<Message>[] persisters = new Persister[MAX_PERSISTERS];
private static final Persister<Message, CoreMessageObjectPools>[] persisters = new Persister[MAX_PERSISTERS];
static {
CoreMessagePersister persister = CoreMessagePersister.getInstance();
@ -46,7 +47,7 @@ public class MessagePersister implements Persister<Message> {
}
public static void registerProtocol(ProtocolManagerFactory manager) {
Persister<Message>[] messagePersisters = manager.getPersister();
Persister<Message, CoreMessageObjectPools>[] messagePersisters = manager.getPersister();
if (messagePersisters == null || messagePersisters.length == 0) {
logger.debug("Cannot find persister for " + manager);
} else {
@ -69,7 +70,7 @@ public class MessagePersister implements Persister<Message> {
return persisters[id - 1];
}
public static void registerPersister(Persister<Message> persister) {
public static void registerPersister(Persister<Message, CoreMessageObjectPools> persister) {
if (persister != null) {
assert persister.getID() <= MAX_PERSISTERS : "You must update MessagePersister::MAX_PERSISTERS to a higher number";
persisters[persister.getID() - 1] = persister;
@ -97,12 +98,12 @@ public class MessagePersister implements Persister<Message> {
}
@Override
public Message decode(ActiveMQBuffer buffer, Message record) {
public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pools) {
byte protocol = buffer.readByte();
Persister<Message> persister = getPersister(protocol);
Persister<Message, CoreMessageObjectPools> persister = getPersister(protocol);
if (persister == null) {
throw new NullPointerException("couldn't find factory for type=" + protocol);
}
return persister.decode(buffer, record);
return persister.decode(buffer, pools);
}
}

View File

@ -21,12 +21,13 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
public interface ProtocolManagerFactory<P extends BaseInterceptor> {
default Persister<Message>[] getPersister() {
default Persister<Message, CoreMessageObjectPools>[] getPersister() {
return new Persister[]{};
}

View File

@ -323,12 +323,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void reloadPersistence(ActiveMQBuffer record) {
public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
}
@Override
public Persister<Message> getPersister() {
public Persister<Message, CoreMessageObjectPools> getPersister() {
return null;
}

View File

@ -371,12 +371,12 @@ public class AcknowledgeTest extends ActiveMQTestBase {
}
@Override
public Persister<Message> getPersister() {
public Persister<Message, CoreMessageObjectPools> getPersister() {
return null;
}
@Override
public void reloadPersistence(ActiveMQBuffer record) {
public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
}

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
@ -294,7 +295,7 @@ public class SharedNothingReplicationTest extends ActiveMQTestBase {
return conf;
}
static class SlowMessagePersister extends CoreMessagePersister implements Persister<Message> {
static class SlowMessagePersister extends CoreMessagePersister implements Persister<Message, CoreMessageObjectPools> {
boolean used = false;
@ -343,8 +344,8 @@ public class SharedNothingReplicationTest extends ActiveMQTestBase {
}
@Override
public Message decode(ActiveMQBuffer buffer, Message record) {
return persister.decode(buffer, record);
public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
return persister.decode(buffer, pool);
}
}