From 379dd4088f7b6cde3a09f4bbd72855a9d1537104 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 19 Dec 2023 13:37:56 -0500 Subject: [PATCH] ARTEMIS-4544 Option to sync large messages --- .../api/config/ActiveMQDefaultConfiguration.java | 6 ++++++ .../amqp/broker/AMQPConnectionCallback.java | 3 +++ .../amqp/proton/AMQPConnectionContext.java | 4 ++++ .../amqp/proton/AMQPLargeMessageReader.java | 2 +- .../AMQPTunneledCoreLargeMessageReader.java | 2 +- .../AMQPTunneledCoreLargeMessageReaderTest.java | 5 +++++ .../artemis/core/config/Configuration.java | 4 ++++ .../core/config/impl/ConfigurationImpl.java | 14 ++++++++++++++ .../deployers/impl/FileConfigurationParser.java | 2 ++ .../protocol/core/ServerSessionPacketHandler.java | 6 +++++- .../resources/schema/artemis-configuration.xsd | 8 ++++++++ .../config/impl/FileConfigurationParserTest.java | 15 ++++++++++++--- 12 files changed, 65 insertions(+), 6 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 7661df477e..2ab5d6c66d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -278,6 +278,8 @@ public final class ActiveMQDefaultConfiguration { // if true wait for transaction data to be synchronized to the journal before returning response to client private static boolean DEFAULT_JOURNAL_SYNC_TRANSACTIONAL = true; + private static boolean DEFAULT_LARGE_MESSAGE_SYNC = true; + // if true wait for non transaction data to be synced to the journal before returning response to client. private static boolean DEFAULT_JOURNAL_SYNC_NON_TRANSACTIONAL = true; @@ -1019,6 +1021,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_JOURNAL_SYNC_TRANSACTIONAL; } + public static boolean isDefaultLargeMessageSync() { + return DEFAULT_LARGE_MESSAGE_SYNC; + } + /** * if true wait for non transaction data to be synced to the journal before returning response to client. */ diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java index 25421e0ef4..21d9373f55 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java @@ -182,6 +182,9 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { return connection.isWritable(readyListener); } + public boolean isLargeMessageSync() { + return server.getConfiguration().isLargeMessageSync(); + } public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) { return new AMQPSessionCallback(this, manager, connection, this.connection, sessionExecutor, server.newOperationContext()); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index c5b07f3d02..e771e3b1d0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -183,6 +183,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH } } + public boolean isLargeMessageSync() { + return connectionCallback.isLargeMessageSync(); + } + @Override public void initialize() throws Exception { initialized = true; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java index 5404c1d39b..944427f2a8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java @@ -101,7 +101,7 @@ public class AMQPLargeMessageReader implements MessageReader { final AMQPLargeMessage result; if (!delivery.isPartial()) { - currentMessage.releaseResources(true, true); + currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true); result = currentMessage; // We don't want a close to delete the file now, we've released the resources. currentMessage = null; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReader.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReader.java index 1a79ad1210..2fa661d75a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReader.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReader.java @@ -204,7 +204,7 @@ public class AMQPTunneledCoreLargeMessageReader implements MessageReader { final Message result = coreLargeMessage.toMessage(); // We don't want a close to delete the file now, so we release these resources. - coreLargeMessage.releaseResources(true, true); + coreLargeMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true); coreLargeMessage = null; state = State.DONE; diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java index 068aaf5399..68b74b30d8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java @@ -74,6 +74,9 @@ public class AMQPTunneledCoreLargeMessageReaderTest { @Mock AMQPSessionCallback sessionSPI; + @Mock + AMQPConnectionContext connectionContext; + @Spy NullStorageManager nullStoreManager = new NullStorageManager(); @@ -81,6 +84,8 @@ public class AMQPTunneledCoreLargeMessageReaderTest { public void setUp() { MockitoAnnotations.openMocks(this); + when(serverReceiver.getConnection()).thenReturn(connectionContext); + when(connectionContext.isLargeMessageSync()).thenReturn(true); when(serverReceiver.getSessionContext()).thenReturn(sessionContext); when(sessionContext.getSessionSPI()).thenReturn(sessionSPI); when(sessionSPI.getStorageManager()).thenReturn(nullStoreManager); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 4e8067866c..503304ede3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -1461,4 +1461,8 @@ public interface Configuration { String getLiteralMatchMarkers(); Configuration setLiteralMatchMarkers(String literalMatchMarkers); + + Configuration setLargeMessageSync(boolean largeMessageSync); + + boolean isLargeMessageSync(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 5b70b38894..ea76a66268 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -256,6 +256,8 @@ public class ConfigurationImpl implements Configuration, Serializable { public JournalType journalType = ConfigurationImpl.DEFAULT_JOURNAL_TYPE; + protected boolean largeMessageSync = ActiveMQDefaultConfiguration.isDefaultLargeMessageSync(); + protected boolean journalSyncTransactional = ActiveMQDefaultConfiguration.isDefaultJournalSyncTransactional(); protected boolean journalSyncNonTransactional = ActiveMQDefaultConfiguration.isDefaultJournalSyncNonTransactional(); @@ -3207,6 +3209,18 @@ public class ConfigurationImpl implements Configuration, Serializable { return this; } + + @Override + public Configuration setLargeMessageSync(boolean largeMessageSync) { + this.largeMessageSync = largeMessageSync; + return this; + } + + @Override + public boolean isLargeMessageSync() { + return largeMessageSync; + } + // extend property utils with ability to auto-fill and locate from collections // collection entries are identified by the name() property private static class CollectionAutoFillPropertiesUtil extends PropertyUtilsBean { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 3f2649657b..d8fcab9d65 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -812,6 +812,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { config.setLiteralMatchMarkers(getString(e, "literal-match-markers", config.getLiteralMatchMarkers(), Validators.NULL_OR_TWO_CHARACTERS)); + config.setLargeMessageSync(getBoolean(e, "large-message-sync", config.isLargeMessageSync())); + parseAddressSettings(e, config); parseResourceLimits(e, config); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index c570359e74..463a6cd818 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -161,6 +161,8 @@ public class ServerSessionPacketHandler implements ChannelHandler { private static final String PRODUCER_ID_PREFIX = "artemis:sender:ID:"; + private final ActiveMQServer server; + private final ServerSession session; private final StorageManager storageManager; @@ -191,6 +193,8 @@ public class ServerSessionPacketHandler implements ChannelHandler { final Channel channel) { this.session = session; + this.server = server; + session.addCloseable((boolean failed) -> clearLargeMessage()); this.storageManager = server.getStorageManager(); @@ -1121,7 +1125,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { currentLargeMessage.addBytes(body); if (!continues) { - currentLargeMessage.releaseResources(true, true); + currentLargeMessage.releaseResources(server.getConfiguration().isLargeMessageSync(), true); if (messageBodySize >= 0) { currentLargeMessage.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize); diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 6186ff923e..a6b57371c0 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -83,6 +83,14 @@ + + + + Should sync large messages before closing the file. + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java index 57e3d1a6c1..a5f770bd12 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java @@ -709,12 +709,21 @@ public class FileConfigurationParserTest extends ActiveMQTestBase { } Assert.assertTrue(exception); - - - } + @Test + public void testSyncLargeMessage() throws Throwable { + StringPrintStream stringPrintStream = new StringPrintStream(); + PrintStream stream = stringPrintStream.newStream(); + stream.println(""); + stream.println("false"); + stream.println(""); + FileConfigurationParser parser = new FileConfigurationParser(); + ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes()); + Configuration configuration = parser.parseMainConfig(inputStream); + Assert.assertFalse(configuration.isLargeMessageSync()); + } private static String firstPart = "" + "\n" + "ActiveMQ.main.config" + "\n" +