mirror of https://github.com/apache/activemq.git
Implementing AMQ-748 - adding optional request-id header in connect message and corresponding response-id in the connected message. Updating StompTest to verify.
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@413770 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e9e8fd147d
commit
021a2afd88
|
@ -37,7 +37,7 @@ class Connect implements StompCommand {
|
|||
|
||||
public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
|
||||
|
||||
Properties headers = headerParser.parse(in);
|
||||
final Properties headers = headerParser.parse(in);
|
||||
|
||||
|
||||
// allow anyone to login for now
|
||||
|
@ -59,48 +59,66 @@ class Connect implements StompCommand {
|
|||
connectionInfo.setPassword(passcode);
|
||||
|
||||
while (in.readByte() != 0) {
|
||||
}
|
||||
}
|
||||
|
||||
return new CommandEnvelope(connectionInfo, headers, new ResponseListener() {
|
||||
public boolean onResponse(Response receipt, DataOutput out) throws IOException {
|
||||
return new CommandEnvelope(connectionInfo, headers,
|
||||
new ConnectResponseListener(headers, connectionInfo) );
|
||||
}
|
||||
|
||||
class ConnectResponseListener implements ResponseListener{
|
||||
|
||||
private Properties headers;
|
||||
private ConnectionInfo connectionInfo;
|
||||
|
||||
public ConnectResponseListener( Properties headers, final ConnectionInfo connectionInfo ){
|
||||
this.headers = headers;
|
||||
this.connectionInfo = connectionInfo;
|
||||
}
|
||||
|
||||
public boolean onResponse(Response receipt, DataOutput out) throws IOException {
|
||||
|
||||
if (receipt.getCorrelationId() != connectionInfo.getCommandId())
|
||||
return false;
|
||||
|
||||
final SessionInfo sessionInfo = new SessionInfo(format.getSessionId());
|
||||
sessionInfo.setCommandId(format.generateCommandId());
|
||||
sessionInfo.setResponseRequired(false);
|
||||
|
||||
final ProducerInfo producerInfo = new ProducerInfo(format.getProducerId());
|
||||
producerInfo.setCommandId(format.generateCommandId());
|
||||
producerInfo.setResponseRequired(true);
|
||||
|
||||
format.addResponseListener(new ResponseListener() {
|
||||
public boolean onResponse(Response receipt, DataOutput out) throws IOException {
|
||||
if (receipt.getCorrelationId() != producerInfo.getCommandId())
|
||||
return false;
|
||||
|
||||
format.onFullyConnected();
|
||||
|
||||
StringBuffer buffer = new StringBuffer();
|
||||
buffer.append(Stomp.Responses.CONNECTED);
|
||||
buffer.append(Stomp.NEWLINE);
|
||||
buffer.append(Stomp.Headers.Connected.SESSION);
|
||||
buffer.append(Stomp.Headers.SEPERATOR);
|
||||
buffer.append(connectionInfo.getClientId());
|
||||
buffer.append(Stomp.NEWLINE);
|
||||
buffer.append(Stomp.NEWLINE);
|
||||
buffer.append(Stomp.NULL);
|
||||
buffer.append(Stomp.NEWLINE);
|
||||
out.writeBytes(buffer.toString());
|
||||
return true;
|
||||
}
|
||||
});
|
||||
if (receipt.getCorrelationId() != connectionInfo.getCommandId())
|
||||
return false;
|
||||
|
||||
final SessionInfo sessionInfo = new SessionInfo(format.getSessionId());
|
||||
sessionInfo.setCommandId(format.generateCommandId());
|
||||
sessionInfo.setResponseRequired(false);
|
||||
|
||||
final ProducerInfo producerInfo = new ProducerInfo(format.getProducerId());
|
||||
producerInfo.setCommandId(format.generateCommandId());
|
||||
producerInfo.setResponseRequired(true);
|
||||
|
||||
format.addResponseListener( new ResponseListener(){
|
||||
public boolean onResponse(Response receipt, DataOutput out) throws IOException {
|
||||
if (receipt.getCorrelationId() != producerInfo.getCommandId() )
|
||||
return false;
|
||||
|
||||
format.onFullyConnected();
|
||||
|
||||
StringBuffer buffer = new StringBuffer();
|
||||
buffer.append(Stomp.Responses.CONNECTED);
|
||||
buffer.append(Stomp.NEWLINE);
|
||||
buffer.append(Stomp.Headers.Connected.SESSION);
|
||||
buffer.append(Stomp.Headers.SEPERATOR);
|
||||
buffer.append(connectionInfo.getClientId());
|
||||
if( headers.containsKey(Stomp.Headers.Connect.REQUEST_ID) ){
|
||||
buffer.append(Stomp.NEWLINE);
|
||||
buffer.append(Stomp.Headers.Connected.RESPONSE_ID);
|
||||
buffer.append(Stomp.Headers.SEPERATOR);
|
||||
buffer.append(headers.getProperty( Stomp.Headers.Connect.REQUEST_ID ));
|
||||
}
|
||||
buffer.append(Stomp.NEWLINE);
|
||||
buffer.append(Stomp.NEWLINE);
|
||||
buffer.append(Stomp.NULL);
|
||||
buffer.append(Stomp.NEWLINE);
|
||||
out.writeBytes(buffer.toString());
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
format.addToPendingReadCommands(sessionInfo);
|
||||
format.addToPendingReadCommands(producerInfo);
|
||||
return true;
|
||||
}
|
||||
});
|
||||
format.addToPendingReadCommands(sessionInfo);
|
||||
format.addToPendingReadCommands(producerInfo);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -97,6 +97,7 @@ public interface Stomp {
|
|||
String LOGIN = "login";
|
||||
String PASSCODE = "passcode";
|
||||
String CLIENT_ID = "client-id";
|
||||
String REQUEST_ID = "request-id";
|
||||
}
|
||||
|
||||
public interface Error {
|
||||
|
@ -105,6 +106,7 @@ public interface Stomp {
|
|||
|
||||
public interface Connected {
|
||||
String SESSION = "session";
|
||||
String RESPONSE_ID = "response-id";
|
||||
}
|
||||
|
||||
public interface Ack {
|
||||
|
|
|
@ -122,11 +122,12 @@ public class StompTest extends CombinationTestSupport {
|
|||
|
||||
public void testConnect() throws Exception {
|
||||
|
||||
String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n" + Stomp.NULL;
|
||||
String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
|
||||
sendFrame(connect_frame);
|
||||
|
||||
String f = receiveFrame(10000);
|
||||
assertTrue(f.startsWith("CONNECTED"));
|
||||
assertTrue(f.contains("response-id:1"));
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue