diff --git a/pom.xml b/pom.xml index 221cff985f..fb95a5efc3 100644 --- a/pom.xml +++ b/pom.xml @@ -92,7 +92,7 @@ 2.4 2.8.47 4.1.24.Final - 0.27.1 + 0.27.3 3.0.19.Final 1.7.21 0.33.0 diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index e9fc75bd92..fb4e4daa04 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -363,6 +363,20 @@ public class AmqpReceiver extends AmqpAbstractResource { * if an error occurs while sending the flow. */ public void flow(final int credit) throws IOException { + flow(credit, false); + } + + /** + * Controls the amount of credit given to the receiver link. + * + * @param credit + * the amount of credit to grant. + * @param deferWrite + * defer writing to the wire, hold until for the next operation writes. + * @throws IOException + * if an error occurs while sending the flow. + */ + public void flow(final int credit, final boolean deferWrite) throws IOException { checkClosed(); final ClientFuture request = new ClientFuture(); session.getScheduler().execute(new Runnable() { @@ -372,7 +386,9 @@ public class AmqpReceiver extends AmqpAbstractResource { checkClosed(); try { getEndpoint().flow(credit); - session.pumpToProtonTransport(request); + if (!deferWrite) { + session.pumpToProtonTransport(request); + } request.onSuccess(); } catch (Exception e) { request.onFailure(e); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java index 8465c61128..6393bae5c6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java @@ -393,13 +393,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { receiver2.flow(numMsgs); for (int i = 0; i < numMsgs; ++i) { AmqpMessage message = receiver2.receive(5, TimeUnit.SECONDS); - assertNotNull("failed at " + i, message); - - Section body = message.getWrappedMessage().getBody(); - assertNotNull("No message body for msg " + i, body); - - assertTrue("Unexpected message body type for msg " + body.getClass(), body instanceof Data); - assertEquals("Unexpected body content for msg", new Binary(payload, 0, payload.length), ((Data) body).getValue()); + validateMessage(payload, i, message); message.accept(); } @@ -411,6 +405,136 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { } } + @Test(timeout = 60000) + public void testReceiveLargeMessagesMultiplexedOnSameSession() throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); + + int numMsgs = 10; + int maxFrameSize = FRAME_SIZE; // Match the brokers outgoing frame size limit to make window sizing easy + int msgSizeA = FRAME_SIZE * 4; // Bigger multi-frame messages + int msgSizeB = maxFrameSize / 2; // Smaller single frame messages + int sessionCapacity = msgSizeA + maxFrameSize; // Restrict session to 1.X of the larger messages in flight at once, make it likely send is partial. + + byte[] payloadA = createLargePayload(msgSizeA); + assertEquals(msgSizeA, payloadA.length); + byte[] payloadB = createLargePayload(msgSizeB); + assertEquals(msgSizeB, payloadB.length); + + String testQueueNameA = getTestName() + "A"; + String testQueueNameB = getTestName() + "B"; + + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = client.createConnection(); + connection.setMaxFrameSize(maxFrameSize); + connection.setSessionIncomingCapacity(sessionCapacity); + + connection.connect(); + addConnection(connection); + try { + AmqpSession session = connection.createSession(); + AmqpSender senderA = session.createSender(testQueueNameA); + AmqpSender senderB = session.createSender(testQueueNameB); + + // Send in the messages + for (int i = 0; i < numMsgs; ++i) { + AmqpMessage messageA = new AmqpMessage(); + messageA.setBytes(payloadA); + + senderA.send(messageA); + + AmqpMessage messageB = new AmqpMessage(); + messageB.setBytes(payloadB); + + senderB.send(messageB); + } + + Wait.assertEquals(numMsgs, () -> getMessageCount(server.getPostOffice(), testQueueNameA), 5000, 10); + Wait.assertEquals(numMsgs, () -> getMessageCount(server.getPostOffice(), testQueueNameB), 5000, 10); + + AmqpReceiver receiverA = session.createReceiver(testQueueNameA); + AmqpReceiver receiverB = session.createReceiver(testQueueNameB); + + // Split credit flow to encourage overlapping + // Flow initial credit for both consumers, in the same TCP frame. + receiverA.flow(numMsgs / 2, true); + receiverB.flow(numMsgs / 2); + + // Flow remaining credit for both consumers, in the same TCP frame. + receiverA.flow(numMsgs / 2, true); + receiverB.flow(numMsgs / 2); + + ArrayList messagesA = new ArrayList<>(); + ArrayList messagesB = new ArrayList<>(); + + long timeout = 6000; + long start = System.nanoTime(); + + // Validate the messages are all received + boolean timeRemaining = true; + while (timeRemaining) { + if (messagesA.size() < numMsgs) { + LOG.debug("Attempting to receive message for receiver A"); + AmqpMessage messageA = receiverA.receive(20, TimeUnit.MILLISECONDS); + if (messageA != null) { + LOG.debug("Got message for receiver A"); + messagesA.add(messageA); + messageA.accept(); + } + } + + if (messagesB.size() < numMsgs) { + LOG.debug("Attempting to receive message for receiver B"); + AmqpMessage messageB = receiverB.receive(20, TimeUnit.MILLISECONDS); + if (messageB != null) { + LOG.debug("Got message for receiver B"); + messagesB.add(messageB); + messageB.accept(); + } + } + + if (messagesA.size() == numMsgs && messagesB.size() == numMsgs) { + LOG.debug("Received expected messages"); + break; + } + + timeRemaining = System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(timeout); + } + + assertTrue("Failed to receive all messages in expected time: A=" + messagesA.size() + ", B=" + messagesB.size(), timeRemaining); + + // Validate there aren't any extras + assertNull("Unexpected additional message present for A", receiverA.receiveNoWait()); + assertNull("Unexpected additional message present for B", receiverB.receiveNoWait()); + + // Validate the transfers were reconstituted to give the expected delivery payload. + for (int i = 0; i < numMsgs; ++i) { + AmqpMessage messageA = messagesA.get(i); + validateMessage(payloadA, i, messageA); + + AmqpMessage messageB = messagesB.get(i); + validateMessage(payloadB, i, messageB); + } + + receiverA.close(); + receiverB.close(); + + session.close(); + } finally { + connection.close(); + } + } + + private void validateMessage(byte[] expectedPayload, int msgNum, AmqpMessage message) { + assertNotNull("failed at " + msgNum, message); + + Section body = message.getWrappedMessage().getBody(); + assertNotNull("No message body for msg " + msgNum, body); + + assertTrue("Unexpected message body type for msg " + body.getClass(), body instanceof Data); + assertEquals("Unexpected body content for msg", new Binary(expectedPayload, 0, expectedPayload.length), ((Data) body).getValue()); + } + @Test(timeout = 60000) public void testMessageWithAmqpValueAndEmptyBinaryPreservesBody() throws Exception { server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));