Create some tests that exercise creating temp destinations using sender
links with dynamic targets
This commit is contained in:
Timothy Bish 2015-03-18 18:12:02 -04:00
parent 4228e3d3e8
commit 8e6a404d5e
5 changed files with 211 additions and 7 deletions

View File

@ -52,6 +52,9 @@ public class AmqpSupport {
// Symbols used in configuration of newly opened links.
public static final Symbol COPY = Symbol.getSymbol("copy");
// Lifetime policy symbols
public static final Symbol DYNAMIC_NODE_LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
/**
* Search for a given Symbol in a given array of Symbol object.
*

View File

@ -71,12 +71,12 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
private final AmqpSession session;
private final String address;
private final String receiverId;
private final Source userSpecifiedSource;
private String subscriptionName;
private String selector;
private boolean presettle;
private boolean noLocal;
private Source userSpecifiedSource;
/**
* Create a new receiver instance.
@ -94,6 +94,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
throw new IllegalArgumentException("Address cannot be empty.");
}
this.userSpecifiedSource = null;
this.session = session;
this.address = address;
this.receiverId = receiverId;
@ -454,7 +455,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
Source source = userSpecifiedSource;
Target target = new Target();
if (userSpecifiedSource == null && address != null) {
if (source == null && address != null) {
source = new Source();
source.setAddress(address);
configureSource(source);

View File

@ -64,6 +64,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
private final AmqpSession session;
private final String address;
private final String senderId;
private final Target userSpecifiedTarget;
private boolean presettle;
private long sendTimeout = DEFAULT_SEND_TIMEOUT;
@ -82,9 +83,37 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
* The unique ID assigned to this sender.
*/
public AmqpSender(AmqpSession session, String address, String senderId) {
if (address != null && address.isEmpty()) {
throw new IllegalArgumentException("Address cannot be empty.");
}
this.session = session;
this.address = address;
this.senderId = senderId;
this.userSpecifiedTarget = null;
}
/**
* Create a new sender instance using the given Target when creating the link.
*
* @param session
* The parent session that created the session.
* @param address
* The address that this sender produces to.
* @param senderId
* The unique ID assigned to this sender.
*/
public AmqpSender(AmqpSession session, Target target, String senderId) {
if (target == null) {
throw new IllegalArgumentException("User specified Target cannot be null");
}
this.session = session;
this.userSpecifiedTarget = target;
this.address = target.getAddress();
this.senderId = senderId;
}
/**
@ -216,8 +245,11 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
source.setAddress(senderId);
source.setOutcomes(outcomes);
Target target = new Target();
target.setAddress(address);
Target target = userSpecifiedTarget;
if (target == null) {
target = new Target();
target.setAddress(address);
}
String senderName = senderId + ":" + address;

View File

@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Session;
@ -81,6 +82,38 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
return sender;
}
/**
* Create a sender instance using the given Target
*
* @param target
* the caller created and configured Traget used to create the sender link.
*
* @return a newly created sender that is ready for use.
*
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpSender createSender(Target target) throws Exception {
checkClosed();
final AmqpSender sender = new AmqpSender(AmqpSession.this, target, getNextSenderId());
final ClientFuture request = new ClientFuture();
connection.getScheduler().execute(new Runnable() {
@Override
public void run() {
checkClosed();
sender.setStateInspector(getStateInspector());
sender.open(request);
pumpToProtonTransport();
}
});
request.sync();
return sender;
}
/**
* Create a receiver instance using the given address
*
@ -153,10 +186,8 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
}
/**
* Create a receiver instance using the given address
* Create a receiver instance using the given Source
*
* @param address
* the address to which the receiver will subscribe for its messages.
* @param source
* the caller created and configured Source used to create the receiver link.
*

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.interop;
import static org.apache.activemq.transport.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY;
import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY;
import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.DeleteOnClose;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.junit.Test;
/**
* Tests for JMS temporary destination mappings to AMQP
*/
public class AmqpTempDestinationTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testCreateDynamicSenderToTopic() throws Exception {
doTestCreateDynamicSender(true);
}
@Test(timeout = 60000)
public void testCreateDynamicSenderToQueue() throws Exception {
doTestCreateDynamicSender(false);
}
protected void doTestCreateDynamicSender(boolean topic) throws Exception {
Target target = createDynamicTarget(topic);
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(target);
assertNotNull(sender);
if (topic) {
assertEquals(1, brokerView.getTemporaryTopics().length);
} else {
assertEquals(1, brokerView.getTemporaryQueues().length);
}
connection.close();
}
@Test(timeout = 60000)
public void testDynamicSenderLifetimeBoundToLinkTopic() throws Exception {
doTestDynamicSenderLifetimeBoundToLinkQueue(true);
}
@Test(timeout = 60000)
public void testDynamicSenderLifetimeBoundToLinkQueue() throws Exception {
doTestDynamicSenderLifetimeBoundToLinkQueue(false);
}
protected void doTestDynamicSenderLifetimeBoundToLinkQueue(boolean topic) throws Exception {
Target target = createDynamicTarget(topic);
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(target);
assertNotNull(sender);
if (topic) {
assertEquals(1, brokerView.getTemporaryTopics().length);
} else {
assertEquals(1, brokerView.getTemporaryQueues().length);
}
sender.close();
if (topic) {
assertEquals(0, brokerView.getTemporaryTopics().length);
} else {
assertEquals(0, brokerView.getTemporaryQueues().length);
}
connection.close();
}
protected Target createDynamicTarget(boolean topic) {
Target target = new Target();
target.setDynamic(true);
target.setDurable(TerminusDurability.NONE);
target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
// Set the dynamic node lifetime-policy
Map<Symbol, Object> dynamicNodeProperties = new HashMap<Symbol, Object>();
dynamicNodeProperties.put(DYNAMIC_NODE_LIFETIME_POLICY, DeleteOnClose.getInstance());
target.setDynamicNodeProperties(dynamicNodeProperties);
// Set the capability to indicate the node type being created
if (!topic) {
target.setCapabilities(TEMP_QUEUE_CAPABILITY);
} else {
target.setCapabilities(TEMP_TOPIC_CAPABILITY);
}
return target;
}
}