diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index f310fb3108..fe51f15cbb 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -91,7 +91,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage private volatile AmqpReadableBuffer parsingData; - private final StorageManager storageManager; + private StorageManager storageManager; public AMQPLargeMessage(long id, long messageFormat, @@ -183,6 +183,12 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage return largeBody.getStorageManager(); } + @Override + public void setStorageManager(StorageManager storageManager) { + largeBody.setStorageManager(storageManager); + this.storageManager = storageManager; + } + @Override public final boolean isDurable() { if (fileDurable != null) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java index f573e6b694..61b41c501e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java @@ -22,7 +22,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; -import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager; import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.collections.TypedProperties; @@ -116,7 +115,7 @@ public class AMQPLargeMessagePersister extends MessagePersister { properties = null; } - AMQPLargeMessage largeMessage = new AMQPLargeMessage(id, format, properties, null, AbstractJournalStorageManager.getThreadLocal()); + AMQPLargeMessage largeMessage = new AMQPLargeMessage(id, format, properties, null, null); largeMessage.setFileDurable(durable); if (address != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index 7582eca06f..76f5a05271 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -100,7 +100,6 @@ public class PagedReferenceImpl extends LinkedListImpl.Node @Override public void onDelivery(Consumer onDelivery) { - assert this.onDelivery == null; this.onDelivery = onDelivery; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java index 315d566c73..b0e84190d6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java @@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; @@ -381,6 +382,7 @@ public final class Page implements Comparable { fileBuffer.position(endPosition + 1); assert fileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte"; msg.initMessage(storage); + assert msg.getMessage() instanceof LargeServerMessage && ((LargeServerMessage)msg.getMessage()).getStorageManager() != null || !(msg.getMessage() instanceof LargeServerMessage); if (logger.isTraceEnabled()) { logger.tracef("Reading message %s on pageId=%d for address=%s", msg, pageId, storeName); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java index 67d902f449..f137af5311 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java @@ -89,10 +89,17 @@ public class PagedMessageImpl implements PagedMessage { ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(largeMessageLazyData); lgMessage = LargeMessagePersister.getInstance().decode(buffer, lgMessage, null); + if (lgMessage.toMessage() instanceof LargeServerMessage) { + ((LargeServerMessage)lgMessage.toMessage()).setStorageManager(storage); + } lgMessage.toMessage().usageUp(); lgMessage.setPaged(); this.message = lgMessage.toMessage(); largeMessageLazyData = null; + } else { + if (message != null && message instanceof LargeServerMessage) { + ((LargeServerMessage)message).setStorageManager(storageManager); + } } } @@ -123,10 +130,11 @@ public class PagedMessageImpl implements PagedMessage { } else { this.message = storageManager.createLargeMessage().toMessage(); LargeMessagePersister.getInstance().decode(buffer, (LargeServerMessage) message, null); + ((LargeServerMessage) message).setStorageManager(storageManager); ((LargeServerMessage) message).toMessage().usageUp(); } } else { - this.message = MessagePersister.getInstance().decode(buffer, null, null); + this.message = MessagePersister.getInstance().decode(buffer, null, null, storageManager); } int queueIDsSize = buffer.readInt(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index aac727dae5..fdb1e8cfd4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -135,18 +135,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp protected static final int CRITICAL_STOP_2 = 2; - public static ThreadLocal storageManagerThreadLocal = new ThreadLocal<>(); - - /** Persisters may need to access this on reloading of the journal, - * for large message processing */ - public static void setupThreadLocal(StorageManager manager) { - storageManagerThreadLocal.set(manager); - } - - public static StorageManager getThreadLocal() { - return storageManagerThreadLocal.get(); - } - private static final Logger logger = Logger.getLogger(AbstractJournalStorageManager.class); public enum JournalContent { @@ -857,7 +845,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp Map messages = new HashMap<>(); readLock(); - setupThreadLocal(this); try { JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(this)); @@ -935,15 +922,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp case JournalRecordIds.ADD_MESSAGE_PROTOCOL: { - Message message = MessagePersister.getInstance().decode(buff, null, pools); - - /* if (message instanceof LargeServerMessage) { - try { - ((LargeServerMessage) message).finishParse(); - } catch (Exception e) { - logger.warn(e.getMessage(), e); - } - } */ + Message message = decodeMessage(pools, buff); messages.put(record.id, message); @@ -1240,11 +1219,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp return info; } finally { readUnLock(); - // need to clear it, otherwise we may have a permanent leak - setupThreadLocal(null); } } + private Message decodeMessage(CoreMessageObjectPools pools, ActiveMQBuffer buff) { + Message message = MessagePersister.getInstance().decode(buff, null, pools, this); + return message; + } + public void checkInvalidPageTransactions(PagingManager pagingManager, Set invalidPageTransactions) { if (invalidPageTransactions != null && !invalidPageTransactions.isEmpty()) { @@ -1795,7 +1777,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp if (pools == null) { pools = new CoreMessageObjectPools(); } - Message message = MessagePersister.getInstance().decode(buff, null, pools); + Message message = decodeMessage(pools, buff); messages.put(record.id, message); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java index 9ee72ea00e..3ac704e388 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java @@ -569,7 +569,7 @@ public final class DescribeJournal { return "ADD-MESSAGE is not supported any longer, use export/import"; } case ADD_MESSAGE_PROTOCOL: { - Message message = MessagePersister.getInstance().decode(buffer, null, null); + Message message = MessagePersister.getInstance().decode(buffer, null, null, storageManager); return new MessageDescribe(message); } case ADD_REF: { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 567ee03dfc..71a7f452f7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -359,6 +359,8 @@ public class JournalStorageManager extends AbstractJournalStorageManager { LargeMessagePersister.getInstance().decode(buff, largeMessage, null); + largeMessage.setStorageManager(this); + if (largeMessage.toMessage().containsProperty(Message.HDR_ORIG_MESSAGE_ID)) { // for compatibility: couple with old behaviour, copying the old file to avoid message loss long originalMessageID = largeMessage.toMessage().getLongProperty(Message.HDR_ORIG_MESSAGE_ID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java index 9d17f6e003..dec2c2a4a6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java @@ -44,7 +44,7 @@ public class LargeBody { private long pendingRecordID = NO_PENDING_ID; - final StorageManager storageManager; + StorageManager storageManager; private long messageID = -1; @@ -69,6 +69,10 @@ public class LargeBody { return storageManager; } + public void setStorageManager(StorageManager storageManager) { + this.storageManager = storageManager; + } + public ByteBuffer map() throws Exception { ensureFileExists(true); if (!file.isOpen()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 3ab3f21b92..c8a82be6ba 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -252,6 +252,11 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar } } + @Override + public void setStorageManager(StorageManager storageManager) { + this.largeBody.setStorageManager(storageManager); + } + @Override public Message copy() { SequentialFile newfile = storageManager.createFileForLargeMessage(messageID, durable); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java index 76165e8892..8252f34bbc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java @@ -70,6 +70,11 @@ class NullStorageLargeServerMessage extends CoreMessage implements CoreLargeServ } + @Override + public void setStorageManager(StorageManager storageManager) { + + } + @Override public synchronized void addBytes(ActiveMQBuffer bytes) { final int readableBytes = bytes.readableBytes(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java index 311cd30e1b..2c40da32d1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java @@ -17,7 +17,9 @@ package org.apache.activemq.artemis.core.protocol; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; @@ -28,6 +30,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupResp import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectReplyMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.FederationDownstreamConnectMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacketI; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NodeAnnounceMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage; @@ -55,6 +58,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSen import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2; +import org.apache.activemq.artemis.core.server.LargeServerMessage; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE; @@ -88,6 +92,13 @@ public class ServerPacketDecoder extends ClientPacketDecoder { private static final long serialVersionUID = 3348673114388400766L; + private final StorageManager storageManager; + + public ServerPacketDecoder(StorageManager storageManager) { + assert storageManager != null; + this.storageManager = storageManager; + } + private SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) { final SessionSendMessage sendMessage; @@ -265,6 +276,14 @@ public class ServerPacketDecoder extends ClientPacketDecoder { packet.decode(in); + if (packet instanceof MessagePacketI) { + Message message = ((MessagePacketI)packet).getMessage(); + if (message instanceof LargeServerMessage) { + assert storageManager != null; + ((LargeServerMessage) message).setStorageManager(storageManager); + } + } + return packet; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 058d636dff..46702cf15f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -1031,6 +1031,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { } LargeServerMessage message = currentLargeMessage; + currentLargeMessage.setStorageManager(storageManager); currentLargeMessage = null; session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage)message.toMessage(), storageManager), null, false, false); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java index c596c6eb58..2e60d63ea6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java @@ -127,7 +127,7 @@ public class CoreProtocolManager implements ProtocolManager { Executor connectionExecutor = server.getExecutorFactory().getExecutor(); - final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(), + final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(server.getStorageManager()), connection, incomingInterceptors, outgoingInterceptors, server.getNodeID(), connectionExecutor); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java index 7aa9f8a1f9..df43c146cc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java @@ -17,12 +17,13 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.utils.DataConstants; -public class ReplicationPageWriteMessage extends PacketImpl { +public class ReplicationPageWriteMessage extends PacketImpl implements MessagePacketI { private int pageNumber; @@ -83,6 +84,17 @@ public class ReplicationPageWriteMessage extends PacketImpl { return result; } + @Override + public Message getMessage() { + return pagedMessage.getMessage(); + } + + @Override + public ReplicationPageWriteMessage replaceMessage(Message message) { + // nothing to be done + return this; + } + @Override public boolean equals(Object obj) { if (this == obj) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java index 9175781005..2dcf4042c3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java @@ -64,5 +64,7 @@ public interface LargeServerMessage extends ReplicatedLargeMessage { LargeBody getLargeBody(); + void setStorageManager(StorageManager storageManager); + void finishParse() throws Exception; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java index 209f68f333..eae6c36de9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.cluster; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.protocol.ServerPacketDecoder; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager; import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder; @@ -31,6 +32,12 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM ServerLocator locator; + final StorageManager storageManager; + + private ActiveMQServerSideProtocolManagerFactory(StorageManager storageManager) { + this.storageManager = storageManager; + } + @Override public ServerLocator getLocator() { return locator; @@ -41,15 +48,12 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM this.locator = locator; } - public static ActiveMQServerSideProtocolManagerFactory getInstance(ServerLocator locator) { - ActiveMQServerSideProtocolManagerFactory instance = new ActiveMQServerSideProtocolManagerFactory(); + public static ActiveMQServerSideProtocolManagerFactory getInstance(ServerLocator locator, StorageManager storageManager) { + ActiveMQServerSideProtocolManagerFactory instance = new ActiveMQServerSideProtocolManagerFactory(storageManager); instance.setLocator(locator); return instance; } - private ActiveMQServerSideProtocolManagerFactory() { - } - private static final long serialVersionUID = 1; @Override @@ -66,7 +70,7 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM @Override protected PacketDecoder createPacketDecoder() { - return new ServerPacketDecoder(); + return new ServerPacketDecoder(storageManager); } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java index 5aef7ac46f..25456b47b4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java @@ -227,7 +227,7 @@ public class BackupManager implements ActiveMQComponent { backupServerLocator.setIdentity("backupLocatorFor='" + server + "'"); backupServerLocator.setReconnectAttempts(-1); backupServerLocator.setInitialConnectAttempts(-1); - backupServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(backupServerLocator)); + backupServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(backupServerLocator, server.getStorageManager())); } } @@ -359,7 +359,7 @@ public class BackupManager implements ActiveMQComponent { ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs); locator.setClusterConnection(true); locator.setRetryInterval(retryInterval); - locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator)); + locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator, server.getStorageManager())); return locator; } return null; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java index 86cd0df154..42fcf75c0e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java @@ -191,7 +191,7 @@ public class ClusterController implements ActiveMQComponent { serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier()); serverLocator.setMaxRetryInterval(config.getMaxRetryInterval()); //this is used for replication so need to use the server packet decoder - serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator)); + serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, server.getStorageManager())); serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool()); try { serverLocator.initialize(); @@ -254,7 +254,7 @@ public class ClusterController implements ActiveMQComponent { * @return the Cluster Control */ public ClusterControl connectToNodeInCluster(ClientSessionFactoryInternal sf) { - sf.getServerLocator().setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(sf.getServerLocator())); + sf.getServerLocator().setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(sf.getServerLocator(), server.getStorageManager())); return new ClusterControl(sf, server); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java index ade5d0cb01..77bc936fd8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java @@ -40,6 +40,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -79,6 +80,8 @@ public class ClusterConnectionBridge extends BridgeImpl { private final long targetNodeEventUID; + private final StorageManager storageManager; + private final ServerLocatorInternal discoveryLocator; private final String storeAndForwardPrefix; @@ -111,7 +114,8 @@ public class ClusterConnectionBridge extends BridgeImpl { final SimpleString managementNotificationAddress, final MessageFlowRecord flowRecord, final TransportConfiguration connector, - final String storeAndForwardPrefix) { + final String storeAndForwardPrefix, + final StorageManager storageManager) { super(targetLocator, initialConnectAttempts, reconnectAttempts, 0, // reconnectAttemptsOnSameNode means nothing on the clustering bridge since we always try the same retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, server, ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType())); @@ -134,11 +138,13 @@ public class ClusterConnectionBridge extends BridgeImpl { } this.storeAndForwardPrefix = storeAndForwardPrefix; + + this.storageManager = storageManager; } @Override protected ClientSessionFactoryInternal createSessionFactory() throws Exception { - serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator)); + serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, storageManager)); ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) serverLocator.createSessionFactory(targetNodeID); //if it is null then its possible the broker was removed after a disconnect so lets try the original connectors if (factory == null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index d82b2ea5df..b223fc133c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -628,7 +628,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn serverLocator.setAfterConnectionInternalListener(this); - serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator)); + serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, server.getStorageManager())); serverLocator.start(server.getExecutorFactory().getExecutor()); } @@ -816,7 +816,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn targetLocator.setAfterConnectionInternalListener(this); - serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator)); + serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, server.getStorageManager())); targetLocator.setNodeID(nodeId); @@ -830,7 +830,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn targetLocator.addIncomingInterceptor(new IncomingInterceptorLookingForExceptionMessage(manager, executorFactory.getExecutor())); MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, eventUID, targetNodeID, connector, queueName, queue); - ClusterConnectionBridge bridge = new ClusterConnectionBridge(this, manager, targetLocator, serverLocator, initialConnectAttempts, reconnectAttempts, retryInterval, retryIntervalMultiplier, maxRetryInterval, nodeManager.getUUID(), record.getEventUID(), record.getTargetNodeID(), record.getQueueName(), record.getQueue(), executorFactory.getExecutor(), null, null, scheduledExecutor, null, useDuplicateDetection, clusterUser, clusterPassword, server, managementService.getManagementAddress(), managementService.getManagementNotificationAddress(), record, record.getConnector(), storeAndForwardPrefix); + ClusterConnectionBridge bridge = new ClusterConnectionBridge(this, manager, targetLocator, serverLocator, initialConnectAttempts, reconnectAttempts, retryInterval, retryIntervalMultiplier, maxRetryInterval, nodeManager.getUUID(), record.getEventUID(), record.getTargetNodeID(), record.getQueueName(), record.getQueue(), executorFactory.getExecutor(), null, null, scheduledExecutor, null, useDuplicateDetection, clusterUser, clusterPassword, server, managementService.getManagementAddress(), managementService.getManagementNotificationAddress(), record, record.getConnector(), storeAndForwardPrefix, server.getStorageManager()); targetLocator.setIdentity("(Cluster-connection-bridge::" + bridge.toString() + "::" + this.toString() + ")"); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java index 77b9f3f9c4..300ee704c4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java @@ -49,6 +49,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader { private ActiveMQServer parentServer; private ServerLocator locator; private final ClusterController clusterController; + private final StorageManager storageManager; public BackupRecoveryJournalLoader(PostOffice postOffice, PagingManager pagingManager, @@ -66,6 +67,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader { this.parentServer = parentServer; this.locator = locator; this.clusterController = clusterController; + this.storageManager = storageManager; } @Override @@ -88,7 +90,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader { ResourceManager resourceManager, Map>> duplicateIDMap) throws Exception { ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer.getStorageManager()); - locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator)); + locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator, storageManager)); try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) { scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java index 4c7d4fbf83..cea644fc56 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java @@ -139,7 +139,7 @@ public class LiveOnlyActivation extends Activation { try { scaleDownServerLocator = ScaleDownPolicy.getScaleDownConnector(scaleDownPolicy, activeMQServer); //use a Node Locator to connect to the cluster - scaleDownServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(scaleDownServerLocator)); + scaleDownServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(scaleDownServerLocator, activeMQServer.getStorageManager())); LiveNodeLocator nodeLocator = scaleDownPolicy.getGroupName() == null ? new AnyLiveNodeLocatorForScaleDown(activeMQServer) : new NamedLiveNodeLocatorForScaleDown(scaleDownPolicy.getGroupName(), activeMQServer); scaleDownServerLocator.addClusterTopologyListener(nodeLocator); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java index 310971f0f6..279816efa0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java @@ -25,7 +25,6 @@ import org.apache.activemq.artemis.api.core.RefCountMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.StorageManager; -import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.jboss.logging.Logger; @@ -95,18 +94,16 @@ public class EmbedMessageUtil { } private static Message readEncoded(ICoreMessage message, StorageManager storageManager, ActiveMQBuffer buffer) { - - - AbstractJournalStorageManager.setupThreadLocal(storageManager); try { - Message returnMessage = MessagePersister.getInstance().decode(buffer, null, null); + Message returnMessage = MessagePersister.getInstance().decode(buffer, null, null, storageManager); + if (returnMessage instanceof LargeServerMessage) { + ((LargeServerMessage)returnMessage).setStorageManager(storageManager); + } returnMessage.setMessageID(message.getMessageID()); return returnMessage; } catch (Exception e) { logger.warn(e.getMessage(), e); return message; - } finally { - AbstractJournalStorageManager.setupThreadLocal(null); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java index 808d094d98..d4d47d69e9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java @@ -24,6 +24,8 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister; import org.apache.activemq.artemis.core.persistence.Persister; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.jboss.logging.Logger; import static org.apache.activemq.artemis.core.persistence.PersisterIDs.MAX_PERSISTERS; @@ -105,11 +107,20 @@ public class MessagePersister implements Persister { @Override public Message decode(ActiveMQBuffer buffer, Message record, CoreMessageObjectPools pools) { + return decode(buffer, record, pools, null); + } + + + public Message decode(ActiveMQBuffer buffer, Message record, CoreMessageObjectPools pools, StorageManager storageManager) { byte protocol = buffer.readByte(); Persister persister = getPersister(protocol); if (persister == null) { throw new NullPointerException("couldn't find factory for type=" + protocol); } - return persister.decode(buffer, record, pools); + Message message = persister.decode(buffer, record, pools); + if (message instanceof LargeServerMessage) { + ((LargeServerMessage) message).setStorageManager(storageManager); + } + return message; } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java index f7cbd6200e..50f0a62659 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java @@ -126,7 +126,7 @@ public class ClusterControllerTest extends ClusterTestBase { @Test public void controlWithDifferentConnector() throws Exception { try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) { - locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator)); + locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator, servers[0].getStorageManager())); ClusterController controller = new ClusterController(getServer(0), getServer(0).getScheduledPool()); ClusterControl clusterControl = controller.connectToNodeInCluster((ClientSessionFactoryInternal) locator.createSessionFactory()); clusterControl.authorize(); @@ -136,7 +136,7 @@ public class ClusterControllerTest extends ClusterTestBase { @Test public void controlWithDifferentPassword() throws Exception { try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) { - locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator)); + locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator, servers[0].getStorageManager())); ClusterController controller = new ClusterController(getServer(1), getServer(1).getScheduledPool()); ClusterControl clusterControl = controller.connectToNodeInCluster((ClientSessionFactoryInternal) locator.createSessionFactory()); try { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java index 4f92cfdd65..32d5d2918d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java @@ -1396,7 +1396,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { setSessionFactoryCreateLocator(node, ha, serverTotc); - locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locators[node])); + locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locators[node], servers[node].getStorageManager())); locators[node].setBlockOnNonDurableSend(true).setBlockOnDurableSend(true); addServerLocator(locators[node]); diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml index 5efbfcb7bf..2482a8de43 100644 --- a/tests/smoke-tests/pom.xml +++ b/tests/smoke-tests/pom.xml @@ -147,6 +147,10 @@ -Djava.net.preferIPv4Stack=true ${basedir}/target/replicated-static0 ${basedir}/target/classes/servers/replicated-static0 + + --java-options + -ea + @@ -160,6 +164,10 @@ -Djava.net.preferIPv4Stack=true ${basedir}/target/replicated-static1 ${basedir}/target/classes/servers/replicated-static1 + + --java-options + -ea + diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java index f72990da86..d89fd4a983 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java @@ -115,6 +115,11 @@ public class SoakPagingTest extends SmokeTestBase { public void produce(ConnectionFactory factory) { try { + + StringBuffer bufferlarge = new StringBuffer(); + while (bufferlarge.length() < 110000) { + bufferlarge.append("asdflkajdhsf akljsdfh akljsdfh alksjdfh alkdjsf "); + } Connection connection = factory.createConnection("admin", "admin"); connection.start(); @@ -125,7 +130,13 @@ public class SoakPagingTest extends SmokeTestBase { int i = 0; while (true) { - Message message = session.createTextMessage("fkjdslkfjdskljf;lkdsjf;kdsajf;lkjdf;kdsajf;kjdsa;flkjdsa;lfkjdsa;flkj;dsakjf;dsajf;askjd;fkj;dsajflaskfja;fdlkajs;lfdkja;kfj;dsakfj;akdsjf;dsakjf;akfj;lakdsjf;lkasjdf;ksajf;kjdsa;fkj;adskjf;akdsjf;kja;sdkfj;akdsjf;akjdsf;adskjf;akdsjf;askfj;aksjfkdjafndmnfmdsnfjadshfjdsalkfjads;fkjdsa;kfja;skfj;akjfd;akjfd;ksaj;fkja;kfj;dsakjf;dsakjf;dksjf;akdsjf;kdsajf"); + + Message message; + if (i % 100 == 0) { + message = session.createTextMessage(bufferlarge.toString()); + } else { + message = session.createTextMessage("fkjdslkfjdskljf;lkdsjf;kdsajf;lkjdf;kdsajf;kjdsa;flkjdsa;lfkjdsa;flkj;dsakjf;dsajf;askjd;fkj;dsajflaskfja;fdlkajs;lfdkja;kfj;dsakfj;akdsjf;dsakjf;akfj;lakdsjf;lkasjdf;ksajf;kjdsa;fkj;adskjf;akdsjf;kja;sdkfj;akdsjf;akjdsf;adskjf;akdsjf;askfj;aksjfkdjafndmnfmdsnfjadshfjdsalkfjads;fkjdsa;kfja;skfj;akjfd;akjfd;ksaj;fkja;kfj;dsakjf;dsakjf;dksjf;akdsjf;kdsajf"); + } messageProducer.send(message); produced.incrementAndGet();