This closes #806
This commit is contained in:
commit
0d3ff1a9d2
|
@ -320,6 +320,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
|
||||||
|
|
||||||
if (!originallyOpen) {
|
if (!originallyOpen) {
|
||||||
file.close();
|
file.close();
|
||||||
|
newMessage.getFile().close();
|
||||||
}
|
}
|
||||||
|
|
||||||
return newMessage;
|
return newMessage;
|
||||||
|
|
|
@ -16,6 +16,15 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.integration.client;
|
package org.apache.activemq.artemis.tests.integration.client;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.lang.management.ManagementFactory;
|
||||||
|
import java.lang.management.OperatingSystemMXBean;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import com.sun.management.UnixOperatingSystemMXBean;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
@ -34,14 +43,10 @@ import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.Assume;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
public class DeadLetterAddressTest extends ActiveMQTestBase {
|
public class DeadLetterAddressTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||||
|
@ -82,6 +87,44 @@ public class DeadLetterAddressTest extends ActiveMQTestBase {
|
||||||
Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
|
Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLargeMessageFileLeak() throws Exception {
|
||||||
|
OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
|
||||||
|
|
||||||
|
// only run this on *nix systems which will have the com.sun.management.UnixOperatingSystemMXBean (needed to check open file count)
|
||||||
|
Assume.assumeTrue(os instanceof UnixOperatingSystemMXBean);
|
||||||
|
|
||||||
|
long fdBaseline = ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount();
|
||||||
|
final int SIZE = 2 * 1024;
|
||||||
|
SimpleString dla = new SimpleString("DLA");
|
||||||
|
SimpleString qName = new SimpleString("q1");
|
||||||
|
SimpleString adName = new SimpleString("ad1");
|
||||||
|
|
||||||
|
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
|
||||||
|
server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
|
||||||
|
SimpleString dlq = new SimpleString("DLQ1");
|
||||||
|
clientSession.createQueue(dla, dlq, null, false);
|
||||||
|
clientSession.createQueue(adName, qName, null, false);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
ClientProducer producer = clientSession.createProducer(adName);
|
||||||
|
ClientMessage clientFile = clientSession.createMessage(true);
|
||||||
|
clientFile.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(SIZE));
|
||||||
|
producer.send(clientFile);
|
||||||
|
clientSession.start();
|
||||||
|
ClientConsumer clientConsumer = clientSession.createConsumer(qName);
|
||||||
|
ClientMessage m = clientConsumer.receive(500);
|
||||||
|
m.acknowledge();
|
||||||
|
Assert.assertNotNull(m);
|
||||||
|
|
||||||
|
// force a cancel
|
||||||
|
clientSession.rollback();
|
||||||
|
m = clientConsumer.receiveImmediate();
|
||||||
|
Assert.assertNull(m);
|
||||||
|
clientConsumer.close();
|
||||||
|
}
|
||||||
|
assertEquals("File descriptors are leaking", 0, ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount() - fdBaseline);
|
||||||
|
}
|
||||||
|
|
||||||
// HORNETQ- 1084
|
// HORNETQ- 1084
|
||||||
@Test
|
@Test
|
||||||
public void testBasicSendWithDLAButNoBinding() throws Exception {
|
public void testBasicSendWithDLAButNoBinding() throws Exception {
|
||||||
|
@ -462,10 +505,11 @@ public class DeadLetterAddressTest extends ActiveMQTestBase {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), false));
|
server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), true));
|
||||||
server.start();
|
server.start();
|
||||||
// then we create a client as normal
|
// then we create a client as normal
|
||||||
locator = createInVMNonHALocator();
|
locator = createInVMNonHALocator();
|
||||||
|
locator.setMinLargeMessageSize(1024);
|
||||||
ClientSessionFactory sessionFactory = createSessionFactory(locator);
|
ClientSessionFactory sessionFactory = createSessionFactory(locator);
|
||||||
clientSession = addClientSession(sessionFactory.createSession(false, true, false));
|
clientSession = addClientSession(sessionFactory.createSession(false, true, false));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue