mirror of https://github.com/apache/activemq.git
Adds a basic request / response test using temp topic and temp queue for
reply to destination.
This commit is contained in:
parent
37eb6b0c6e
commit
ddf0b2a309
|
@ -0,0 +1,223 @@
|
|||
package org.apache.activemq.transport.amqp;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Vector;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.ExceptionListener;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
|
||||
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
|
||||
import org.junit.After;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class JmsClientRequestResponseTest extends AmqpTestSupport implements MessageListener {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(JmsClientRequestResponseTest.class);
|
||||
|
||||
@Rule public TestName name = new TestName();
|
||||
|
||||
private Connection requestorConnection;
|
||||
private Destination requestDestination;
|
||||
private Session requestorSession;
|
||||
|
||||
private Connection responderConnection;
|
||||
private MessageProducer responseProducer;
|
||||
private Session responderSession;
|
||||
private Destination replyDestination;
|
||||
|
||||
private final List<JMSException> failures = new Vector<JMSException>();
|
||||
private boolean dynamicallyCreateProducer;
|
||||
private final boolean useAsyncConsumer = true;
|
||||
private Thread syncThread;
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
requestorConnection.close();
|
||||
responderConnection.close();
|
||||
|
||||
if (syncThread != null) {
|
||||
syncThread.join(5000);
|
||||
}
|
||||
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
private void doSetupConnections(boolean topic) throws Exception {
|
||||
responderConnection = createConnection(name.getMethodName() + "-responder");
|
||||
responderSession = responderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
if (topic) {
|
||||
requestDestination = responderSession.createTopic(name.getMethodName());
|
||||
} else {
|
||||
requestDestination = responderSession.createQueue(name.getMethodName());
|
||||
}
|
||||
responseProducer = responderSession.createProducer(null);
|
||||
|
||||
final MessageConsumer requestConsumer = responderSession.createConsumer(requestDestination);
|
||||
if (useAsyncConsumer) {
|
||||
requestConsumer.setMessageListener(this);
|
||||
} else {
|
||||
syncThread = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
syncConsumeLoop(requestConsumer);
|
||||
}
|
||||
});
|
||||
syncThread.start();
|
||||
}
|
||||
responderConnection.start();
|
||||
|
||||
requestorConnection = createConnection(name.getMethodName() + "-requestor");
|
||||
requestorSession = requestorConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
if (topic) {
|
||||
replyDestination = requestorSession.createTemporaryTopic();
|
||||
} else {
|
||||
replyDestination = requestorSession.createTemporaryQueue();
|
||||
}
|
||||
requestorConnection.start();
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testRequestResponseToTempQueue() throws Exception {
|
||||
doSetupConnections(false);
|
||||
doTestRequestResponse();
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testRequestResponseToTempTopic() throws Exception {
|
||||
doSetupConnections(true);
|
||||
doTestRequestResponse();
|
||||
}
|
||||
|
||||
private void doTestRequestResponse() throws Exception {
|
||||
|
||||
MessageProducer requestProducer = requestorSession.createProducer(requestDestination);
|
||||
MessageConsumer replyConsumer = requestorSession.createConsumer(replyDestination);
|
||||
|
||||
TextMessage requestMessage = requestorSession.createTextMessage("SomeRequest");
|
||||
requestMessage.setJMSReplyTo(replyDestination);
|
||||
requestProducer.send(requestMessage);
|
||||
|
||||
LOG.info("Sent request to destination: {}", requestDestination.toString());
|
||||
|
||||
Message msg = replyConsumer.receive(10000);
|
||||
|
||||
if (msg instanceof TextMessage) {
|
||||
TextMessage replyMessage = (TextMessage)msg;
|
||||
LOG.info("Received reply.");
|
||||
LOG.info(replyMessage.toString());
|
||||
assertTrue("Wrong message content", replyMessage.getText().startsWith("response"));
|
||||
} else {
|
||||
fail("Should have received a reply by now");
|
||||
}
|
||||
replyConsumer.close();
|
||||
|
||||
assertEquals("Should not have had any failures: " + failures, 0, failures.size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Can be overridden in subclasses to test against a different transport suchs as NIO.
|
||||
*
|
||||
* @return the port to connect to on the Broker.
|
||||
*/
|
||||
protected int getBrokerPort() {
|
||||
return port;
|
||||
}
|
||||
|
||||
private Connection createConnection(String clientId) throws JMSException {
|
||||
return createConnection(clientId, false, false);
|
||||
}
|
||||
|
||||
protected Connection createConnection(String clientId, boolean syncPublish, boolean useSsl) throws JMSException {
|
||||
|
||||
int brokerPort = getBrokerPort();
|
||||
LOG.debug("Creating connection on port {}", brokerPort);
|
||||
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", brokerPort, "admin", "password", null, useSsl);
|
||||
|
||||
factory.setSyncPublish(syncPublish);
|
||||
factory.setTopicPrefix("topic://");
|
||||
factory.setQueuePrefix("queue://");
|
||||
|
||||
final Connection connection = factory.createConnection();
|
||||
if (clientId != null && !clientId.isEmpty()) {
|
||||
connection.setClientID(clientId);
|
||||
}
|
||||
connection.setExceptionListener(new ExceptionListener() {
|
||||
@Override
|
||||
public void onException(JMSException exception) {
|
||||
exception.printStackTrace();
|
||||
}
|
||||
});
|
||||
connection.start();
|
||||
return connection;
|
||||
}
|
||||
|
||||
protected void syncConsumeLoop(MessageConsumer requestConsumer) {
|
||||
try {
|
||||
Message message = requestConsumer.receive(5000);
|
||||
if (message != null) {
|
||||
onMessage(message);
|
||||
} else {
|
||||
LOG.error("No message received");
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
onException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message) {
|
||||
try {
|
||||
TextMessage requestMessage = (TextMessage)message;
|
||||
|
||||
LOG.info("Received request.");
|
||||
LOG.info(requestMessage.toString());
|
||||
|
||||
Destination replyDestination = requestMessage.getJMSReplyTo();
|
||||
if (replyDestination instanceof Topic) {
|
||||
LOG.info("Reply destination is: {}", ((Topic)replyDestination).getTopicName());
|
||||
} else {
|
||||
LOG.info("Reply destination is: {}", ((Queue)replyDestination).getQueueName());
|
||||
}
|
||||
|
||||
TextMessage replyMessage = responderSession.createTextMessage("response for: " + requestMessage.getText());
|
||||
replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
|
||||
|
||||
if (dynamicallyCreateProducer) {
|
||||
responseProducer = responderSession.createProducer(replyDestination);
|
||||
responseProducer.send(replyMessage);
|
||||
} else {
|
||||
responseProducer.send(replyDestination, replyMessage);
|
||||
}
|
||||
|
||||
LOG.info("Sent reply.");
|
||||
LOG.info(replyMessage.toString());
|
||||
} catch (JMSException e) {
|
||||
onException(e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void onException(JMSException e) {
|
||||
LOG.info("Caught: " + e);
|
||||
e.printStackTrace();
|
||||
failures.add(e);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue