Add some additional tests for dynamic sender / receiver links

This commit is contained in:
Timothy Bish 2016-06-16 15:59:56 -04:00
parent 9ac5f83473
commit 27d955501f
1 changed files with 107 additions and 0 deletions

View File

@ -25,11 +25,13 @@ import static org.junit.Assert.fail;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
@ -248,6 +250,111 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
public void TestCreateDynamicQueueSenderAndPublish() throws Exception {
doTestCreateDynamicSenderAndPublish(false);
}
@Test(timeout = 60000)
public void TestCreateDynamicTopicSenderAndPublish() throws Exception {
doTestCreateDynamicSenderAndPublish(true);
}
protected void doTestCreateDynamicSenderAndPublish(boolean topic) throws Exception {
Target target = createDynamicTarget(topic);
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(target);
assertNotNull(sender);
if (topic) {
assertEquals(1, brokerView.getTemporaryTopics().length);
} else {
assertEquals(1, brokerView.getTemporaryQueues().length);
}
// Get the new address
String address = sender.getSender().getRemoteTarget().getAddress();
LOG.info("New dynamic sender address -> {}", address);
// Create a message and send to a receive that is listening on the newly
// created dynamic link address.
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg-1");
message.setText("Test-Message");
AmqpReceiver receiver = session.createReceiver(address);
receiver.flow(1);
sender.send(message);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull("Should have read a message", received);
received.accept();
receiver.close();
sender.close();
connection.close();
}
@Test(timeout = 60000)
public void testCreateDynamicReceiverToTopicAndSend() throws Exception {
doTestCreateDynamicSender(true);
}
@Test(timeout = 60000)
public void testCreateDynamicReceiverToQueueAndSend() throws Exception {
doTestCreateDynamicSender(false);
}
protected void doTestCreateDynamicReceiverAndSend(boolean topic) throws Exception {
Source source = createDynamicSource(topic);
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(source);
assertNotNull(receiver);
if (topic) {
assertEquals(1, brokerView.getTemporaryTopics().length);
} else {
assertEquals(1, brokerView.getTemporaryQueues().length);
}
// Get the new address
String address = receiver.getReceiver().getRemoteSource().getAddress();
LOG.info("New dynamic receiver address -> {}", address);
// Create a message and send to a receive that is listening on the newly
// created dynamic link address.
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg-1");
message.setText("Test-Message");
AmqpSender sender = session.createSender(address);
sender.send(message);
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull("Should have read a message", received);
received.accept();
sender.close();
receiver.close();
connection.close();
}
protected Source createDynamicSource(boolean topic) {
Source source = new Source();