ARTEMIS-4544 Option to sync large messages

This commit is contained in:
Clebert Suconic 2023-12-19 13:37:56 -05:00 committed by clebertsuconic
parent 9b76574de2
commit 379dd4088f
12 changed files with 65 additions and 6 deletions

View File

@ -278,6 +278,8 @@ public final class ActiveMQDefaultConfiguration {
// if true wait for transaction data to be synchronized to the journal before returning response to client
private static boolean DEFAULT_JOURNAL_SYNC_TRANSACTIONAL = true;
private static boolean DEFAULT_LARGE_MESSAGE_SYNC = true;
// if true wait for non transaction data to be synced to the journal before returning response to client.
private static boolean DEFAULT_JOURNAL_SYNC_NON_TRANSACTIONAL = true;
@ -1019,6 +1021,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_JOURNAL_SYNC_TRANSACTIONAL;
}
public static boolean isDefaultLargeMessageSync() {
return DEFAULT_LARGE_MESSAGE_SYNC;
}
/**
* if true wait for non transaction data to be synced to the journal before returning response to client.
*/

View File

@ -182,6 +182,9 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
return connection.isWritable(readyListener);
}
public boolean isLargeMessageSync() {
return server.getConfiguration().isLargeMessageSync();
}
public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
return new AMQPSessionCallback(this, manager, connection, this.connection, sessionExecutor, server.newOperationContext());

View File

@ -183,6 +183,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
}
}
public boolean isLargeMessageSync() {
return connectionCallback.isLargeMessageSync();
}
@Override
public void initialize() throws Exception {
initialized = true;

View File

@ -101,7 +101,7 @@ public class AMQPLargeMessageReader implements MessageReader {
final AMQPLargeMessage result;
if (!delivery.isPartial()) {
currentMessage.releaseResources(true, true);
currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true);
result = currentMessage;
// We don't want a close to delete the file now, we've released the resources.
currentMessage = null;

View File

@ -204,7 +204,7 @@ public class AMQPTunneledCoreLargeMessageReader implements MessageReader {
final Message result = coreLargeMessage.toMessage();
// We don't want a close to delete the file now, so we release these resources.
coreLargeMessage.releaseResources(true, true);
coreLargeMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true);
coreLargeMessage = null;
state = State.DONE;

View File

@ -74,6 +74,9 @@ public class AMQPTunneledCoreLargeMessageReaderTest {
@Mock
AMQPSessionCallback sessionSPI;
@Mock
AMQPConnectionContext connectionContext;
@Spy
NullStorageManager nullStoreManager = new NullStorageManager();
@ -81,6 +84,8 @@ public class AMQPTunneledCoreLargeMessageReaderTest {
public void setUp() {
MockitoAnnotations.openMocks(this);
when(serverReceiver.getConnection()).thenReturn(connectionContext);
when(connectionContext.isLargeMessageSync()).thenReturn(true);
when(serverReceiver.getSessionContext()).thenReturn(sessionContext);
when(sessionContext.getSessionSPI()).thenReturn(sessionSPI);
when(sessionSPI.getStorageManager()).thenReturn(nullStoreManager);

View File

@ -1461,4 +1461,8 @@ public interface Configuration {
String getLiteralMatchMarkers();
Configuration setLiteralMatchMarkers(String literalMatchMarkers);
Configuration setLargeMessageSync(boolean largeMessageSync);
boolean isLargeMessageSync();
}

View File

@ -256,6 +256,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
public JournalType journalType = ConfigurationImpl.DEFAULT_JOURNAL_TYPE;
protected boolean largeMessageSync = ActiveMQDefaultConfiguration.isDefaultLargeMessageSync();
protected boolean journalSyncTransactional = ActiveMQDefaultConfiguration.isDefaultJournalSyncTransactional();
protected boolean journalSyncNonTransactional = ActiveMQDefaultConfiguration.isDefaultJournalSyncNonTransactional();
@ -3207,6 +3209,18 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
@Override
public Configuration setLargeMessageSync(boolean largeMessageSync) {
this.largeMessageSync = largeMessageSync;
return this;
}
@Override
public boolean isLargeMessageSync() {
return largeMessageSync;
}
// extend property utils with ability to auto-fill and locate from collections
// collection entries are identified by the name() property
private static class CollectionAutoFillPropertiesUtil extends PropertyUtilsBean {

View File

@ -812,6 +812,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setLiteralMatchMarkers(getString(e, "literal-match-markers", config.getLiteralMatchMarkers(), Validators.NULL_OR_TWO_CHARACTERS));
config.setLargeMessageSync(getBoolean(e, "large-message-sync", config.isLargeMessageSync()));
parseAddressSettings(e, config);
parseResourceLimits(e, config);

View File

@ -161,6 +161,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
private static final String PRODUCER_ID_PREFIX = "artemis:sender:ID:";
private final ActiveMQServer server;
private final ServerSession session;
private final StorageManager storageManager;
@ -191,6 +193,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
final Channel channel) {
this.session = session;
this.server = server;
session.addCloseable((boolean failed) -> clearLargeMessage());
this.storageManager = server.getStorageManager();
@ -1121,7 +1125,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
currentLargeMessage.addBytes(body);
if (!continues) {
currentLargeMessage.releaseResources(true, true);
currentLargeMessage.releaseResources(server.getConfiguration().isLargeMessageSync(), true);
if (messageBodySize >= 0) {
currentLargeMessage.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);

View File

@ -83,6 +83,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="large-message-sync" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Should sync large messages before closing the file.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="persistence-enabled" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -709,12 +709,21 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
}
Assert.assertTrue(exception);
}
@Test
public void testSyncLargeMessage() throws Throwable {
StringPrintStream stringPrintStream = new StringPrintStream();
PrintStream stream = stringPrintStream.newStream();
stream.println("<configuration><core>");
stream.println("<large-message-sync>false</large-message-sync>");
stream.println("</core></configuration>");
FileConfigurationParser parser = new FileConfigurationParser();
ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes());
Configuration configuration = parser.parseMainConfig(inputStream);
Assert.assertFalse(configuration.isLargeMessageSync());
}
private static String firstPart = "<core xmlns=\"urn:activemq:core\">" + "\n" +
"<name>ActiveMQ.main.config</name>" + "\n" +