diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java index ea686399e5..900d01f055 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java @@ -25,11 +25,13 @@ import static org.junit.Assert.fail; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; @@ -248,6 +250,111 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport { connection.close(); } + @Test(timeout = 60000) + public void TestCreateDynamicQueueSenderAndPublish() throws Exception { + doTestCreateDynamicSenderAndPublish(false); + } + + @Test(timeout = 60000) + public void TestCreateDynamicTopicSenderAndPublish() throws Exception { + doTestCreateDynamicSenderAndPublish(true); + } + + protected void doTestCreateDynamicSenderAndPublish(boolean topic) throws Exception { + Target target = createDynamicTarget(topic); + + final BrokerViewMBean brokerView = getProxyToBroker(); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(target); + assertNotNull(sender); + + if (topic) { + assertEquals(1, brokerView.getTemporaryTopics().length); + } else { + assertEquals(1, brokerView.getTemporaryQueues().length); + } + + // Get the new address + String address = sender.getSender().getRemoteTarget().getAddress(); + LOG.info("New dynamic sender address -> {}", address); + + // Create a message and send to a receive that is listening on the newly + // created dynamic link address. + AmqpMessage message = new AmqpMessage(); + message.setMessageId("msg-1"); + message.setText("Test-Message"); + + AmqpReceiver receiver = session.createReceiver(address); + receiver.flow(1); + + sender.send(message); + + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("Should have read a message", received); + received.accept(); + + receiver.close(); + sender.close(); + + connection.close(); + } + + @Test(timeout = 60000) + public void testCreateDynamicReceiverToTopicAndSend() throws Exception { + doTestCreateDynamicSender(true); + } + + @Test(timeout = 60000) + public void testCreateDynamicReceiverToQueueAndSend() throws Exception { + doTestCreateDynamicSender(false); + } + + protected void doTestCreateDynamicReceiverAndSend(boolean topic) throws Exception { + Source source = createDynamicSource(topic); + + final BrokerViewMBean brokerView = getProxyToBroker(); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(source); + assertNotNull(receiver); + + if (topic) { + assertEquals(1, brokerView.getTemporaryTopics().length); + } else { + assertEquals(1, brokerView.getTemporaryQueues().length); + } + + // Get the new address + String address = receiver.getReceiver().getRemoteSource().getAddress(); + LOG.info("New dynamic receiver address -> {}", address); + + // Create a message and send to a receive that is listening on the newly + // created dynamic link address. + AmqpMessage message = new AmqpMessage(); + message.setMessageId("msg-1"); + message.setText("Test-Message"); + + AmqpSender sender = session.createSender(address); + sender.send(message); + + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("Should have read a message", received); + received.accept(); + + sender.close(); + receiver.close(); + + connection.close(); + } + protected Source createDynamicSource(boolean topic) { Source source = new Source();