This commit is contained in:
Clebert Suconic 2018-08-08 14:27:42 -04:00
commit 24c13fa4e0
3 changed files with 149 additions and 9 deletions

View File

@ -92,7 +92,7 @@
<maven.assembly.plugin.version>2.4</maven.assembly.plugin.version>
<mockito.version>2.8.47</mockito.version>
<netty.version>4.1.24.Final</netty.version>
<proton.version>0.27.1</proton.version>
<proton.version>0.27.3</proton.version>
<resteasy.version>3.0.19.Final</resteasy.version>
<slf4j.version>1.7.21</slf4j.version>
<qpid.jms.version>0.33.0</qpid.jms.version>

View File

@ -363,6 +363,20 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
* if an error occurs while sending the flow.
*/
public void flow(final int credit) throws IOException {
flow(credit, false);
}
/**
* Controls the amount of credit given to the receiver link.
*
* @param credit
* the amount of credit to grant.
* @param deferWrite
* defer writing to the wire, hold until for the next operation writes.
* @throws IOException
* if an error occurs while sending the flow.
*/
public void flow(final int credit, final boolean deferWrite) throws IOException {
checkClosed();
final ClientFuture request = new ClientFuture();
session.getScheduler().execute(new Runnable() {
@ -372,7 +386,9 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
checkClosed();
try {
getEndpoint().flow(credit);
if (!deferWrite) {
session.pumpToProtonTransport(request);
}
request.onSuccess();
} catch (Exception e) {
request.onFailure(e);

View File

@ -393,13 +393,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
receiver2.flow(numMsgs);
for (int i = 0; i < numMsgs; ++i) {
AmqpMessage message = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull("failed at " + i, message);
Section body = message.getWrappedMessage().getBody();
assertNotNull("No message body for msg " + i, body);
assertTrue("Unexpected message body type for msg " + body.getClass(), body instanceof Data);
assertEquals("Unexpected body content for msg", new Binary(payload, 0, payload.length), ((Data) body).getValue());
validateMessage(payload, i, message);
message.accept();
}
@ -411,6 +405,136 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
}
}
@Test(timeout = 60000)
public void testReceiveLargeMessagesMultiplexedOnSameSession() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
int numMsgs = 10;
int maxFrameSize = FRAME_SIZE; // Match the brokers outgoing frame size limit to make window sizing easy
int msgSizeA = FRAME_SIZE * 4; // Bigger multi-frame messages
int msgSizeB = maxFrameSize / 2; // Smaller single frame messages
int sessionCapacity = msgSizeA + maxFrameSize; // Restrict session to 1.X of the larger messages in flight at once, make it likely send is partial.
byte[] payloadA = createLargePayload(msgSizeA);
assertEquals(msgSizeA, payloadA.length);
byte[] payloadB = createLargePayload(msgSizeB);
assertEquals(msgSizeB, payloadB.length);
String testQueueNameA = getTestName() + "A";
String testQueueNameB = getTestName() + "B";
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
connection.setMaxFrameSize(maxFrameSize);
connection.setSessionIncomingCapacity(sessionCapacity);
connection.connect();
addConnection(connection);
try {
AmqpSession session = connection.createSession();
AmqpSender senderA = session.createSender(testQueueNameA);
AmqpSender senderB = session.createSender(testQueueNameB);
// Send in the messages
for (int i = 0; i < numMsgs; ++i) {
AmqpMessage messageA = new AmqpMessage();
messageA.setBytes(payloadA);
senderA.send(messageA);
AmqpMessage messageB = new AmqpMessage();
messageB.setBytes(payloadB);
senderB.send(messageB);
}
Wait.assertEquals(numMsgs, () -> getMessageCount(server.getPostOffice(), testQueueNameA), 5000, 10);
Wait.assertEquals(numMsgs, () -> getMessageCount(server.getPostOffice(), testQueueNameB), 5000, 10);
AmqpReceiver receiverA = session.createReceiver(testQueueNameA);
AmqpReceiver receiverB = session.createReceiver(testQueueNameB);
// Split credit flow to encourage overlapping
// Flow initial credit for both consumers, in the same TCP frame.
receiverA.flow(numMsgs / 2, true);
receiverB.flow(numMsgs / 2);
// Flow remaining credit for both consumers, in the same TCP frame.
receiverA.flow(numMsgs / 2, true);
receiverB.flow(numMsgs / 2);
ArrayList<AmqpMessage> messagesA = new ArrayList<>();
ArrayList<AmqpMessage> messagesB = new ArrayList<>();
long timeout = 6000;
long start = System.nanoTime();
// Validate the messages are all received
boolean timeRemaining = true;
while (timeRemaining) {
if (messagesA.size() < numMsgs) {
LOG.debug("Attempting to receive message for receiver A");
AmqpMessage messageA = receiverA.receive(20, TimeUnit.MILLISECONDS);
if (messageA != null) {
LOG.debug("Got message for receiver A");
messagesA.add(messageA);
messageA.accept();
}
}
if (messagesB.size() < numMsgs) {
LOG.debug("Attempting to receive message for receiver B");
AmqpMessage messageB = receiverB.receive(20, TimeUnit.MILLISECONDS);
if (messageB != null) {
LOG.debug("Got message for receiver B");
messagesB.add(messageB);
messageB.accept();
}
}
if (messagesA.size() == numMsgs && messagesB.size() == numMsgs) {
LOG.debug("Received expected messages");
break;
}
timeRemaining = System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(timeout);
}
assertTrue("Failed to receive all messages in expected time: A=" + messagesA.size() + ", B=" + messagesB.size(), timeRemaining);
// Validate there aren't any extras
assertNull("Unexpected additional message present for A", receiverA.receiveNoWait());
assertNull("Unexpected additional message present for B", receiverB.receiveNoWait());
// Validate the transfers were reconstituted to give the expected delivery payload.
for (int i = 0; i < numMsgs; ++i) {
AmqpMessage messageA = messagesA.get(i);
validateMessage(payloadA, i, messageA);
AmqpMessage messageB = messagesB.get(i);
validateMessage(payloadB, i, messageB);
}
receiverA.close();
receiverB.close();
session.close();
} finally {
connection.close();
}
}
private void validateMessage(byte[] expectedPayload, int msgNum, AmqpMessage message) {
assertNotNull("failed at " + msgNum, message);
Section body = message.getWrappedMessage().getBody();
assertNotNull("No message body for msg " + msgNum, body);
assertTrue("Unexpected message body type for msg " + body.getClass(), body instanceof Data);
assertEquals("Unexpected body content for msg", new Binary(expectedPayload, 0, expectedPayload.length), ((Data) body).getValue());
}
@Test(timeout = 60000)
public void testMessageWithAmqpValueAndEmptyBinaryPreservesBody() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));