diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index ddd9b39e84..0b40ee2140 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -689,6 +689,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr byte[] tag = preSettle ? new byte[0] : protonSession.getTag(); // Let the Message decide how to present the message bytes + boolean attemptRelease = true; ReadableBuffer sendBuffer = message.getSendBuffer(deliveryCount); try { @@ -712,7 +713,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr delivery.setMessageFormat((int) message.getMessageFormat()); delivery.setContext(messageReference); - sender.sendNoCopy(sendBuffer); + if (sendBuffer instanceof NettyReadable) { + sender.send(sendBuffer); + // Above send copied, so release now if needed + attemptRelease = false; + ((NettyReadable) sendBuffer).getByteBuf().release(); + } else { + // Don't have pooled content, no need to release or copy. + attemptRelease = false; + sender.sendNoCopy(sendBuffer); + } if (preSettle) { // Presettled means the client implicitly accepts any delivery we send it. @@ -728,7 +738,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return size; } finally { - if (sendBuffer instanceof NettyReadable) { + if (attemptRelease && sendBuffer instanceof NettyReadable) { ((NettyReadable) sendBuffer).getByteBuf().release(); } } diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index a00f4ae7e9..9181ceab38 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -49,6 +49,7 @@ import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Event.Type; import org.apache.qpid.proton.engine.Sasl; +import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.impl.CollectorImpl; import org.apache.qpid.proton.engine.impl.TransportImpl; @@ -105,6 +106,7 @@ public class AmqpConnection extends AmqpAbstractResource implements private boolean authenticated; private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE; private int channelMax = DEFAULT_CHANNEL_MAX; + private int sessionIncomingCapacity = 0; private long connectTimeout = DEFAULT_CONNECT_TIMEOUT; private long closeTimeout = DEFAULT_CLOSE_TIMEOUT; private long drainTimeout = DEFAULT_DRAIN_TIMEOUT; @@ -279,7 +281,9 @@ public class AmqpConnection extends AmqpAbstractResource implements @Override public void run() { checkClosed(); - session.setEndpoint(getEndpoint().session()); + Session protonSession = getEndpoint().session(); + protonSession.setIncomingCapacity(sessionIncomingCapacity); + session.setEndpoint(protonSession); session.setStateInspector(getStateInspector()); session.open(request); pumpToProtonTransport(request); @@ -383,6 +387,14 @@ public class AmqpConnection extends AmqpAbstractResource implements this.channelMax = channelMax; } + public int getSessionIncomingCapacity() { + return sessionIncomingCapacity; + } + + public void setSessionIncomingCapacity(int capacity) { + this.sessionIncomingCapacity = capacity; + } + public long getConnectTimeout() { return connectTimeout; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java index 7e80c8ff74..6bd550a27e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.tests.integration.amqp; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Map; import java.util.Random; @@ -39,6 +40,7 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; @@ -46,7 +48,10 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.message.impl.MessageImpl; import org.junit.Assert; import org.junit.Ignore; @@ -65,6 +70,13 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { String testQueueName = "ConnectionFrameSize"; + @Override + protected void addConfiguration(ActiveMQServer server) { + // Make the journal file size larger than the frame+message sizes used in the tests, + // since it is by default for external brokers and it changes the behaviour. + server.getConfiguration().setJournalFileSize(5 * 1024 * 1024); + } + @Override protected void configureAMQPAcceptorParameters(Map params) { params.put("maxFrameSize", FRAME_SIZE); @@ -325,6 +337,80 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { } } + @Test(timeout = 60000) + public void testReceiveRedeliveredLargeMessagesWithSessionFlowControl() throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); + + int numMsgs = 10; + int msgSize = 2_000_000; + int maxFrameSize = FRAME_SIZE; // Match the brokers outgoing frame size limit to make window sizing easy + int sessionCapacity = 2_500_000; // Restrict session to 1.x messages in flight at once, make it likely send is partial. + + byte[] payload = createLargePayload(msgSize); + assertEquals(msgSize, payload.length); + + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = client.createConnection(); + connection.setMaxFrameSize(maxFrameSize); + connection.setSessionIncomingCapacity(sessionCapacity); + + connection.connect(); + addConnection(connection); + try { + String testQueueName = getTestName(); + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(testQueueName); + + for (int i = 0; i < numMsgs; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setBytes(payload); + + sender.send(message); + } + + Wait.assertEquals(numMsgs, () -> getMessageCount(server.getPostOffice(), testQueueName), 5000, 10); + + AmqpReceiver receiver = session.createReceiver(testQueueName); + receiver.flow(numMsgs); + + ArrayList messages = new ArrayList<>(); + for (int i = 0; i < numMsgs; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("failed at " + i, message); + messages.add(message); + } + + for (int i = 0; i < numMsgs; ++i) { + AmqpMessage msg = messages.get(i); + msg.modified(true, false); + } + + receiver.close(); + + AmqpReceiver receiver2 = session.createReceiver(testQueueName); + receiver2.flow(numMsgs); + for (int i = 0; i < numMsgs; ++i) { + AmqpMessage message = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull("failed at " + i, message); + + Section body = message.getWrappedMessage().getBody(); + assertNotNull("No message body for msg " + i, body); + + //TODO: ARTEMIS-1941 raised. This is wrong, test sent a Data section, it got converted in transit. + assertTrue("Unexpected message body type for msg " + body.getClass(), body instanceof AmqpValue); + assertEquals("Unexpected body content for msg", new Binary(payload, 0, payload.length), ((AmqpValue) body).getValue()); + + message.accept(); + } + + session.close(); + + } finally { + connection.close(); + } + } + private void sendObjectMessages(int nMsgs, ConnectionFactory factory) throws Exception { try (Connection connection = factory.createConnection()) { Session session = connection.createSession();