diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java index eeab37c2e2..d796d073dc 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java @@ -25,6 +25,7 @@ import java.util.HashMap; public class StompCodec { + final static byte[] crlfcrlf = new byte[]{'\r','\n','\r','\n'}; TcpTransport transport; ByteArrayOutputStream currentCommand = new ByteArrayOutputStream(); @@ -52,7 +53,7 @@ public class StompCodec { if (!processedHeaders) { currentCommand.write(b); // end of headers section, parse action and header - if (previousByte == '\n' && b == '\n') { + if (b == '\n' && (previousByte == '\n' || currentCommand.endsWith(crlfcrlf))) { if (transport.getWireFormat() instanceof StompWireFormat) { DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray()); action = ((StompWireFormat)transport.getWireFormat()).parseAction(data); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java index 8e2a02c7f3..282443eb96 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java @@ -178,7 +178,7 @@ public class StompWireFormat implements WireFormat { HashMap headers = new HashMap(25); while (true) { ByteSequence line = readHeaderLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded"); - if (line != null && line.length > 0) { + if (line != null && line.length > 1) { if (headers.size() > MAX_HEADERS) { throw new ProtocolException("The maximum number of headers was exceeded", true); diff --git a/activemq-core/src/main/java/org/apache/activemq/util/ByteArrayOutputStream.java b/activemq-core/src/main/java/org/apache/activemq/util/ByteArrayOutputStream.java index fca10fc70a..0c7f8995c3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/util/ByteArrayOutputStream.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/ByteArrayOutputStream.java @@ -52,7 +52,7 @@ public class ByteArrayOutputStream extends OutputStream { /** * Ensures the the buffer has at least the minimumCapacity specified. - * @param i + * @param minimumCapacity */ private void checkCapacity(int minimumCapacity) { if (minimumCapacity > buffer.length) { @@ -79,4 +79,18 @@ public class ByteArrayOutputStream extends OutputStream { public int size() { return size; } + + public boolean endsWith(final byte[] array) { + int i = 0; + int start = size - array.length; + if (start < 0) { + return false; + } + while (start < size) { + if (buffer[start++] != array[i++]) { + return false; + } + } + return true; + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTelnetTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTelnetTest.java new file mode 100644 index 0000000000..0bb1a3e203 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTelnetTest.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StompTelnetTest extends CombinationTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(StompTelnetTest.class); + + private BrokerService broker; + + @Override + protected void setUp() throws Exception { + + broker = new BrokerService(); + broker.setPersistent(false); + broker.addConnector("stomp://localhost:0"); + broker.addConnector("stomp+nio://localhost:0"); + + broker.start(); + broker.waitUntilStarted(); + } + + public void testCRLF() throws Exception { + + for (TransportConnector connector : broker.getTransportConnectors()) { + LOG.info("try: " + connector.getConnectUri()); + + StompConnection stompConnection = new StompConnection(); + stompConnection.open(createSocket(connector.getConnectUri())); + String frame = "CONNECT\r\n\r\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + LOG.info("response from: " + connector.getConnectUri() + ", " + frame); + assertTrue(frame.startsWith("CONNECTED")); + stompConnection.close(); + } + } + + protected Socket createSocket(URI connectUri) throws IOException { + return new Socket("127.0.0.1", connectUri.getPort()); + } + + protected String getQueueName() { + return getClass().getName() + "." + getName(); + } + + @Override + protected void tearDown() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + +}