From 75f18d1e10fc3b6c881bf3a44045e80ca8812288 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Mon, 20 Jun 2016 11:34:49 -0400 Subject: [PATCH] ARTEMIS-585 support send on dynamic sender link The sender abstraction must be able to update its sender address in the case of dynamic senders whose target address is not set until the code initializes the link and creates a destination for it. --- .../proton/plug/AMQPClientSenderContext.java | 3 ++ .../proton/plug/AMQPClientSessionContext.java | 2 + .../AbstractProtonReceiverContext.java | 8 +-- .../context/client/ProtonClientContext.java | 5 +- .../client/ProtonClientSessionContext.java | 51 +++++++++++++++++++ .../server/ProtonServerReceiverContext.java | 8 +-- .../tests/integration/proton/ProtonTest.java | 41 ++++++++++++++- 7 files changed, 105 insertions(+), 13 deletions(-) diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java index 8c40b9d31c..44d10561e5 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java @@ -21,4 +21,7 @@ import org.apache.qpid.proton.message.ProtonJMessage; public interface AMQPClientSenderContext { void send(ProtonJMessage message); + + String getAddress(); + } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java index b5184749e7..44cec7c850 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java @@ -22,6 +22,8 @@ public interface AMQPClientSessionContext extends AMQPSessionContext { AMQPClientSenderContext createSender(String address, boolean preSettled) throws ActiveMQAMQPException; + AMQPClientSenderContext createDynamicSender(boolean preSettled) throws ActiveMQAMQPException; + AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException; AMQPClientReceiverContext createReceiver(String name, String address) throws ActiveMQAMQPException; diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java index ffc08d395e..4343b017e2 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java @@ -32,7 +32,7 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable protected final Receiver receiver; - protected final String address; + protected String address; protected final AMQPSessionCallback sessionSPI; @@ -43,12 +43,6 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable this.connection = connection; this.protonSession = protonSession; this.receiver = receiver; - if (receiver.getRemoteTarget() != null) { - this.address = receiver.getRemoteTarget().getAddress(); - } - else { - this.address = null; - } this.sessionSPI = sessionSPI; } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java index e03c99dc35..f442b9efa1 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java @@ -67,7 +67,10 @@ public class ProtonClientContext extends AbstractProtonContextSender implements Thread.currentThread().interrupt(); return false; } - } + @Override + public String getAddress() { + return sender.getRemoteTarget().getAddress(); + } } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java index b3e96bb6c2..9079dc3769 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java @@ -16,8 +16,18 @@ */ package org.proton.plug.context.client; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.DeleteOnClose; +import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sender; @@ -26,6 +36,7 @@ import org.proton.plug.AMQPClientReceiverContext; import org.proton.plug.AMQPClientSenderContext; import org.proton.plug.AMQPClientSessionContext; import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.AmqpSupport; import org.proton.plug.context.AbstractConnectionContext; import org.proton.plug.context.AbstractProtonSessionContext; import org.proton.plug.exceptions.ActiveMQAMQPException; @@ -62,6 +73,46 @@ public class ProtonClientSessionContext extends AbstractProtonSessionContext imp return amqpSender; } + @Override + public AMQPClientSenderContext createDynamicSender(boolean preSettled) throws ActiveMQAMQPException { + FutureRunnable futureRunnable = new FutureRunnable(1); + + ProtonClientContext amqpSender; + synchronized (connection.getLock()) { + final String senderName = "Dynamic-" + UUID.randomUUID().toString(); + + Sender sender = session.sender(senderName); + sender.setSenderSettleMode(SenderSettleMode.SETTLED); + + Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL}; + Source source = new Source(); + source.setAddress(senderName); + source.setOutcomes(outcomes); + + Target target = new Target(); + target.setDynamic(true); + target.setDurable(TerminusDurability.NONE); + target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + + // Set the dynamic node lifetime-policy + Map dynamicNodeProperties = new HashMap<>(); + dynamicNodeProperties.put(AmqpSupport.LIFETIME_POLICY, DeleteOnClose.getInstance()); + target.setDynamicNodeProperties(dynamicNodeProperties); + + amqpSender = new ProtonClientContext(connection, sender, this, sessionSPI); + amqpSender.afterInit(futureRunnable); + sender.setSource(source); + sender.setTarget(target); + sender.setContext(amqpSender); + sender.open(); + } + + connection.flush(); + + waitWithTimeout(futureRunnable); + return amqpSender; + } + @Override public AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException { return createReceiver(address, address); diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java index e3cbb3bb16..aa04cefe0b 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java @@ -61,20 +61,20 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext { if (target.getDynamic()) { //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and // will be deleted on closing of the session - String queue = sessionSPI.tempQueueName(); + address = sessionSPI.tempQueueName(); try { - sessionSPI.createTemporaryQueue(queue); + sessionSPI.createTemporaryQueue(address); } catch (Exception e) { throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); } - target.setAddress(queue); + target.setAddress(address); } else { //if not dynamic then we use the targets address as the address to forward the messages to, however there has to //be a queue bound to it so we nee to check this. - String address = target.getAddress(); + address = target.getAddress(); if (address == null) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.targetAddressNotSet(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index 9534681536..d803e9e70a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -39,6 +39,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Enumeration; import java.util.HashMap; +import java.util.Map; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -51,7 +52,10 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.message.ProtonJMessage; +import org.apache.qpid.proton.message.impl.MessageImpl; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -60,6 +64,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.proton.plug.AMQPClientConnectionContext; import org.proton.plug.AMQPClientReceiverContext; +import org.proton.plug.AMQPClientSenderContext; import org.proton.plug.AMQPClientSessionContext; import org.proton.plug.test.Constants; import org.proton.plug.test.minimalclient.SimpleAMQPConnector; @@ -196,7 +201,7 @@ public class ProtonTest extends ActiveMQTestBase { } @Test - public void testReplyToNonJMS() throws Throwable { + public void testReplyToNonJMS() throws Throwable { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryQueue queue = session.createTemporaryQueue(); @@ -351,6 +356,40 @@ public class ProtonTest extends ActiveMQTestBase { } } + @Test + public void testDynamicSenderLink() throws Exception { + + if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + + SimpleAMQPConnector connector = new SimpleAMQPConnector(); + connector.start(); + AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT); + + clientConnection.clientOpen(null); + + AMQPClientSessionContext csession = clientConnection.createClientSession(); + AMQPClientSenderContext sender = csession.createDynamicSender(false); + + String address = sender.getAddress(); + + AMQPClientReceiverContext receiver = csession.createReceiver(address); + receiver.flow(1); + + // Send one on the dynamic address + MessageImpl message = (MessageImpl) org.apache.qpid.proton.message.Message.Factory.create(); + + Properties props = new Properties(); + Map map = new HashMap<>(); + + map.put("some-property", 1); + AmqpValue value = new AmqpValue(map); + message.setBody(value); + message.setProperties(props); + sender.send(message); + + ProtonJMessage protonJMessage = receiver.receiveMessage(500, TimeUnit.MILLISECONDS); + Assert.assertNotNull(protonJMessage); + } @Test public void testConnection() throws Exception {