git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@963282 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-07-12 13:57:11 +00:00
parent 70971237bf
commit 584d52c1f0
4 changed files with 151 additions and 52 deletions

View File

@ -56,6 +56,15 @@ public class StompConnection {
outputStream.flush();
}
public void sendFrame(String frame, byte[] data) throws Exception {
byte[] bytes = frame.getBytes("UTF-8");
OutputStream outputStream = stompSocket.getOutputStream();
outputStream.write(bytes);
outputStream.write(data);
outputStream.write(0);
outputStream.flush();
}
public StompFrame receive() throws Exception {
return receive(RECEIVE_TIMEOUT);
}

View File

@ -26,6 +26,7 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import javax.net.SocketFactory;
@ -37,6 +38,7 @@ import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
@ -53,6 +55,11 @@ public class StompNIOTransport extends TcpTransport {
private ByteBuffer inputBuffer;
ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
boolean processedHeaders = false;
String action;
HashMap<String, String> headers;
int contentLength = -1;
int readLength = 0;
int previousByte = -1;
public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
@ -114,16 +121,47 @@ public class StompNIOTransport extends TcpTransport {
while(i++ < readSize) {
b = input.read();
// skip repeating nulls
if (previousByte == 0 && b == 0) {
if (!processedHeaders && previousByte == 0 && b == 0) {
continue;
}
currentCommand.write(b);
// end of command reached, unmarshal
if (b == 0) {
Object command = wireFormat.unmarshal(new ByteSequence(currentCommand.toByteArray()));
doConsume((Command)command);
currentCommand.reset();
if (!processedHeaders) {
currentCommand.write(b);
// end of headers section, parse action and header
if (previousByte == '\n' && b == '\n') {
if (wireFormat instanceof StompWireFormat) {
DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
action = ((StompWireFormat)wireFormat).parseAction(data);
headers = ((StompWireFormat)wireFormat).parseHeaders(data);
String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
if (contentLengthHeader != null) {
contentLength = ((StompWireFormat)wireFormat).parseContentLength(contentLengthHeader);
} else {
contentLength = -1;
}
}
processedHeaders = true;
currentCommand.reset();
}
} else {
if (contentLength == -1) {
// end of command reached, unmarshal
if (b == 0) {
processCommand();
} else {
currentCommand.write(b);
}
} else {
// read desired content length
if (readLength++ == contentLength) {
processCommand();
} else {
currentCommand.write(b);
}
}
}
previousByte = b;
}
// clear the buffer
@ -136,6 +174,14 @@ public class StompNIOTransport extends TcpTransport {
onException(IOExceptionSupport.create(e));
}
}
private void processCommand() throws Exception {
StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
doConsume(frame);
processedHeaders = false;
currentCommand.reset();
contentLength = -1;
}
protected void doStart() throws Exception {
connect();

View File

@ -87,43 +87,12 @@ public class StompWireFormat implements WireFormat {
public Object unmarshal(DataInput in) throws IOException {
try {
String action = null;
// skip white space to next real action line
while (true) {
action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
if (action == null) {
throw new IOException("connection was closed");
} else {
action = action.trim();
if (action.length() > 0) {
break;
}
}
}
// parse action
String action = parseAction(in);
// Parse the headers
HashMap<String, String> headers = new HashMap<String, String>(25);
while (true) {
String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
if (line != null && line.trim().length() > 0) {
if (headers.size() > MAX_HEADERS) {
throw new ProtocolException("The maximum number of headers was exceeded", true);
}
try {
int seperatorIndex = line.indexOf(Stomp.Headers.SEPERATOR);
String name = line.substring(0, seperatorIndex).trim();
String value = line.substring(seperatorIndex + 1, line.length()).trim();
headers.put(name, value);
} catch (Exception e) {
throw new ProtocolException("Unable to parser header line [" + line + "]", true);
}
} else {
break;
}
}
HashMap<String, String> headers = parseHeaders(in);
// Read in the data part.
byte[] data = NO_DATA;
@ -131,16 +100,7 @@ public class StompWireFormat implements WireFormat {
if (contentLength != null) {
// Bless the client, he's telling us how much data to read in.
int length;
try {
length = Integer.parseInt(contentLength.trim());
} catch (NumberFormatException e) {
throw new ProtocolException("Specified content-length is not a valid integer", true);
}
if (length > MAX_DATA_LENGTH) {
throw new ProtocolException("The maximum data length was exceeded", true);
}
int length = parseContentLength(contentLength);
data = new byte[length];
in.readFully(data);
@ -193,6 +153,64 @@ public class StompWireFormat implements WireFormat {
ByteSequence sequence = baos.toByteSequence();
return new String(sequence.getData(), sequence.getOffset(), sequence.getLength(), "UTF-8");
}
protected String parseAction(DataInput in) throws IOException {
String action = null;
// skip white space to next real action line
while (true) {
action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
if (action == null) {
throw new IOException("connection was closed");
} else {
action = action.trim();
if (action.length() > 0) {
break;
}
}
}
return action;
}
protected HashMap<String, String> parseHeaders(DataInput in) throws IOException {
HashMap<String, String> headers = new HashMap<String, String>(25);
while (true) {
String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
if (line != null && line.trim().length() > 0) {
if (headers.size() > MAX_HEADERS) {
throw new ProtocolException("The maximum number of headers was exceeded", true);
}
try {
int seperatorIndex = line.indexOf(Stomp.Headers.SEPERATOR);
String name = line.substring(0, seperatorIndex).trim();
String value = line.substring(seperatorIndex + 1, line.length()).trim();
headers.put(name, value);
} catch (Exception e) {
throw new ProtocolException("Unable to parser header line [" + line + "]", true);
}
} else {
break;
}
}
return headers;
}
protected int parseContentLength(String contentLength) throws ProtocolException {
int length;
try {
length = Integer.parseInt(contentLength.trim());
} catch (NumberFormatException e) {
throw new ProtocolException("Specified content-length is not a valid integer", true);
}
if (length > MAX_DATA_LENGTH) {
throw new ProtocolException("The maximum data length was exceeded", true);
}
return length;
}
public int getVersion() {
return version;

View File

@ -397,6 +397,32 @@ public class StompTest extends CombinationTestSupport {
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testBytesMessageWithNulls() throws Exception {
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n destination:/queue/" + getQueueName() + "\ncontent-length:5" + " \n\n" + "\u0001\u0002\u0000\u0004\u0005" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame message = stompConnection.receive();
assertTrue(message.getAction().startsWith("MESSAGE"));
String length = message.getHeaders().get("content-length");
assertEquals("5", length);
assertEquals(5, message.getContent().length);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testSubscribeWithMessageSentWithProperties() throws Exception {