This commit is contained in:
Clebert Suconic 2021-09-09 16:56:52 -04:00
commit 6f2d58f886
2 changed files with 58 additions and 0 deletions

View File

@ -252,6 +252,7 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
currentLargeMessage.addBytes(receiver.recv()); currentLargeMessage.addBytes(receiver.recv());
receiver.advance(); receiver.advance();
message = currentLargeMessage; message = currentLargeMessage;
currentLargeMessage.releaseResources(true, true);
currentLargeMessage = null; currentLargeMessage = null;
} else { } else {
ReadableBuffer data = receiver.recv(); ReadableBuffer data = receiver.recv();

View File

@ -16,9 +16,16 @@
*/ */
package org.apache.activemq.artemis.tests.integration.client; package org.apache.activemq.artemis.tests.integration.client;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.transaction.xa.XAResource; import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid; import javax.transaction.xa.Xid;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.HashMap; import java.util.HashMap;
import java.util.UUID; import java.util.UUID;
@ -26,9 +33,11 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import com.sun.management.UnixOperatingSystemMXBean;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@ -53,11 +62,13 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase; import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -2738,6 +2749,52 @@ public class LargeMessageTest extends LargeMessageTestBase {
cons.close(); cons.close();
} }
@Test
public void testAMQPLargeMessageFDs() throws Exception {
OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
Assume.assumeTrue(os instanceof UnixOperatingSystemMXBean);
final SimpleString MY_QUEUE = new SimpleString("MY-QUEUE");
final int numberOfMessages = 30;
ActiveMQServer server = createServer(true, true);
server.start();
long fdBefore = ((UnixOperatingSystemMXBean)os).getOpenFileDescriptorCount();
server.createQueue(new QueueConfiguration(MY_QUEUE).setRoutingType(RoutingType.ANYCAST));
ConnectionFactory connectionFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
byte[] bufferSample = new byte[300 * 1024];
for (int i = 0; i < bufferSample.length; i++) {
bufferSample[i] = getSamplebyte(i);
}
javax.jms.Queue jmsQueue = session.createQueue(MY_QUEUE.toString());
MessageProducer producer = session.createProducer(jmsQueue);
producer.setTimeToLive(300);
for (int i = 0; i < numberOfMessages; i++) {
BytesMessage message = session.createBytesMessage();
message.writeBytes(bufferSample);
message.setIntProperty("count", i);
producer.send(message);
}
session.close();
connection.close();
Wait.assertTrue(() -> ((UnixOperatingSystemMXBean)os).getOpenFileDescriptorCount() - fdBefore < 3);
}
// Private ------------------------------------------------------- // Private -------------------------------------------------------
// Inner classes ------------------------------------------------- // Inner classes -------------------------------------------------