diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java index 7af4c2c91a..526a0435ad 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java @@ -52,6 +52,9 @@ public class AmqpSupport { // Symbols used in configuration of newly opened links. public static final Symbol COPY = Symbol.getSymbol("copy"); + // Lifetime policy symbols + public static final Symbol DYNAMIC_NODE_LIFETIME_POLICY = Symbol.valueOf("lifetime-policy"); + /** * Search for a given Symbol in a given array of Symbol object. * diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index 585ba93c7e..978075c44c 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -71,12 +71,12 @@ public class AmqpReceiver extends AmqpAbstractResource { private final AmqpSession session; private final String address; private final String receiverId; + private final Source userSpecifiedSource; private String subscriptionName; private String selector; private boolean presettle; private boolean noLocal; - private Source userSpecifiedSource; /** * Create a new receiver instance. @@ -94,6 +94,7 @@ public class AmqpReceiver extends AmqpAbstractResource { throw new IllegalArgumentException("Address cannot be empty."); } + this.userSpecifiedSource = null; this.session = session; this.address = address; this.receiverId = receiverId; @@ -454,7 +455,7 @@ public class AmqpReceiver extends AmqpAbstractResource { Source source = userSpecifiedSource; Target target = new Target(); - if (userSpecifiedSource == null && address != null) { + if (source == null && address != null) { source = new Source(); source.setAddress(address); configureSource(source); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index 95b074391c..c8829e08d7 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -64,6 +64,7 @@ public class AmqpSender extends AmqpAbstractResource { private final AmqpSession session; private final String address; private final String senderId; + private final Target userSpecifiedTarget; private boolean presettle; private long sendTimeout = DEFAULT_SEND_TIMEOUT; @@ -82,9 +83,37 @@ public class AmqpSender extends AmqpAbstractResource { * The unique ID assigned to this sender. */ public AmqpSender(AmqpSession session, String address, String senderId) { + + if (address != null && address.isEmpty()) { + throw new IllegalArgumentException("Address cannot be empty."); + } + this.session = session; this.address = address; this.senderId = senderId; + this.userSpecifiedTarget = null; + } + + /** + * Create a new sender instance using the given Target when creating the link. + * + * @param session + * The parent session that created the session. + * @param address + * The address that this sender produces to. + * @param senderId + * The unique ID assigned to this sender. + */ + public AmqpSender(AmqpSession session, Target target, String senderId) { + + if (target == null) { + throw new IllegalArgumentException("User specified Target cannot be null"); + } + + this.session = session; + this.userSpecifiedTarget = target; + this.address = target.getAddress(); + this.senderId = senderId; } /** @@ -216,8 +245,11 @@ public class AmqpSender extends AmqpAbstractResource { source.setAddress(senderId); source.setOutcomes(outcomes); - Target target = new Target(); - target.setAddress(address); + Target target = userSpecifiedTarget; + if (target == null) { + target = new Target(); + target.setAddress(address); + } String senderName = senderId + ":" + address; diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 3b2a3d1dab..8af362b0f1 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession; import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Session; @@ -81,6 +82,38 @@ public class AmqpSession extends AmqpAbstractResource { return sender; } + /** + * Create a sender instance using the given Target + * + * @param target + * the caller created and configured Traget used to create the sender link. + * + * @return a newly created sender that is ready for use. + * + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpSender createSender(Target target) throws Exception { + checkClosed(); + + final AmqpSender sender = new AmqpSender(AmqpSession.this, target, getNextSenderId()); + final ClientFuture request = new ClientFuture(); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + sender.setStateInspector(getStateInspector()); + sender.open(request); + pumpToProtonTransport(); + } + }); + + request.sync(); + + return sender; + } + /** * Create a receiver instance using the given address * @@ -153,10 +186,8 @@ public class AmqpSession extends AmqpAbstractResource { } /** - * Create a receiver instance using the given address + * Create a receiver instance using the given Source * - * @param address - * the address to which the receiver will subscribe for its messages. * @param source * the caller created and configured Source used to create the receiver link. * diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java new file mode 100644 index 0000000000..18ef7dd5e3 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import static org.apache.activemq.transport.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY; +import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY; +import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.DeleteOnClose; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; +import org.junit.Test; + +/** + * Tests for JMS temporary destination mappings to AMQP + */ +public class AmqpTempDestinationTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testCreateDynamicSenderToTopic() throws Exception { + doTestCreateDynamicSender(true); + } + + @Test(timeout = 60000) + public void testCreateDynamicSenderToQueue() throws Exception { + doTestCreateDynamicSender(false); + } + + protected void doTestCreateDynamicSender(boolean topic) throws Exception { + Target target = createDynamicTarget(topic); + + final BrokerViewMBean brokerView = getProxyToBroker(); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(target); + assertNotNull(sender); + + if (topic) { + assertEquals(1, brokerView.getTemporaryTopics().length); + } else { + assertEquals(1, brokerView.getTemporaryQueues().length); + } + + connection.close(); + } + + @Test(timeout = 60000) + public void testDynamicSenderLifetimeBoundToLinkTopic() throws Exception { + doTestDynamicSenderLifetimeBoundToLinkQueue(true); + } + + @Test(timeout = 60000) + public void testDynamicSenderLifetimeBoundToLinkQueue() throws Exception { + doTestDynamicSenderLifetimeBoundToLinkQueue(false); + } + + protected void doTestDynamicSenderLifetimeBoundToLinkQueue(boolean topic) throws Exception { + Target target = createDynamicTarget(topic); + + final BrokerViewMBean brokerView = getProxyToBroker(); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(target); + assertNotNull(sender); + + if (topic) { + assertEquals(1, brokerView.getTemporaryTopics().length); + } else { + assertEquals(1, brokerView.getTemporaryQueues().length); + } + + sender.close(); + + if (topic) { + assertEquals(0, brokerView.getTemporaryTopics().length); + } else { + assertEquals(0, brokerView.getTemporaryQueues().length); + } + + connection.close(); + } + + protected Target createDynamicTarget(boolean topic) { + + Target target = new Target(); + target.setDynamic(true); + target.setDurable(TerminusDurability.NONE); + target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + + // Set the dynamic node lifetime-policy + Map dynamicNodeProperties = new HashMap(); + dynamicNodeProperties.put(DYNAMIC_NODE_LIFETIME_POLICY, DeleteOnClose.getInstance()); + target.setDynamicNodeProperties(dynamicNodeProperties); + + // Set the capability to indicate the node type being created + if (!topic) { + target.setCapabilities(TEMP_QUEUE_CAPABILITY); + } else { + target.setCapabilities(TEMP_TOPIC_CAPABILITY); + } + + return target; + } +}