Add some additional tests to validate AMQP behavior
This commit is contained in:
Timothy Bish 2015-03-30 17:20:52 -04:00
parent e333fd957b
commit 351d4b9dea
2 changed files with 104 additions and 0 deletions

View File

@ -210,6 +210,32 @@ public class AmqpMessage {
return message.getProperties().getMessageId().toString();
}
/**
* Sets the GroupId property on an outbound message using the provided String
*
* @param messageId
* the String Group ID value to set.
*/
public void setGroupId(String groupId) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setGroupId(groupId);
}
/**
* Return the set GroupId value in String form, if there are no properties
* in the given message return null.
*
* @return the set GroupID in String form or null if not set.
*/
public String getGroupId() {
if (message.getProperties() == null) {
return null;
}
return message.getProperties().getGroupId();
}
/**
* Sets a given application property on an outbound message.
*

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.concurrent.TimeUnit;
@ -76,4 +77,81 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
receiver2.close();
connection.close();
}
@Test(timeout = 60000)
public void testReceiveWithJMSSelectorFilter() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpMessage message = new AmqpMessage();
message.setGroupId("abcdefg");
message.setApplicationProperty("sn", 100);
AmqpSender sender = session.createSender("queue://" + getTestName());
sender.send(message);
sender.close();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), "sn = 100");
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
assertEquals(100, received.getApplicationProperty("sn"));
assertEquals("abcdefg", received.getGroupId());
received.accept();
receiver.close();
}
@Test(timeout = 30000)
public void testAdvancedLinkFlowControl() throws Exception {
final int MSG_COUNT = 20;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg" + i);
message.setMessageAnnotation("serialNo", i);
message.setText("Test-Message");
sender.send(message);
}
sender.close();
AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
receiver1.flow(2);
AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS);
AmqpMessage message2 = receiver1.receive(5, TimeUnit.SECONDS);
assertEquals("msg0", message1.getMessageId());
assertEquals("msg1", message2.getMessageId());
message1.accept();
message2.accept();
AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName());
receiver2.flow(2);
AmqpMessage message3 = receiver2.receive(5, TimeUnit.SECONDS);
AmqpMessage message4 = receiver2.receive(5, TimeUnit.SECONDS);
assertEquals("msg2", message3.getMessageId());
assertEquals("msg3", message4.getMessageId());
message3.accept();
message4.accept();
receiver1.flow(MSG_COUNT - 4);
for (int i = 4; i < MSG_COUNT - 4; i++) {
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
assertEquals("msg" + i, message.getMessageId());
message.accept();
}
receiver1.close();
receiver2.close();
}
}