mirror of https://github.com/apache/activemq.git
Adds an additional test case to verify that when doing request reply with two connections that ReplyTo values are prefixed with /remote-temp-topic and /remote-temp-queue to indicate that the connection receiving them is not the one that created them.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1197717 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e0b23f27e2
commit
5b99f118a2
|
@ -133,6 +133,11 @@ public class StompTest extends CombinationTestSupport {
|
||||||
stompConnection.open(createSocket(connectUri));
|
stompConnection.open(createSocket(connectUri));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void stompConnect(StompConnection connection) throws IOException, URISyntaxException, UnknownHostException {
|
||||||
|
URI connectUri = new URI(bindAddress);
|
||||||
|
connection.open(createSocket(connectUri));
|
||||||
|
}
|
||||||
|
|
||||||
protected Socket createSocket(URI connectUri) throws IOException {
|
protected Socket createSocket(URI connectUri) throws IOException {
|
||||||
return new Socket("127.0.0.1", connectUri.getPort());
|
return new Socket("127.0.0.1", connectUri.getPort());
|
||||||
}
|
}
|
||||||
|
@ -1757,6 +1762,71 @@ public class StompTest extends CombinationTestSupport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testReplyToAcrossConnections() throws Exception {
|
||||||
|
|
||||||
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
doReplyToAcrossConnections("topic");
|
||||||
|
doReplyToAcrossConnections("queue");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doReplyToAcrossConnections(String type) throws Exception {
|
||||||
|
LOG.info("Starting test on Temp Destinations using a temporary: " + type);
|
||||||
|
|
||||||
|
StompConnection responder = new StompConnection();
|
||||||
|
stompConnect(responder);
|
||||||
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
|
responder.sendFrame(frame);
|
||||||
|
|
||||||
|
frame = responder.receiveFrame();
|
||||||
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
final String dest = "/" + type + "/" + getQueueName();
|
||||||
|
final String tempDest = String.format("/temp-%s/2C26441740C0ECC9tt1:1:0:1", type);
|
||||||
|
LOG.info("Test is using out-bound topic: " + dest + ", and replyTo dest: " + tempDest);
|
||||||
|
|
||||||
|
// Subscribe to the temp destination, this is where we get our response.
|
||||||
|
stompConnection.subscribe(tempDest);
|
||||||
|
|
||||||
|
// Subscribe to the Queue, this is where we get our request.
|
||||||
|
responder.subscribe(dest);
|
||||||
|
|
||||||
|
// Send a Message with the ReplyTo value set.
|
||||||
|
HashMap<String, String> properties = new HashMap<String, String>();
|
||||||
|
properties.put(Stomp.Headers.Send.REPLY_TO, tempDest);
|
||||||
|
LOG.info(String.format("Sending request message: SEND with %s=%s", Stomp.Headers.Send.REPLY_TO, tempDest));
|
||||||
|
stompConnection.send(dest, "REQUEST", null, properties);
|
||||||
|
|
||||||
|
// The subscription should receive a response with the ReplyTo property set.
|
||||||
|
StompFrame received = responder.receive();
|
||||||
|
assertNotNull(received);
|
||||||
|
String remoteReplyTo = received.getHeaders().get(Stomp.Headers.Send.REPLY_TO);
|
||||||
|
assertNotNull(remoteReplyTo);
|
||||||
|
assertTrue(remoteReplyTo.startsWith(String.format("/remote-temp-%s/", type)));
|
||||||
|
LOG.info(String.format("Received request message: %s with %s=%s", received.getAction(), Stomp.Headers.Send.REPLY_TO, remoteReplyTo));
|
||||||
|
|
||||||
|
// Reply to the request using the given ReplyTo destination
|
||||||
|
responder.send(remoteReplyTo, "RESPONSE");
|
||||||
|
|
||||||
|
// The response should be received by the Temporary Destination subscription
|
||||||
|
StompFrame reply = stompConnection.receive();
|
||||||
|
assertNotNull(reply);
|
||||||
|
assertEquals("MESSAGE", reply.getAction());
|
||||||
|
assertTrue(reply.getBody().contains("RESPONSE"));
|
||||||
|
LOG.info(String.format("Response %s received", reply.getAction()));
|
||||||
|
|
||||||
|
BrokerViewMBean broker = getProxyToBroker();
|
||||||
|
if (type.equals("topic")) {
|
||||||
|
assertEquals(1, broker.getTemporaryTopics().length);
|
||||||
|
} else {
|
||||||
|
assertEquals(1, broker.getTemporaryQueues().length);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException {
|
private BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException {
|
||||||
ObjectName brokerViewMBean = new ObjectName(
|
ObjectName brokerViewMBean = new ObjectName(
|
||||||
"org.apache.activemq:Type=Broker,BrokerName=localhost");
|
"org.apache.activemq:Type=Broker,BrokerName=localhost");
|
||||||
|
|
Loading…
Reference in New Issue