diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index fcc5d51b3b..d3ebbab4fd 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -266,11 +266,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage } } - @Override - public void validateFile() throws ActiveMQException { - largeBody.validateFile(); - } - public void setFileDurable(boolean value) { this.fileDurable = value; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java index 944427f2a8..6a44c6339f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java @@ -93,7 +93,7 @@ public class AMQPLargeMessageReader implements MessageReader { sessionSPI.getStorageManager()); currentMessage.parseHeader(dataBuffer); - sessionSPI.getStorageManager().largeMessageCreated(id, currentMessage); + sessionSPI.getStorageManager().onLargeMessageCreate(id, currentMessage); } currentMessage.addBytes(dataBuffer); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReader.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReader.java index 2fa661d75a..b75e6667a7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReader.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReader.java @@ -295,7 +295,7 @@ public class AMQPTunneledCoreLargeMessageReader implements MessageReader { coreMessage.decodeHeadersAndProperties(coreHeadersBuffer); - coreLargeMessage = sessionSPI.getStorageManager().createLargeMessage(id, coreMessage); + coreLargeMessage = sessionSPI.getStorageManager().createCoreLargeMessage(id, coreMessage); coreHeadersBuffer = null; // Buffer can be discarded once the decode is done state = State.BODY_SECTION_PENDING; } catch (ActiveMQException ex) { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index cd1b7dc25f..1a366223c3 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -420,7 +420,7 @@ public class StompSession implements SessionCallback { StorageManager storageManager = ((ServerSessionImpl) session).getStorageManager(); long id = storageManager.generateID(); - LargeServerMessage largeMessage = storageManager.createLargeMessage(id, message); + LargeServerMessage largeMessage = storageManager.createCoreLargeMessage(id, message); ActiveMQBuffer body = message.getReadOnlyBodyBuffer(); byte[] bytes = new byte[body.readableBytes()]; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java index 10f68441c0..ff0d4fef77 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java @@ -161,7 +161,7 @@ public class PagedMessageImpl implements PagedMessage { @Override public void initMessage(StorageManager storage) { if (largeMessageLazyData != null) { - LargeServerMessage lgMessage = storage.createLargeMessage(); + LargeServerMessage lgMessage = storage.createCoreLargeMessage(); ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(largeMessageLazyData); lgMessage = LargeMessagePersister.getInstance().decode(buffer, lgMessage, null); @@ -213,7 +213,7 @@ public class PagedMessageImpl implements PagedMessage { largeMessageLazyData = new byte[largeMessageHeaderSize]; buffer.readBytes(largeMessageLazyData); } else { - this.message = storageManager.createLargeMessage().toMessage(); + this.message = storageManager.createCoreLargeMessage().toMessage(); LargeMessagePersister.getInstance().decode(buffer, (LargeServerMessage) message, null); ((LargeServerMessage) message).setStorageManager(storageManager); ((LargeServerMessage) message).toMessage().usageUp(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index 8990574005..e2582fba7c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -221,10 +221,10 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception; - LargeServerMessage createLargeMessage(); + LargeServerMessage createCoreLargeMessage(); /** - * Creates a new LargeMessage with the given id. + * Creates a new LargeServerMessage for the core Protocol with the given id. * * @param id * @param message This is a temporary message that holds the parsed properties. The remoting @@ -232,9 +232,10 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { * @return a large message object * @throws Exception */ - LargeServerMessage createLargeMessage(long id, Message message) throws Exception; + LargeServerMessage createCoreLargeMessage(long id, Message message) throws Exception; - LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception; + /** Other protocols may inform the storage manager when a large message was created. */ + LargeServerMessage onLargeMessageCreate(long id, LargeServerMessage largeMessage) throws Exception; enum LargeMessageExtension { DURABLE(".msg"), TEMPORARY(".tmp"), SYNC(".sync"); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index e363a334cd..9ba9861af3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -353,7 +353,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { * @throws Exception */ protected LargeServerMessage parseLargeMessage(final ActiveMQBuffer buff) throws Exception { - LargeServerMessage largeMessage = createLargeMessage(); + LargeServerMessage largeMessage = createCoreLargeMessage(); LargeMessagePersister.getInstance().decode(buff, largeMessage, null); @@ -501,12 +501,12 @@ public class JournalStorageManager extends AbstractJournalStorageManager { } @Override - public LargeServerMessage createLargeMessage() { + public LargeServerMessage createCoreLargeMessage() { return new LargeServerMessageImpl(this); } @Override - public LargeServerMessage createLargeMessage(final long id, final Message message) throws Exception { + public LargeServerMessage createCoreLargeMessage(final long id, final Message message) throws Exception { if (logger.isTraceEnabled()) { logger.trace("Initializing large message {}", id, new Exception("trace")); } @@ -515,16 +515,16 @@ public class JournalStorageManager extends AbstractJournalStorageManager { replicator.largeMessageBegin(id); } - LargeServerMessageImpl largeMessage = (LargeServerMessageImpl) createLargeMessage(); + LargeServerMessageImpl largeMessage = (LargeServerMessageImpl) createCoreLargeMessage(); largeMessage.moveHeadersAndProperties(message); - return largeMessageCreated(id, largeMessage); + return onLargeMessageCreate(id, largeMessage); } } @Override - public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception { + public LargeServerMessage onLargeMessageCreate(long id, LargeServerMessage largeMessage) throws Exception { largeMessage.setMessageID(id); // Check durable large massage size before to allocate resources if it can't be stored @@ -545,11 +545,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager { } } - // We do this here to avoid a case where the replication gets a list without this file - // to avoid a race - largeMessage.validateFile(); - - return largeMessage; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java index a4eb5973bb..99e217513f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java @@ -158,7 +158,7 @@ public class LargeBody { bodySize += readableBytes; } - public synchronized void validateFile() throws ActiveMQException { + private void validateFile() throws ActiveMQException { this.ensureFileExists(true); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 46eefc67b8..cc0768b254 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -76,7 +76,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar if (logger.isDebugEnabled()) { logger.debug("asLargeMessage create largeMessage with id={}", id); } - LargeServerMessage lsm = storageManager.createLargeMessage(id, coreMessage); + LargeServerMessage lsm = storageManager.createCoreLargeMessage(id, coreMessage); ActiveMQBuffer messageBodyBuffer = coreMessage.getReadOnlyBodyBuffer(); final int readableBytes = messageBodyBuffer.readableBytes(); @@ -327,7 +327,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar if (logger.isDebugEnabled()) { logger.debug("Copy large message id={} as newID={}", this.getMessageID(), newID); } - LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this); + LargeServerMessage newMessage = storageManager.createCoreLargeMessage(newID, this); largeBody.copyInto(newMessage); newMessage.releaseResources(true, true); return newMessage.toMessage(); @@ -361,17 +361,4 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar return "LargeServerMessage[messageID=" + messageID + "]"; } } - - @Override - public void validateFile() throws ActiveMQException { - this.ensureFileExists(true); - } - - public void ensureFileExists(boolean toOpen) throws ActiveMQException { - synchronized (largeBody) { - largeBody.ensureFileExists(toOpen); - } - } - - } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java index 561ada14a8..bcfc7be282 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java @@ -45,7 +45,7 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage { * @param storageManager */ public LargeServerMessageInSync(StorageManager storageManager) { - mainLM = storageManager.createLargeMessage(); + mainLM = storageManager.createCoreLargeMessage(); this.storageManager = storageManager; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java index 1f595271b4..6d63f01456 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.persistence.impl.nullpm; import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.io.SequentialFile; @@ -67,11 +66,6 @@ class NullStorageLargeServerMessage extends CoreMessage implements CoreLargeServ buffer.writeBytes(bytes); } - @Override - public void validateFile() throws ActiveMQException { - - } - @Override public void setStorageManager(StorageManager storageManager) { this.storageManager = storageManager; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index bdad111535..4f62de0bbe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -315,12 +315,12 @@ public class NullStorageManager implements StorageManager { } @Override - public LargeServerMessage createLargeMessage() { + public LargeServerMessage createCoreLargeMessage() { return new NullStorageLargeServerMessage(); } @Override - public LargeServerMessage createLargeMessage(final long id, final Message message) { + public LargeServerMessage createCoreLargeMessage(final long id, final Message message) { NullStorageLargeServerMessage largeMessage = new NullStorageLargeServerMessage(); largeMessage.moveHeadersAndProperties(message); @@ -331,7 +331,7 @@ public class NullStorageManager implements StorageManager { } @Override - public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception { + public LargeServerMessage onLargeMessageCreate(long id, LargeServerMessage largeMessage) throws Exception { return null; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index cf23f22642..fbaee45bb9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -1083,7 +1083,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { if (logger.isDebugEnabled()) { logger.debug("initializing large message {}", id); } - LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message); + LargeServerMessage largeMsg = storageManager.createCoreLargeMessage(id, message); logger.trace("sendLarge::{}", largeMsg); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 86928d36fa..b3951c9d1c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -683,7 +683,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon if (liveToBackupSync) { msg = new LargeServerMessageInSync(storageManager); } else { - msg = storageManager.createLargeMessage(); + msg = storageManager.createCoreLargeMessage(); } msg.setDurable(true); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java index dca4214f3c..07ca61957d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java @@ -65,6 +65,4 @@ public interface LargeServerMessage extends ReplicatedLargeMessage { LargeBody getLargeBody(); void setStorageManager(StorageManager storageManager); - - void validateFile() throws ActiveMQException; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java index d2470e5a6b..e8cb99b413 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java @@ -277,7 +277,7 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi if (message instanceof ClientLargeMessageInternal) { final StorageManager storageManager = server.getStorageManager(); - LargeServerMessage lsm = storageManager.createLargeMessage(storageManager.generateID(), message); + LargeServerMessage lsm = storageManager.createCoreLargeMessage(storageManager.generateID(), message); LargeData largeData = null; do { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 08773fbb2e..5886d5637e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -3879,7 +3879,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { for (Pair msgToDelete : pendingLargeMessages) { ActiveMQServerLogger.LOGGER.deletingPendingMessage(msgToDelete); - LargeServerMessage msg = storageManager.createLargeMessage(); + LargeServerMessage msg = storageManager.createCoreLargeMessage(); msg.setMessageID(msgToDelete.getB()); msg.setDurable(true); msg.deleteFile(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index 6f708ab95e..6b0c3005f6 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -579,17 +579,17 @@ public class TransactionImplTest extends ServerTestBase { } @Override - public LargeServerMessage createLargeMessage() { + public LargeServerMessage createCoreLargeMessage() { return null; } @Override - public LargeServerMessage createLargeMessage(long id, Message message) throws Exception { + public LargeServerMessage createCoreLargeMessage(long id, Message message) throws Exception { return null; } @Override - public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception { + public LargeServerMessage onLargeMessageCreate(long id, LargeServerMessage largeMessage) throws Exception { return null; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java index 135df8e177..3769b00b23 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java @@ -300,8 +300,8 @@ public class SendAckFailTest extends SpawnedTestBase { } @Override - public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception { - return manager.largeMessageCreated(id, largeMessage); + public LargeServerMessage onLargeMessageCreate(long id, LargeServerMessage largeMessage) throws Exception { + return manager.onLargeMessageCreate(id, largeMessage); } @Override @@ -547,13 +547,13 @@ public class SendAckFailTest extends SpawnedTestBase { } @Override - public LargeServerMessage createLargeMessage() { - return manager.createLargeMessage(); + public LargeServerMessage createCoreLargeMessage() { + return manager.createCoreLargeMessage(); } @Override - public LargeServerMessage createLargeMessage(long id, Message message) throws Exception { - return manager.createLargeMessage(id, message); + public LargeServerMessage createCoreLargeMessage(long id, Message message) throws Exception { + return manager.createCoreLargeMessage(id, message); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index 57bfbcdbde..d6fd127ccc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -574,7 +574,7 @@ public final class ReplicationTest extends ActiveMQTestBase { waitForComponent(manager); CoreMessage msg = new CoreMessage().initBuffer(1024).setMessageID(1); - LargeServerMessage largeMsg = liveServer.getStorageManager().createLargeMessage(500, msg); + LargeServerMessage largeMsg = liveServer.getStorageManager().createCoreLargeMessage(500, msg); largeMsg.addBytes(new byte[1024]); largeMsg.releaseResources(true, true);