ARTEMIS-585 support send on dynamic sender link

The sender abstraction must be able to update its sender address in the
case of dynamic senders whose target address is not set until the code
initializes the link and creates a destination for it.
This commit is contained in:
Timothy Bish 2016-06-20 11:34:49 -04:00
parent 372528d841
commit 75f18d1e10
7 changed files with 105 additions and 13 deletions

View File

@ -21,4 +21,7 @@ import org.apache.qpid.proton.message.ProtonJMessage;
public interface AMQPClientSenderContext {
void send(ProtonJMessage message);
String getAddress();
}

View File

@ -22,6 +22,8 @@ public interface AMQPClientSessionContext extends AMQPSessionContext {
AMQPClientSenderContext createSender(String address, boolean preSettled) throws ActiveMQAMQPException;
AMQPClientSenderContext createDynamicSender(boolean preSettled) throws ActiveMQAMQPException;
AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException;
AMQPClientReceiverContext createReceiver(String name, String address) throws ActiveMQAMQPException;

View File

@ -32,7 +32,7 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
protected final Receiver receiver;
protected final String address;
protected String address;
protected final AMQPSessionCallback sessionSPI;
@ -43,12 +43,6 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
this.connection = connection;
this.protonSession = protonSession;
this.receiver = receiver;
if (receiver.getRemoteTarget() != null) {
this.address = receiver.getRemoteTarget().getAddress();
}
else {
this.address = null;
}
this.sessionSPI = sessionSPI;
}

View File

@ -67,7 +67,10 @@ public class ProtonClientContext extends AbstractProtonContextSender implements
Thread.currentThread().interrupt();
return false;
}
}
@Override
public String getAddress() {
return sender.getRemoteTarget().getAddress();
}
}

View File

@ -16,8 +16,18 @@
*/
package org.proton.plug.context.client;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.DeleteOnClose;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
@ -26,6 +36,7 @@ import org.proton.plug.AMQPClientReceiverContext;
import org.proton.plug.AMQPClientSenderContext;
import org.proton.plug.AMQPClientSessionContext;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.AmqpSupport;
import org.proton.plug.context.AbstractConnectionContext;
import org.proton.plug.context.AbstractProtonSessionContext;
import org.proton.plug.exceptions.ActiveMQAMQPException;
@ -62,6 +73,46 @@ public class ProtonClientSessionContext extends AbstractProtonSessionContext imp
return amqpSender;
}
@Override
public AMQPClientSenderContext createDynamicSender(boolean preSettled) throws ActiveMQAMQPException {
FutureRunnable futureRunnable = new FutureRunnable(1);
ProtonClientContext amqpSender;
synchronized (connection.getLock()) {
final String senderName = "Dynamic-" + UUID.randomUUID().toString();
Sender sender = session.sender(senderName);
sender.setSenderSettleMode(SenderSettleMode.SETTLED);
Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL};
Source source = new Source();
source.setAddress(senderName);
source.setOutcomes(outcomes);
Target target = new Target();
target.setDynamic(true);
target.setDurable(TerminusDurability.NONE);
target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
// Set the dynamic node lifetime-policy
Map<Symbol, Object> dynamicNodeProperties = new HashMap<>();
dynamicNodeProperties.put(AmqpSupport.LIFETIME_POLICY, DeleteOnClose.getInstance());
target.setDynamicNodeProperties(dynamicNodeProperties);
amqpSender = new ProtonClientContext(connection, sender, this, sessionSPI);
amqpSender.afterInit(futureRunnable);
sender.setSource(source);
sender.setTarget(target);
sender.setContext(amqpSender);
sender.open();
}
connection.flush();
waitWithTimeout(futureRunnable);
return amqpSender;
}
@Override
public AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException {
return createReceiver(address, address);

View File

@ -61,20 +61,20 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
if (target.getDynamic()) {
//if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
// will be deleted on closing of the session
String queue = sessionSPI.tempQueueName();
address = sessionSPI.tempQueueName();
try {
sessionSPI.createTemporaryQueue(queue);
sessionSPI.createTemporaryQueue(address);
}
catch (Exception e) {
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
target.setAddress(queue);
target.setAddress(address);
}
else {
//if not dynamic then we use the targets address as the address to forward the messages to, however there has to
//be a queue bound to it so we nee to check this.
String address = target.getAddress();
address = target.getAddress();
if (address == null) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.targetAddressNotSet();
}

View File

@ -39,6 +39,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@ -51,7 +52,10 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.message.ProtonJMessage;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -60,6 +64,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.proton.plug.AMQPClientConnectionContext;
import org.proton.plug.AMQPClientReceiverContext;
import org.proton.plug.AMQPClientSenderContext;
import org.proton.plug.AMQPClientSessionContext;
import org.proton.plug.test.Constants;
import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
@ -351,6 +356,40 @@ public class ProtonTest extends ActiveMQTestBase {
}
}
@Test
public void testDynamicSenderLink() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
SimpleAMQPConnector connector = new SimpleAMQPConnector();
connector.start();
AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT);
clientConnection.clientOpen(null);
AMQPClientSessionContext csession = clientConnection.createClientSession();
AMQPClientSenderContext sender = csession.createDynamicSender(false);
String address = sender.getAddress();
AMQPClientReceiverContext receiver = csession.createReceiver(address);
receiver.flow(1);
// Send one on the dynamic address
MessageImpl message = (MessageImpl) org.apache.qpid.proton.message.Message.Factory.create();
Properties props = new Properties();
Map<Object, Object> map = new HashMap<>();
map.put("some-property", 1);
AmqpValue value = new AmqpValue(map);
message.setBody(value);
message.setProperties(props);
sender.send(message);
ProtonJMessage protonJMessage = receiver.receiveMessage(500, TimeUnit.MILLISECONDS);
Assert.assertNotNull(protonJMessage);
}
@Test
public void testConnection() throws Exception {