diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 578db6b705..fd7eeeb4f3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -320,6 +320,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L if (!originallyOpen) { file.close(); + newMessage.getFile().close(); } return newMessage; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/DeadLetterAddressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/DeadLetterAddressTest.java index 8ce7a3ea9e..de151c2fbd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/DeadLetterAddressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/DeadLetterAddressTest.java @@ -16,6 +16,15 @@ */ package org.apache.activemq.artemis.tests.integration.client; +import java.io.File; +import java.lang.management.ManagementFactory; +import java.lang.management.OperatingSystemMXBean; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import com.sun.management.UnixOperatingSystemMXBean; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; @@ -34,14 +43,10 @@ import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - public class DeadLetterAddressTest extends ActiveMQTestBase { private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; @@ -82,6 +87,44 @@ public class DeadLetterAddressTest extends ActiveMQTestBase { Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!"); } + @Test + public void testLargeMessageFileLeak() throws Exception { + OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean(); + + // only run this on *nix systems which will have the com.sun.management.UnixOperatingSystemMXBean (needed to check open file count) + Assume.assumeTrue(os instanceof UnixOperatingSystemMXBean); + + long fdBaseline = ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount(); + final int SIZE = 2 * 1024; + SimpleString dla = new SimpleString("DLA"); + SimpleString qName = new SimpleString("q1"); + SimpleString adName = new SimpleString("ad1"); + + AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla); + server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings); + SimpleString dlq = new SimpleString("DLQ1"); + clientSession.createQueue(dla, dlq, null, false); + clientSession.createQueue(adName, qName, null, false); + for (int i = 0; i < 10; i++) { + ClientProducer producer = clientSession.createProducer(adName); + ClientMessage clientFile = clientSession.createMessage(true); + clientFile.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(SIZE)); + producer.send(clientFile); + clientSession.start(); + ClientConsumer clientConsumer = clientSession.createConsumer(qName); + ClientMessage m = clientConsumer.receive(500); + m.acknowledge(); + Assert.assertNotNull(m); + + // force a cancel + clientSession.rollback(); + m = clientConsumer.receiveImmediate(); + Assert.assertNull(m); + clientConsumer.close(); + } + assertEquals("File descriptors are leaking", 0, ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount() - fdBaseline); + } + // HORNETQ- 1084 @Test public void testBasicSendWithDLAButNoBinding() throws Exception { @@ -462,10 +505,11 @@ public class DeadLetterAddressTest extends ActiveMQTestBase { @Before public void setUp() throws Exception { super.setUp(); - server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), false)); + server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), true)); server.start(); // then we create a client as normal locator = createInVMNonHALocator(); + locator.setMinLargeMessageSize(1024); ClientSessionFactory sessionFactory = createSessionFactory(locator); clientSession = addClientSession(sessionFactory.createSession(false, true, false)); }