This commit is contained in:
Clebert Suconic 2018-08-29 13:40:19 -04:00
commit 218e3a97fe
2 changed files with 30 additions and 23 deletions

View File

@ -160,6 +160,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
private final boolean direct;
private final Object largeMessageLock = new Object();
public ServerSessionPacketHandler(final ActiveMQServer server,
final CoreProtocolManager manager,
@ -196,11 +197,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
private void clearLargeMessage() {
if (currentLargeMessage != null) {
try {
currentLargeMessage.deleteFile();
} catch (Throwable error) {
ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
synchronized (largeMessageLock) {
if (currentLargeMessage != null) {
try {
currentLargeMessage.deleteFile();
} catch (Throwable error) {
ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
} finally {
currentLargeMessage = null;
}
}
}
}
@ -958,26 +963,28 @@ public class ServerSessionPacketHandler implements ChannelHandler {
final long messageBodySize,
final byte[] body,
final boolean continues) throws Exception {
if (currentLargeMessage == null) {
throw ActiveMQMessageBundle.BUNDLE.largeMessageNotInitialised();
}
// Immediately release the credits for the continuations- these don't contribute to the in-memory size
// of the message
currentLargeMessage.addBytes(body);
if (!continues) {
currentLargeMessage.releaseResources();
if (messageBodySize >= 0) {
currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
synchronized (largeMessageLock) {
if (currentLargeMessage == null) {
throw ActiveMQMessageBundle.BUNDLE.largeMessageNotInitialised();
}
LargeServerMessage message = currentLargeMessage;
currentLargeMessage = null;
session.doSend(session.getCurrentTransaction(), message, null, false, false);
// Immediately release the credits for the continuations- these don't contribute to the in-memory size
// of the message
currentLargeMessage.addBytes(body);
if (!continues) {
currentLargeMessage.releaseResources();
if (messageBodySize >= 0) {
currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
}
LargeServerMessage message = currentLargeMessage;
currentLargeMessage = null;
session.doSend(session.getCurrentTransaction(), message, null, false, false);
}
}
}
}

View File

@ -73,7 +73,7 @@ public class LargeMessageOnShutdownTest extends ActiveMQTestBase {
condition = "!flagged(\"testLargeMessageOnShutdown\")",
action =
"org.apache.activemq.artemis.tests.extras.byteman.LargeMessageOnShutdownTest.stopServer();" +
"waitFor(\"testLargeMessageOnShutdown\");" +
"waitFor(\"testLargeMessageOnShutdown\", 5000);" +
"flag(\"testLargeMessageOnShutdown\")"
),
@BMRule(