diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java index 8c40b9d31c..44d10561e5 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java @@ -21,4 +21,7 @@ import org.apache.qpid.proton.message.ProtonJMessage; public interface AMQPClientSenderContext { void send(ProtonJMessage message); + + String getAddress(); + } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java index b5184749e7..44cec7c850 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java @@ -22,6 +22,8 @@ public interface AMQPClientSessionContext extends AMQPSessionContext { AMQPClientSenderContext createSender(String address, boolean preSettled) throws ActiveMQAMQPException; + AMQPClientSenderContext createDynamicSender(boolean preSettled) throws ActiveMQAMQPException; + AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException; AMQPClientReceiverContext createReceiver(String name, String address) throws ActiveMQAMQPException; diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java index ffc08d395e..4343b017e2 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java @@ -32,7 +32,7 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable protected final Receiver receiver; - protected final String address; + protected String address; protected final AMQPSessionCallback sessionSPI; @@ -43,12 +43,6 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable this.connection = connection; this.protonSession = protonSession; this.receiver = receiver; - if (receiver.getRemoteTarget() != null) { - this.address = receiver.getRemoteTarget().getAddress(); - } - else { - this.address = null; - } this.sessionSPI = sessionSPI; } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java index e03c99dc35..f442b9efa1 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java @@ -67,7 +67,10 @@ public class ProtonClientContext extends AbstractProtonContextSender implements Thread.currentThread().interrupt(); return false; } - } + @Override + public String getAddress() { + return sender.getRemoteTarget().getAddress(); + } } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java index b3e96bb6c2..9079dc3769 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java @@ -16,8 +16,18 @@ */ package org.proton.plug.context.client; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.DeleteOnClose; +import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sender; @@ -26,6 +36,7 @@ import org.proton.plug.AMQPClientReceiverContext; import org.proton.plug.AMQPClientSenderContext; import org.proton.plug.AMQPClientSessionContext; import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.AmqpSupport; import org.proton.plug.context.AbstractConnectionContext; import org.proton.plug.context.AbstractProtonSessionContext; import org.proton.plug.exceptions.ActiveMQAMQPException; @@ -62,6 +73,46 @@ public class ProtonClientSessionContext extends AbstractProtonSessionContext imp return amqpSender; } + @Override + public AMQPClientSenderContext createDynamicSender(boolean preSettled) throws ActiveMQAMQPException { + FutureRunnable futureRunnable = new FutureRunnable(1); + + ProtonClientContext amqpSender; + synchronized (connection.getLock()) { + final String senderName = "Dynamic-" + UUID.randomUUID().toString(); + + Sender sender = session.sender(senderName); + sender.setSenderSettleMode(SenderSettleMode.SETTLED); + + Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL}; + Source source = new Source(); + source.setAddress(senderName); + source.setOutcomes(outcomes); + + Target target = new Target(); + target.setDynamic(true); + target.setDurable(TerminusDurability.NONE); + target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + + // Set the dynamic node lifetime-policy + Map dynamicNodeProperties = new HashMap<>(); + dynamicNodeProperties.put(AmqpSupport.LIFETIME_POLICY, DeleteOnClose.getInstance()); + target.setDynamicNodeProperties(dynamicNodeProperties); + + amqpSender = new ProtonClientContext(connection, sender, this, sessionSPI); + amqpSender.afterInit(futureRunnable); + sender.setSource(source); + sender.setTarget(target); + sender.setContext(amqpSender); + sender.open(); + } + + connection.flush(); + + waitWithTimeout(futureRunnable); + return amqpSender; + } + @Override public AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException { return createReceiver(address, address); diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java index e3cbb3bb16..aa04cefe0b 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java @@ -61,20 +61,20 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext { if (target.getDynamic()) { //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and // will be deleted on closing of the session - String queue = sessionSPI.tempQueueName(); + address = sessionSPI.tempQueueName(); try { - sessionSPI.createTemporaryQueue(queue); + sessionSPI.createTemporaryQueue(address); } catch (Exception e) { throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); } - target.setAddress(queue); + target.setAddress(address); } else { //if not dynamic then we use the targets address as the address to forward the messages to, however there has to //be a queue bound to it so we nee to check this. - String address = target.getAddress(); + address = target.getAddress(); if (address == null) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.targetAddressNotSet(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index 9534681536..d803e9e70a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -39,6 +39,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Enumeration; import java.util.HashMap; +import java.util.Map; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -51,7 +52,10 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.message.ProtonJMessage; +import org.apache.qpid.proton.message.impl.MessageImpl; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -60,6 +64,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.proton.plug.AMQPClientConnectionContext; import org.proton.plug.AMQPClientReceiverContext; +import org.proton.plug.AMQPClientSenderContext; import org.proton.plug.AMQPClientSessionContext; import org.proton.plug.test.Constants; import org.proton.plug.test.minimalclient.SimpleAMQPConnector; @@ -196,7 +201,7 @@ public class ProtonTest extends ActiveMQTestBase { } @Test - public void testReplyToNonJMS() throws Throwable { + public void testReplyToNonJMS() throws Throwable { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryQueue queue = session.createTemporaryQueue(); @@ -351,6 +356,40 @@ public class ProtonTest extends ActiveMQTestBase { } } + @Test + public void testDynamicSenderLink() throws Exception { + + if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + + SimpleAMQPConnector connector = new SimpleAMQPConnector(); + connector.start(); + AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT); + + clientConnection.clientOpen(null); + + AMQPClientSessionContext csession = clientConnection.createClientSession(); + AMQPClientSenderContext sender = csession.createDynamicSender(false); + + String address = sender.getAddress(); + + AMQPClientReceiverContext receiver = csession.createReceiver(address); + receiver.flow(1); + + // Send one on the dynamic address + MessageImpl message = (MessageImpl) org.apache.qpid.proton.message.Message.Factory.create(); + + Properties props = new Properties(); + Map map = new HashMap<>(); + + map.put("some-property", 1); + AmqpValue value = new AmqpValue(map); + message.setBody(value); + message.setProperties(props); + sender.send(message); + + ProtonJMessage protonJMessage = receiver.receiveMessage(500, TimeUnit.MILLISECONDS); + Assert.assertNotNull(protonJMessage); + } @Test public void testConnection() throws Exception {