ARTEMIS-739 fix large msg file leak on copy
This commit is contained in:
parent
b1a06b8857
commit
a6974596a0
|
@ -320,6 +320,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
|
|||
|
||||
if (!originallyOpen) {
|
||||
file.close();
|
||||
newMessage.getFile().close();
|
||||
}
|
||||
|
||||
return newMessage;
|
||||
|
|
|
@ -16,6 +16,15 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.client;
|
||||
|
||||
import java.io.File;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.OperatingSystemMXBean;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.sun.management.UnixOperatingSystemMXBean;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
@ -34,14 +43,10 @@ import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
|||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class DeadLetterAddressTest extends ActiveMQTestBase {
|
||||
|
||||
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||
|
@ -82,6 +87,44 @@ public class DeadLetterAddressTest extends ActiveMQTestBase {
|
|||
Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLargeMessageFileLeak() throws Exception {
|
||||
OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
|
||||
|
||||
// only run this on *nix systems which will have the com.sun.management.UnixOperatingSystemMXBean (needed to check open file count)
|
||||
Assume.assumeTrue(os instanceof UnixOperatingSystemMXBean);
|
||||
|
||||
long fdBaseline = ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount();
|
||||
final int SIZE = 2 * 1024;
|
||||
SimpleString dla = new SimpleString("DLA");
|
||||
SimpleString qName = new SimpleString("q1");
|
||||
SimpleString adName = new SimpleString("ad1");
|
||||
|
||||
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
|
||||
server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
|
||||
SimpleString dlq = new SimpleString("DLQ1");
|
||||
clientSession.createQueue(dla, dlq, null, false);
|
||||
clientSession.createQueue(adName, qName, null, false);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
ClientProducer producer = clientSession.createProducer(adName);
|
||||
ClientMessage clientFile = clientSession.createMessage(true);
|
||||
clientFile.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(SIZE));
|
||||
producer.send(clientFile);
|
||||
clientSession.start();
|
||||
ClientConsumer clientConsumer = clientSession.createConsumer(qName);
|
||||
ClientMessage m = clientConsumer.receive(500);
|
||||
m.acknowledge();
|
||||
Assert.assertNotNull(m);
|
||||
|
||||
// force a cancel
|
||||
clientSession.rollback();
|
||||
m = clientConsumer.receiveImmediate();
|
||||
Assert.assertNull(m);
|
||||
clientConsumer.close();
|
||||
}
|
||||
assertEquals("File descriptors are leaking", 0, ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount() - fdBaseline);
|
||||
}
|
||||
|
||||
// HORNETQ- 1084
|
||||
@Test
|
||||
public void testBasicSendWithDLAButNoBinding() throws Exception {
|
||||
|
@ -462,10 +505,11 @@ public class DeadLetterAddressTest extends ActiveMQTestBase {
|
|||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), false));
|
||||
server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), true));
|
||||
server.start();
|
||||
// then we create a client as normal
|
||||
locator = createInVMNonHALocator();
|
||||
locator.setMinLargeMessageSize(1024);
|
||||
ClientSessionFactory sessionFactory = createSessionFactory(locator);
|
||||
clientSession = addClientSession(sessionFactory.createSession(false, true, false));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue