diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index 70ff658e43..10f06b209b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -16,7 +16,11 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; -import javax.jms.JMSException; +import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS; +import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS; +import static org.apache.activemq.transport.amqp.AmqpSupport.contains; +import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter; + import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -26,10 +30,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import javax.jms.JMSException; + +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; @@ -43,15 +49,11 @@ import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sender; +import org.jgroups.util.UUID; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS; -import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS; -import static org.apache.activemq.transport.amqp.AmqpSupport.contains; -import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter; - /** * Test basic send and receive scenarios using only AMQP sender and receiver links. */ @@ -1049,6 +1051,61 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { connection.close(); } + @Test(timeout = 60000) + public void testMessageWithToFieldSetToSenderAddress() throws Exception { + doTestMessageWithToFieldSet(false, getTestName()); + } + + @Test(timeout = 60000) + public void testMessageWithToFieldSetToRandomAddress() throws Exception { + doTestMessageWithToFieldSet(false, UUID.randomUUID().toString()); + } + + @Test(timeout = 60000) + public void testMessageWithToFieldSetToEmpty() throws Exception { + doTestMessageWithToFieldSet(false, ""); + } + + @Test(timeout = 60000) + public void testMessageWithToFieldSetToNull() throws Exception { + doTestMessageWithToFieldSet(false, null); + } + + @Test(timeout = 60000) + public void testMessageWithToFieldSetWithAnonymousSender() throws Exception { + doTestMessageWithToFieldSet(true, getTestName()); + } + + private void doTestMessageWithToFieldSet(boolean anonymous, String expected) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + final String address = getTestName(); + + AmqpSender sender = session.createSender(anonymous ? null : address); + + AmqpMessage message = new AmqpMessage(); + message.setAddress(expected); + message.setMessageId("msg:1"); + sender.send(message); + + AmqpReceiver receiver = session.createReceiver(address); + + Queue queueView = getProxyToQueue(address); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + assertEquals(expected, received.getAddress()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + public void sendMessages(String destinationName, int count) throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); @@ -1066,7 +1123,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { } } - public void sendMessages(String destinationName, int count, boolean durable) throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect());