Fill in the source / target created for the requested dynamic node with
the actual attributes we are going to support.
This commit is contained in:
Timothy Bish 2016-10-14 15:00:49 -04:00
parent d9d1ae73eb
commit 10fc397ab7
4 changed files with 73 additions and 5 deletions

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -44,6 +44,8 @@ public class AmqpSupport {
// Capabilities used to identify destination type in some requests.
public static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
public static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
public static final Symbol QUEUE_CAPABILITY = Symbol.valueOf("queue");
public static final Symbol TOPIC_CAPABILITY = Symbol.valueOf("topic");
// Symbols used to announce connection information to remote peer.
public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field");
@ -214,4 +216,28 @@ public class AmqpSupport {
throw new RuntimeException("Unexpected terminus type: " + endpoint);
}
}
/**
* Given an ActiveMQDestination return the proper Capability value for the concrete destination type.
*
* @param destination
* The ActiveMQDestination whose capability is being requested.
*
* @return a Symbol that matches the defined Capability value for the ActiveMQDestiantion.
*/
public static Symbol getDestinationTypeSymbol(ActiveMQDestination destination) {
if (destination.isQueue()) {
if (destination.isTemporary()) {
return TEMP_QUEUE_CAPABILITY;
} else {
return QUEUE_CAPABILITY;
}
} else {
if (destination.isTemporary()) {
return TEMP_TOPIC_CAPABILITY;
} else {
return TOPIC_CAPABILITY;
}
}
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.protocol;
import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME;
import static org.apache.activemq.transport.amqp.AmqpSupport.LIFETIME_POLICY;
import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME;
import static org.apache.activemq.transport.amqp.AmqpSupport.createDestination;
@ -46,10 +47,12 @@ import org.apache.activemq.command.TransactionId;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.activemq.transport.amqp.AmqpSupport;
import org.apache.activemq.transport.amqp.ResponseHandler;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.DeleteOnClose;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
@ -186,9 +189,17 @@ public class AmqpSession implements AmqpResource {
if (target.getDynamic()) {
destination = connection.createTemporaryDestination(protonReceiver, target.getCapabilities());
Map<Symbol, Object> dynamicNodeProperties = new HashMap<Symbol, Object>();
dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
// Currently we only support temporary destinations with delete on close lifetime policy.
Target actualTarget = new Target();
actualTarget.setAddress(destination.getQualifiedName());
actualTarget.setCapabilities(AmqpSupport.getDestinationTypeSymbol(destination));
actualTarget.setDynamic(true);
actualTarget.setDynamicNodeProperties(dynamicNodeProperties);
protonReceiver.setTarget(actualTarget);
receiver.addCloseAction(new Runnable() {
@ -298,11 +309,18 @@ public class AmqpSession implements AmqpResource {
return;
}
} else if (source.getDynamic()) {
// lets create a temp dest.
destination = connection.createTemporaryDestination(protonSender, source.getCapabilities());
Map<Symbol, Object> dynamicNodeProperties = new HashMap<Symbol, Object>();
dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
// Currently we only support temporary destinations with delete on close lifetime policy.
source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress(destination.getQualifiedName());
source.setCapabilities(AmqpSupport.getDestinationTypeSymbol(destination));
source.setDynamic(true);
source.setDynamicNodeProperties(dynamicNodeProperties);
sender.addCloseAction(new Runnable() {
@Override

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -21,6 +21,7 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILI
import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.HashMap;
@ -28,6 +29,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.transport.amqp.AmqpSupport;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
@ -120,6 +122,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
doTestCreateDynamicSender(false);
}
@SuppressWarnings("unchecked")
protected void doTestCreateDynamicSender(boolean topic) throws Exception {
Target target = createDynamicTarget(topic);
@ -132,10 +135,20 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
AmqpSender sender = session.createSender(target);
assertNotNull(sender);
Target remoteTarget = (Target) sender.getEndpoint().getRemoteTarget();
Map<Symbol, Object> dynamicNodeProperties = remoteTarget.getDynamicNodeProperties();
Symbol[] capabilites = remoteTarget.getCapabilities();
assertTrue(Boolean.TRUE.equals(remoteTarget.getDynamic()));
assertTrue(dynamicNodeProperties.containsKey(LIFETIME_POLICY));
assertEquals(DeleteOnClose.getInstance(), dynamicNodeProperties.get(LIFETIME_POLICY));
if (topic) {
assertEquals(1, brokerView.getTemporaryTopics().length);
assertTrue(AmqpSupport.contains(capabilites, TEMP_TOPIC_CAPABILITY));
} else {
assertEquals(1, brokerView.getTemporaryQueues().length);
assertTrue(AmqpSupport.contains(capabilites, TEMP_QUEUE_CAPABILITY));
}
connection.close();
@ -190,6 +203,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
doTestCreateDynamicSender(false);
}
@SuppressWarnings("unchecked")
protected void doTestCreateDynamicReceiver(boolean topic) throws Exception {
Source source = createDynamicSource(topic);
@ -202,10 +216,20 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
AmqpReceiver receiver = session.createReceiver(source);
assertNotNull(receiver);
Source remoteSource = (Source) receiver.getEndpoint().getRemoteSource();
Map<Symbol, Object> dynamicNodeProperties = remoteSource.getDynamicNodeProperties();
Symbol[] capabilites = remoteSource.getCapabilities();
assertTrue(Boolean.TRUE.equals(remoteSource.getDynamic()));
assertTrue(dynamicNodeProperties.containsKey(LIFETIME_POLICY));
assertEquals(DeleteOnClose.getInstance(), dynamicNodeProperties.get(LIFETIME_POLICY));
if (topic) {
assertEquals(1, brokerView.getTemporaryTopics().length);
assertTrue(AmqpSupport.contains(capabilites, TEMP_TOPIC_CAPABILITY));
} else {
assertEquals(1, brokerView.getTemporaryQueues().length);
assertTrue(AmqpSupport.contains(capabilites, TEMP_QUEUE_CAPABILITY));
}
connection.close();