diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 7661df477e..2ab5d6c66d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -278,6 +278,8 @@ public final class ActiveMQDefaultConfiguration {
// if true wait for transaction data to be synchronized to the journal before returning response to client
private static boolean DEFAULT_JOURNAL_SYNC_TRANSACTIONAL = true;
+ private static boolean DEFAULT_LARGE_MESSAGE_SYNC = true;
+
// if true wait for non transaction data to be synced to the journal before returning response to client.
private static boolean DEFAULT_JOURNAL_SYNC_NON_TRANSACTIONAL = true;
@@ -1019,6 +1021,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_JOURNAL_SYNC_TRANSACTIONAL;
}
+ public static boolean isDefaultLargeMessageSync() {
+ return DEFAULT_LARGE_MESSAGE_SYNC;
+ }
+
/**
* if true wait for non transaction data to be synced to the journal before returning response to client.
*/
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index 25421e0ef4..21d9373f55 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -182,6 +182,9 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
return connection.isWritable(readyListener);
}
+ public boolean isLargeMessageSync() {
+ return server.getConfiguration().isLargeMessageSync();
+ }
public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
return new AMQPSessionCallback(this, manager, connection, this.connection, sessionExecutor, server.newOperationContext());
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index c5b07f3d02..e771e3b1d0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -183,6 +183,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
}
}
+ public boolean isLargeMessageSync() {
+ return connectionCallback.isLargeMessageSync();
+ }
+
@Override
public void initialize() throws Exception {
initialized = true;
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java
index 5404c1d39b..944427f2a8 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java
@@ -101,7 +101,7 @@ public class AMQPLargeMessageReader implements MessageReader {
final AMQPLargeMessage result;
if (!delivery.isPartial()) {
- currentMessage.releaseResources(true, true);
+ currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true);
result = currentMessage;
// We don't want a close to delete the file now, we've released the resources.
currentMessage = null;
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReader.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReader.java
index 1a79ad1210..2fa661d75a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReader.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReader.java
@@ -204,7 +204,7 @@ public class AMQPTunneledCoreLargeMessageReader implements MessageReader {
final Message result = coreLargeMessage.toMessage();
// We don't want a close to delete the file now, so we release these resources.
- coreLargeMessage.releaseResources(true, true);
+ coreLargeMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true);
coreLargeMessage = null;
state = State.DONE;
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java
index 068aaf5399..68b74b30d8 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java
@@ -74,6 +74,9 @@ public class AMQPTunneledCoreLargeMessageReaderTest {
@Mock
AMQPSessionCallback sessionSPI;
+ @Mock
+ AMQPConnectionContext connectionContext;
+
@Spy
NullStorageManager nullStoreManager = new NullStorageManager();
@@ -81,6 +84,8 @@ public class AMQPTunneledCoreLargeMessageReaderTest {
public void setUp() {
MockitoAnnotations.openMocks(this);
+ when(serverReceiver.getConnection()).thenReturn(connectionContext);
+ when(connectionContext.isLargeMessageSync()).thenReturn(true);
when(serverReceiver.getSessionContext()).thenReturn(sessionContext);
when(sessionContext.getSessionSPI()).thenReturn(sessionSPI);
when(sessionSPI.getStorageManager()).thenReturn(nullStoreManager);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 4e8067866c..503304ede3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -1461,4 +1461,8 @@ public interface Configuration {
String getLiteralMatchMarkers();
Configuration setLiteralMatchMarkers(String literalMatchMarkers);
+
+ Configuration setLargeMessageSync(boolean largeMessageSync);
+
+ boolean isLargeMessageSync();
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 5b70b38894..ea76a66268 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -256,6 +256,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
public JournalType journalType = ConfigurationImpl.DEFAULT_JOURNAL_TYPE;
+ protected boolean largeMessageSync = ActiveMQDefaultConfiguration.isDefaultLargeMessageSync();
+
protected boolean journalSyncTransactional = ActiveMQDefaultConfiguration.isDefaultJournalSyncTransactional();
protected boolean journalSyncNonTransactional = ActiveMQDefaultConfiguration.isDefaultJournalSyncNonTransactional();
@@ -3207,6 +3209,18 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
+
+ @Override
+ public Configuration setLargeMessageSync(boolean largeMessageSync) {
+ this.largeMessageSync = largeMessageSync;
+ return this;
+ }
+
+ @Override
+ public boolean isLargeMessageSync() {
+ return largeMessageSync;
+ }
+
// extend property utils with ability to auto-fill and locate from collections
// collection entries are identified by the name() property
private static class CollectionAutoFillPropertiesUtil extends PropertyUtilsBean {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 3f2649657b..d8fcab9d65 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -812,6 +812,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setLiteralMatchMarkers(getString(e, "literal-match-markers", config.getLiteralMatchMarkers(), Validators.NULL_OR_TWO_CHARACTERS));
+ config.setLargeMessageSync(getBoolean(e, "large-message-sync", config.isLargeMessageSync()));
+
parseAddressSettings(e, config);
parseResourceLimits(e, config);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index c570359e74..463a6cd818 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -161,6 +161,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
private static final String PRODUCER_ID_PREFIX = "artemis:sender:ID:";
+ private final ActiveMQServer server;
+
private final ServerSession session;
private final StorageManager storageManager;
@@ -191,6 +193,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
final Channel channel) {
this.session = session;
+ this.server = server;
+
session.addCloseable((boolean failed) -> clearLargeMessage());
this.storageManager = server.getStorageManager();
@@ -1121,7 +1125,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
currentLargeMessage.addBytes(body);
if (!continues) {
- currentLargeMessage.releaseResources(true, true);
+ currentLargeMessage.releaseResources(server.getConfiguration().isLargeMessageSync(), true);
if (messageBodySize >= 0) {
currentLargeMessage.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 6186ff923e..a6b57371c0 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -83,6 +83,14 @@
+
+
+
+ Should sync large messages before closing the file.
+
+
+
+
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index 57e3d1a6c1..a5f770bd12 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -709,12 +709,21 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
}
Assert.assertTrue(exception);
-
-
-
}
+ @Test
+ public void testSyncLargeMessage() throws Throwable {
+ StringPrintStream stringPrintStream = new StringPrintStream();
+ PrintStream stream = stringPrintStream.newStream();
+ stream.println("");
+ stream.println("false");
+ stream.println("");
+ FileConfigurationParser parser = new FileConfigurationParser();
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes());
+ Configuration configuration = parser.parseMainConfig(inputStream);
+ Assert.assertFalse(configuration.isLargeMessageSync());
+ }
private static String firstPart = "" + "\n" +
"ActiveMQ.main.config" + "\n" +