From 351d4b9dea128a1faedd55fab448931f374922eb Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Mon, 30 Mar 2015 17:20:52 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5666 Add some additional tests to validate AMQP behavior --- .../transport/amqp/client/AmqpMessage.java | 26 +++++++ .../amqp/interop/AmqpSendReceiveTest.java | 78 +++++++++++++++++++ 2 files changed, 104 insertions(+) diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index 9db12f9e1d..e5d2d97175 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -210,6 +210,32 @@ public class AmqpMessage { return message.getProperties().getMessageId().toString(); } + /** + * Sets the GroupId property on an outbound message using the provided String + * + * @param messageId + * the String Group ID value to set. + */ + public void setGroupId(String groupId) { + checkReadOnly(); + lazyCreateProperties(); + getWrappedMessage().setGroupId(groupId); + } + + /** + * Return the set GroupId value in String form, if there are no properties + * in the given message return null. + * + * @return the set GroupID in String form or null if not set. + */ + public String getGroupId() { + if (message.getProperties() == null) { + return null; + } + + return message.getProperties().getGroupId(); + } + /** * Sets a given application property on an outbound message. * diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java index e7058e5c67..822edee359 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp.interop; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import java.util.concurrent.TimeUnit; @@ -76,4 +77,81 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { receiver2.close(); connection.close(); } + + @Test(timeout = 60000) + public void testReceiveWithJMSSelectorFilter() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpMessage message = new AmqpMessage(); + + message.setGroupId("abcdefg"); + message.setApplicationProperty("sn", 100); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + sender.send(message); + sender.close(); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), "sn = 100"); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + assertEquals(100, received.getApplicationProperty("sn")); + assertEquals("abcdefg", received.getGroupId()); + received.accept(); + + receiver.close(); + } + + @Test(timeout = 30000) + public void testAdvancedLinkFlowControl() throws Exception { + final int MSG_COUNT = 20; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + + for (int i = 0; i < MSG_COUNT; i++) { + AmqpMessage message = new AmqpMessage(); + + message.setMessageId("msg" + i); + message.setMessageAnnotation("serialNo", i); + message.setText("Test-Message"); + + sender.send(message); + } + + sender.close(); + + AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName()); + receiver1.flow(2); + AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS); + AmqpMessage message2 = receiver1.receive(5, TimeUnit.SECONDS); + assertEquals("msg0", message1.getMessageId()); + assertEquals("msg1", message2.getMessageId()); + message1.accept(); + message2.accept(); + + AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName()); + receiver2.flow(2); + AmqpMessage message3 = receiver2.receive(5, TimeUnit.SECONDS); + AmqpMessage message4 = receiver2.receive(5, TimeUnit.SECONDS); + assertEquals("msg2", message3.getMessageId()); + assertEquals("msg3", message4.getMessageId()); + message3.accept(); + message4.accept(); + + receiver1.flow(MSG_COUNT - 4); + for (int i = 4; i < MSG_COUNT - 4; i++) { + AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); + assertEquals("msg" + i, message.getMessageId()); + message.accept(); + } + + receiver1.close(); + receiver2.close(); + } }