This closes #593

This commit is contained in:
Clebert Suconic 2016-06-21 16:53:13 -04:00
commit 5c4fab3fcc
7 changed files with 105 additions and 13 deletions

View File

@ -21,4 +21,7 @@ import org.apache.qpid.proton.message.ProtonJMessage;
public interface AMQPClientSenderContext { public interface AMQPClientSenderContext {
void send(ProtonJMessage message); void send(ProtonJMessage message);
String getAddress();
} }

View File

@ -22,6 +22,8 @@ public interface AMQPClientSessionContext extends AMQPSessionContext {
AMQPClientSenderContext createSender(String address, boolean preSettled) throws ActiveMQAMQPException; AMQPClientSenderContext createSender(String address, boolean preSettled) throws ActiveMQAMQPException;
AMQPClientSenderContext createDynamicSender(boolean preSettled) throws ActiveMQAMQPException;
AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException; AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException;
AMQPClientReceiverContext createReceiver(String name, String address) throws ActiveMQAMQPException; AMQPClientReceiverContext createReceiver(String name, String address) throws ActiveMQAMQPException;

View File

@ -32,7 +32,7 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
protected final Receiver receiver; protected final Receiver receiver;
protected final String address; protected String address;
protected final AMQPSessionCallback sessionSPI; protected final AMQPSessionCallback sessionSPI;
@ -43,12 +43,6 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
this.connection = connection; this.connection = connection;
this.protonSession = protonSession; this.protonSession = protonSession;
this.receiver = receiver; this.receiver = receiver;
if (receiver.getRemoteTarget() != null) {
this.address = receiver.getRemoteTarget().getAddress();
}
else {
this.address = null;
}
this.sessionSPI = sessionSPI; this.sessionSPI = sessionSPI;
} }

View File

@ -67,7 +67,10 @@ public class ProtonClientContext extends AbstractProtonContextSender implements
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
return false; return false;
} }
} }
@Override
public String getAddress() {
return sender.getRemoteTarget().getAddress();
}
} }

View File

@ -16,8 +16,18 @@
*/ */
package org.proton.plug.context.client; package org.proton.plug.context.client;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.DeleteOnClose;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target; import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Sender;
@ -26,6 +36,7 @@ import org.proton.plug.AMQPClientReceiverContext;
import org.proton.plug.AMQPClientSenderContext; import org.proton.plug.AMQPClientSenderContext;
import org.proton.plug.AMQPClientSessionContext; import org.proton.plug.AMQPClientSessionContext;
import org.proton.plug.AMQPSessionCallback; import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.AmqpSupport;
import org.proton.plug.context.AbstractConnectionContext; import org.proton.plug.context.AbstractConnectionContext;
import org.proton.plug.context.AbstractProtonSessionContext; import org.proton.plug.context.AbstractProtonSessionContext;
import org.proton.plug.exceptions.ActiveMQAMQPException; import org.proton.plug.exceptions.ActiveMQAMQPException;
@ -62,6 +73,46 @@ public class ProtonClientSessionContext extends AbstractProtonSessionContext imp
return amqpSender; return amqpSender;
} }
@Override
public AMQPClientSenderContext createDynamicSender(boolean preSettled) throws ActiveMQAMQPException {
FutureRunnable futureRunnable = new FutureRunnable(1);
ProtonClientContext amqpSender;
synchronized (connection.getLock()) {
final String senderName = "Dynamic-" + UUID.randomUUID().toString();
Sender sender = session.sender(senderName);
sender.setSenderSettleMode(SenderSettleMode.SETTLED);
Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL};
Source source = new Source();
source.setAddress(senderName);
source.setOutcomes(outcomes);
Target target = new Target();
target.setDynamic(true);
target.setDurable(TerminusDurability.NONE);
target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
// Set the dynamic node lifetime-policy
Map<Symbol, Object> dynamicNodeProperties = new HashMap<>();
dynamicNodeProperties.put(AmqpSupport.LIFETIME_POLICY, DeleteOnClose.getInstance());
target.setDynamicNodeProperties(dynamicNodeProperties);
amqpSender = new ProtonClientContext(connection, sender, this, sessionSPI);
amqpSender.afterInit(futureRunnable);
sender.setSource(source);
sender.setTarget(target);
sender.setContext(amqpSender);
sender.open();
}
connection.flush();
waitWithTimeout(futureRunnable);
return amqpSender;
}
@Override @Override
public AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException { public AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException {
return createReceiver(address, address); return createReceiver(address, address);

View File

@ -61,20 +61,20 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
if (target.getDynamic()) { if (target.getDynamic()) {
//if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
// will be deleted on closing of the session // will be deleted on closing of the session
String queue = sessionSPI.tempQueueName(); address = sessionSPI.tempQueueName();
try { try {
sessionSPI.createTemporaryQueue(queue); sessionSPI.createTemporaryQueue(address);
} }
catch (Exception e) { catch (Exception e) {
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
} }
target.setAddress(queue); target.setAddress(address);
} }
else { else {
//if not dynamic then we use the targets address as the address to forward the messages to, however there has to //if not dynamic then we use the targets address as the address to forward the messages to, however there has to
//be a queue bound to it so we nee to check this. //be a queue bound to it so we nee to check this.
String address = target.getAddress(); address = target.getAddress();
if (address == null) { if (address == null) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.targetAddressNotSet(); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.targetAddressNotSet();
} }

View File

@ -39,6 +39,7 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -51,7 +52,10 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.message.ProtonJMessage; import org.apache.qpid.proton.message.ProtonJMessage;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -60,6 +64,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.proton.plug.AMQPClientConnectionContext; import org.proton.plug.AMQPClientConnectionContext;
import org.proton.plug.AMQPClientReceiverContext; import org.proton.plug.AMQPClientReceiverContext;
import org.proton.plug.AMQPClientSenderContext;
import org.proton.plug.AMQPClientSessionContext; import org.proton.plug.AMQPClientSessionContext;
import org.proton.plug.test.Constants; import org.proton.plug.test.Constants;
import org.proton.plug.test.minimalclient.SimpleAMQPConnector; import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
@ -196,7 +201,7 @@ public class ProtonTest extends ActiveMQTestBase {
} }
@Test @Test
public void testReplyToNonJMS() throws Throwable { public void testReplyToNonJMS() throws Throwable {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = session.createTemporaryQueue(); TemporaryQueue queue = session.createTemporaryQueue();
@ -351,6 +356,40 @@ public class ProtonTest extends ActiveMQTestBase {
} }
} }
@Test
public void testDynamicSenderLink() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
SimpleAMQPConnector connector = new SimpleAMQPConnector();
connector.start();
AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT);
clientConnection.clientOpen(null);
AMQPClientSessionContext csession = clientConnection.createClientSession();
AMQPClientSenderContext sender = csession.createDynamicSender(false);
String address = sender.getAddress();
AMQPClientReceiverContext receiver = csession.createReceiver(address);
receiver.flow(1);
// Send one on the dynamic address
MessageImpl message = (MessageImpl) org.apache.qpid.proton.message.Message.Factory.create();
Properties props = new Properties();
Map<Object, Object> map = new HashMap<>();
map.put("some-property", 1);
AmqpValue value = new AmqpValue(map);
message.setBody(value);
message.setProperties(props);
sender.send(message);
ProtonJMessage protonJMessage = receiver.receiveMessage(500, TimeUnit.MILLISECONDS);
Assert.assertNotNull(protonJMessage);
}
@Test @Test
public void testConnection() throws Exception { public void testConnection() throws Exception {