diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index 171a27570f..771e745402 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -150,7 +150,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage } public void closeLargeMessage() throws Exception { - largeBody.releaseResources(false); + largeBody.releaseResources(false, true); parsingData.freeDirectBuffer(); parsingData = null; } @@ -443,8 +443,8 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage } @Override - public void releaseResources(boolean sync) { - largeBody.releaseResources(sync); + public void releaseResources(boolean sync, boolean sendEvent) { + largeBody.releaseResources(sync, sendEvent); } @@ -526,7 +526,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage } largeBody.copyInto(copy, bufferNewHeader, place.intValue()); - copy.releaseResources(true); + copy.releaseResources(true, true); return copy; } catch (Exception e) { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index 7aba86e923..8cdc9de769 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -380,7 +380,7 @@ public class StompSession implements SessionCallback { largeMessage.addBytes(bytes); - largeMessage.releaseResources(true); + largeMessage.releaseResources(true, true); largeMessage.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, bytes.length); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index a35d2a934e..48e44542c3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -272,6 +272,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { */ SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension); + void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException; + void deleteLargeMessageBody(LargeServerMessage largeServerMessage) throws ActiveMQException; default SequentialFile createFileForLargeMessage(long messageID, boolean durable) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 735ffcc349..b21e86831a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -449,6 +449,18 @@ public class JournalStorageManager extends AbstractJournalStorageManager { } } + @Override + public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException { + readLock(); + try { + if (isReplicated()) { + replicator.largeMessageClosed(largeServerMessage.toMessage().getMessageID(), JournalStorageManager.this); + } + } finally { + readUnLock(); + } + } + @Override public void deleteLargeMessageBody(final LargeServerMessage largeServerMessage) throws ActiveMQException { synchronized (largeServerMessage) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java index 4b1f020f0d..a2e4273122 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java @@ -116,7 +116,7 @@ public class LargeBody { public synchronized void deleteFile() { try { validateFile(); - releaseResources(false); + releaseResources(false, false); storageManager.deleteLargeMessageBody(message); } catch (Exception e) { storageManager.criticalError(e); @@ -303,13 +303,20 @@ public class LargeBody { } } - public synchronized void releaseResources(boolean sync) { + /** + * sendEvent means it's a close happening from end of write largemessage. + * While reading the largemessage we don't need (and shouldn't inform the backup + */ + public synchronized void releaseResources(boolean sync, boolean sendEvent) { if (file != null && file.isOpen()) { try { if (sync) { file.sync(); } file.close(false, false); + if (sendEvent) { + storageManager.largeMessageClosed(message); + } } catch (Exception e) { ActiveMQServerLogger.LOGGER.largeMessageErrorReleasingResources(e); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 2ee61148f6..6809808b5a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -63,7 +63,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer(); final int readableBytes = buffer.readableBytes(); lsm.addBytes(buffer); - lsm.releaseResources(true); + lsm.releaseResources(true, true); lsm.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes); return lsm.toMessage(); } @@ -254,9 +254,9 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar } @Override - public void releaseResources(boolean sync) { + public void releaseResources(boolean sync, boolean sendEvent) { synchronized (largeBody) { - largeBody.releaseResources(sync); + largeBody.releaseResources(sync, sendEvent); } } @@ -293,7 +293,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar try { LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this); largeBody.copyInto(newMessage); - newMessage.releaseResources(true); + newMessage.releaseResources(true, true); return newMessage.toMessage(); } catch (Exception e) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java index 62d5365e3d..dbf24807a9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java @@ -96,11 +96,11 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage { } @Override - public synchronized void releaseResources(boolean sync) { + public synchronized void releaseResources(boolean sync, boolean sendEvent) { if (logger.isTraceEnabled()) { logger.trace("release resources called on " + mainLM, new Exception("trace")); } - mainLM.releaseResources(sync); + mainLM.releaseResources(sync, sendEvent); if (appendFile != null && appendFile.isOpen()) { try { appendFile.close(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java index 69ea514b8a..68c7f88882 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java @@ -38,7 +38,7 @@ class NullStorageLargeServerMessage extends CoreMessage implements CoreLargeServ } @Override - public void releaseResources(boolean sync) { + public void releaseResources(boolean sync, boolean sendEvent) { } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index d1783b2f22..8375060347 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -607,6 +607,11 @@ public class NullStorageManager implements StorageManager { } + @Override + public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException { + + } + @Override public boolean addToPage(PagingStore store, Message msg, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 97d2963617..aea354ade0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -1035,7 +1035,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { currentLargeMessage.addBytes(body); if (!continues) { - currentLargeMessage.releaseResources(true); + currentLargeMessage.releaseResources(true, true); if (messageBodySize >= 0) { currentLargeMessage.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java index a9be86a4cf..2c1d808ace 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java @@ -24,31 +24,38 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl { long messageId; long pendingRecordId; + /** + * True = delete file, False = close file + */ + private boolean isDelete; public ReplicationLargeMessageEndMessage() { super(PacketImpl.REPLICATION_LARGE_MESSAGE_END); } - public ReplicationLargeMessageEndMessage(final long messageId, final long pendingRecordId) { + public ReplicationLargeMessageEndMessage(final long messageId, final long pendingRecordId, final boolean isDelete) { this(); this.messageId = messageId; //we use negative value to indicate that this id is pre-generated by live node //so that it won't be generated at backup. //see https://issues.apache.org/jira/browse/ARTEMIS-1221 this.pendingRecordId = -pendingRecordId; + this.isDelete = isDelete; } @Override public int expectedEncodeSize() { return PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG + // buffer.writeLong(messageId) - DataConstants.SIZE_LONG; // buffer.writeLong(pendingRecordId); + DataConstants.SIZE_LONG + // buffer.writeLong(pendingRecordId); + DataConstants.SIZE_BOOLEAN; // buffer.writeBoolean(isDelete); } @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeLong(messageId); buffer.writeLong(pendingRecordId); + buffer.writeBoolean(isDelete); } @Override @@ -57,6 +64,9 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl { if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { pendingRecordId = buffer.readLong(); } + if (buffer.readableBytes() >= DataConstants.SIZE_BOOLEAN) { + isDelete = buffer.readBoolean(); + } } /** @@ -70,6 +80,7 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl { public int hashCode() { final int prime = 31; int result = super.hashCode(); + result = prime * result + (isDelete ? 1231 : 1237); result = prime * result + (int) (messageId ^ (messageId >>> 32)); return result; } @@ -77,7 +88,7 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl { @Override public String toString() { return "ReplicationLargeMessageEndMessage{" + - "messageId=" + messageId + + "messageId=" + messageId + ", isDelete=" + isDelete + '}'; } @@ -92,10 +103,19 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl { ReplicationLargeMessageEndMessage other = (ReplicationLargeMessageEndMessage) obj; if (messageId != other.messageId) return false; + if (isDelete != other.isDelete) + return false; return true; } public long getPendingRecordId() { return pendingRecordId; } + + /** + * @return the isDelete + */ + public boolean isDelete() { + return isDelete; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java index d63167fe9c..fdcd80834a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java @@ -38,9 +38,9 @@ public interface ReplicatedLargeMessage { Message setMessageID(long id); /** - * @see org.apache.activemq.artemis.core.server.LargeServerMessage#releaseResources(boolean) + * @see org.apache.activemq.artemis.core.server.LargeServerMessage#releaseResources(boolean,boolean) */ - void releaseResources(boolean sync); + void releaseResources(boolean sync, boolean sendEvent); /** * @see org.apache.activemq.artemis.core.server.LargeServerMessage#deleteFile() diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 17294760e2..debdfb1435 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -346,7 +346,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon } for (ReplicatedLargeMessage largeMessage : largeMessages.values()) { - largeMessage.releaseResources(true); + largeMessage.releaseResources(true, false); } largeMessages.clear(); @@ -615,22 +615,29 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon if (logger.isTraceEnabled()) { logger.trace("handleLargeMessageEnd on " + packet.getMessageId()); } - final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false); + final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), packet.isDelete(), false); if (message != null) { message.setPendingRecordID(packet.getPendingRecordId()); - executor.execute(new Runnable() { - @Override - public void run() { - try { - if (logger.isTraceEnabled()) { - logger.trace("Deleting LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd"); - } - message.deleteFile(); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorDeletingLargeMessage(e, packet.getMessageId()); - } + if (!packet.isDelete()) { + if (logger.isTraceEnabled()) { + logger.trace("Closing LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd"); } - }); + message.releaseResources(true, false); + } else { + executor.execute(new Runnable() { + @Override + public void run() { + try { + if (logger.isTraceEnabled()) { + logger.trace("Deleting LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd"); + } + message.deleteFile(); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorDeletingLargeMessage(e, packet.getMessageId()); + } + } + }); + } } } @@ -903,4 +910,11 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon public void setExecutor(Executor executor2) { this.executor = executor2; } + + /** + * This is for tests basically, do not use it as its API is not guaranteed for future usage. + */ + public ConcurrentMap getLargeMessages() { + return largeMessages; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 6554cc8446..9a2d629465 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -256,7 +256,13 @@ public final class ReplicationManager implements ActiveMQComponent { public void largeMessageDelete(final Long messageId, JournalStorageManager storageManager) { if (enabled) { long pendingRecordID = storageManager.generateID(); - sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID)); + sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID, true)); + } + } + + public void largeMessageClosed(final Long messageId, JournalStorageManager storageManager) { + if (enabled) { + sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, -1, false)); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java index 6375f3b2c1..b2d3aefeda 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java @@ -52,7 +52,7 @@ public interface LargeServerMessage extends ReplicatedLargeMessage { * Close the files if opened */ @Override - void releaseResources(boolean sync); + void releaseResources(boolean sync, boolean sendEvent); @Override void deleteFile() throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index a3b3ac7510..15864db020 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -1418,7 +1418,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { context = null; } - largeMessage.releaseResources(false); + largeMessage.releaseResources(false, false); largeMessage.toMessage().usageDown(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index 1ff7892d2c..2deefdf9e0 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -251,6 +251,11 @@ public class TransactionImplTest extends ActiveMQTestBase { } + @Override + public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException { + + } + @Override public void addBytesToLargeMessage(SequentialFile file, long messageId, ActiveMQBuffer bytes) throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPageTest.java index ff3755438a..4e7823695c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPageTest.java @@ -71,7 +71,7 @@ public class AmqpPageTest extends PageTest { } else { final AMQPLargeMessage message = createLargeMessage(storageManager, address, msgID, content); page.write(new PagedMessageImpl(message, new long[0])); - message.releaseResources(false); + message.releaseResources(false, false); } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java index ad3a48b7b2..d98876b316 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java @@ -2495,7 +2495,7 @@ public class LargeMessageTest extends LargeMessageTestBase { // The server would be doing this fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize); - fileMessage.releaseResources(false); + fileMessage.releaseResources(false, false); Assert.assertEquals(largeMessageSize, fileMessage.getBodyBufferSize()); } @@ -2522,7 +2522,7 @@ public class LargeMessageTest extends LargeMessageTestBase { // The server would be doing this fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize); - fileMessage.releaseResources(false); + fileMessage.releaseResources(false, false); session.createQueue(new QueueConfiguration(ADDRESS)); @@ -2687,7 +2687,7 @@ public class LargeMessageTest extends LargeMessageTestBase { fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)}); } - fileMessage.releaseResources(false); + fileMessage.releaseResources(false, false); session.createQueue(new QueueConfiguration(ADDRESS)); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java index 786de34427..d16067a97f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java @@ -862,6 +862,11 @@ public class SendAckFailTest extends SpawnedTestBase { manager.deleteLargeMessageBody(largeServerMessage); } + @Override + public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException { + manager.largeMessageClosed(largeServerMessage); + } + @Override public void addBytesToLargeMessage(SequentialFile file, long messageId, ActiveMQBuffer bytes) throws Exception { manager.addBytesToLargeMessage(file, messageId, bytes); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java index 35b960cdc3..18be7cd3a5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java @@ -116,7 +116,7 @@ public class ServerLargeMessageTest extends ActiveMQTestBase { // The server would be doing this fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); - fileMessage.releaseResources(false); + fileMessage.releaseResources(false, true); session.createQueue(new QueueConfiguration("A").setRoutingType(RoutingType.ANYCAST)); @@ -339,7 +339,7 @@ public class ServerLargeMessageTest extends ActiveMQTestBase { largeServerMessage.setMessageID(1234); largeServerMessage.addBytes(new byte[0]); assertTrue(open.get()); - largeServerMessage.releaseResources(true); + largeServerMessage.releaseResources(true, true); assertTrue(sync.get()); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java index f4ee96f1af..cc0ec1a3c1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java @@ -472,7 +472,7 @@ public class XmlImportExportTest extends ActiveMQTestBase { fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); - fileMessage.releaseResources(false); + fileMessage.releaseResources(false, true); session.createQueue(new QueueConfiguration("A")); @@ -544,7 +544,7 @@ public class XmlImportExportTest extends ActiveMQTestBase { fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); - fileMessage.releaseResources(false); + fileMessage.releaseResources(false, true); session.createQueue(new QueueConfiguration("A")); @@ -888,7 +888,7 @@ public class XmlImportExportTest extends ActiveMQTestBase { fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); - fileMessage.releaseResources(false); + fileMessage.releaseResources(false, true); producer.send(fileMessage); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index da64521aa2..b82e5824b2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -71,6 +71,7 @@ import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; +import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; @@ -78,6 +79,7 @@ import org.apache.activemq.artemis.core.replication.ReplicatedJournal; import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.cluster.ClusterController; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; @@ -539,6 +541,29 @@ public final class ReplicationTest extends ActiveMQTestBase { Assert.assertEquals(0, manager.getActiveTokens().size()); } + @Test + public void testReplicationLargeMessageFileClose() throws Exception { + setupServer(true); + + JournalStorageManager storage = getStorage(); + + manager = liveServer.getReplicationManager(); + waitForComponent(manager); + + CoreMessage msg = new CoreMessage().initBuffer(1024).setMessageID(1); + LargeServerMessage largeMsg = liveServer.getStorageManager().createLargeMessage(500, msg); + largeMsg.addBytes(new byte[1024]); + largeMsg.releaseResources(true, true); + + blockOnReplication(storage, manager); + + LargeServerMessageImpl message1 = (LargeServerMessageImpl) backupServer.getReplicationEndpoint().getLargeMessages().get(Long.valueOf(500)); + + Assert.assertNotNull(message1); + Assert.assertFalse(largeMsg.getAppendFile().isOpen()); + Assert.assertFalse(message1.getAppendFile().isOpen()); + } + class FakeData implements EncodingSupport { @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java index 4a6c78da98..726b079e79 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java @@ -144,7 +144,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase { fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); - fileMessage.releaseResources(false); + fileMessage.releaseResources(false, false); message = fileMessage; } else { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java index eaf0359e14..bcf88084f0 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java @@ -293,7 +293,7 @@ public class PageTest extends ActiveMQTestBase { msg.addBytes(content); msg.setAddress(address); page.write(new PagedMessageImpl(msg, new long[0])); - msg.releaseResources(false); + msg.releaseResources(false, false); } else { ICoreMessage msg = new CoreMessage().initBuffer(100); msg.setMessageID(msgID);