ARTEMIS-4171 potential large message file leak
This commit is contained in:
parent
c123a29f8e
commit
82fc42987a
|
@ -1106,7 +1106,12 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
LargeServerMessage message = currentLargeMessage;
|
LargeServerMessage message = currentLargeMessage;
|
||||||
currentLargeMessage.setStorageManager(storageManager);
|
currentLargeMessage.setStorageManager(storageManager);
|
||||||
currentLargeMessage = null;
|
currentLargeMessage = null;
|
||||||
session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage)message.toMessage(), storageManager), null, false, producers.get(senderID), false);
|
try {
|
||||||
|
session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage) message.toMessage(), storageManager), null, false, producers.get(senderID), false);
|
||||||
|
} catch (Exception e) {
|
||||||
|
message.deleteFile();
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ import javax.transaction.xa.Xid;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
import java.lang.management.OperatingSystemMXBean;
|
import java.lang.management.OperatingSystemMXBean;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
@ -39,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import com.sun.management.UnixOperatingSystemMXBean;
|
import com.sun.management.UnixOperatingSystemMXBean;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
@ -62,7 +64,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||||
import org.apache.activemq.artemis.core.server.ServerProducer;
|
import org.apache.activemq.artemis.core.server.ServerProducer;
|
||||||
|
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
|
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
|
||||||
|
@ -77,7 +81,6 @@ import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import java.lang.invoke.MethodHandles;
|
|
||||||
|
|
||||||
public class LargeMessageTest extends LargeMessageTestBase {
|
public class LargeMessageTest extends LargeMessageTestBase {
|
||||||
|
|
||||||
|
@ -326,6 +329,52 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
||||||
validateNoFilesOnLargeDir();
|
validateNoFilesOnLargeDir();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFileRemovalOnFailure() throws Exception {
|
||||||
|
final AtomicBoolean throwException = new AtomicBoolean(false);
|
||||||
|
final String queueName = RandomUtil.randomString();
|
||||||
|
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
|
||||||
|
|
||||||
|
ActiveMQServer server = createServer(true, isNetty(), storeType);
|
||||||
|
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
server.registerBrokerPlugin(new ActiveMQServerMessagePlugin() {
|
||||||
|
@Override
|
||||||
|
public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException {
|
||||||
|
if (throwException.get()) {
|
||||||
|
throw new ActiveMQException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
server.createQueue(new QueueConfiguration(queueName));
|
||||||
|
|
||||||
|
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
|
||||||
|
|
||||||
|
ClientSession session = addClientSession(sf.createSession(false, true, false));
|
||||||
|
|
||||||
|
ClientProducer producer = session.createProducer(queueName);
|
||||||
|
|
||||||
|
Message clientFile = createLargeClientMessageStreaming(session, messageSize, true);
|
||||||
|
|
||||||
|
try {
|
||||||
|
throwException.set(true);
|
||||||
|
producer.send(clientFile);
|
||||||
|
fail("Should have thrown an exception here");
|
||||||
|
} catch (Exception e) {
|
||||||
|
// expected exception from plugin
|
||||||
|
} finally {
|
||||||
|
throwException.set(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(0, server.locateQueue(queueName).getMessageCount());
|
||||||
|
|
||||||
|
session.close();
|
||||||
|
|
||||||
|
validateNoFilesOnLargeDir();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPendingRecord() throws Exception {
|
public void testPendingRecord() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue