This closes #736

This commit is contained in:
Jean-Baptiste Onofré 2022-01-19 18:29:25 +01:00
commit ea48a9c257
5 changed files with 129 additions and 5 deletions

View File

@ -25,9 +25,11 @@ public interface Stomp {
byte BREAK = '\n'; byte BREAK = '\n';
byte COLON = ':'; byte COLON = ':';
byte ESCAPE = '\\'; byte ESCAPE = '\\';
byte CARRIAGE_RETURN = '\r';
byte[] ESCAPE_ESCAPE_SEQ = { 92, 92 }; byte[] ESCAPE_ESCAPE_SEQ = { 92, 92 };
byte[] COLON_ESCAPE_SEQ = { 92, 99 }; byte[] COLON_ESCAPE_SEQ = { 92, 99 };
byte[] NEWLINE_ESCAPE_SEQ = { 92, 110 }; byte[] NEWLINE_ESCAPE_SEQ = { 92, 110 };
byte[] CARRIAGE_ESCAPE_SEQ = { 92, 114 };
String COMMA = ","; String COMMA = ",";
String V1_0 = "1.0"; String V1_0 = "1.0";

View File

@ -316,6 +316,11 @@ public class StompWireFormat implements WireFormat {
case Stomp.COLON: case Stomp.COLON:
stream.write(Stomp.COLON_ESCAPE_SEQ); stream.write(Stomp.COLON_ESCAPE_SEQ);
break; break;
case Stomp.CARRIAGE_RETURN:
if(stompVersion.equals(Stomp.V1_2)) {
stream.write(Stomp.CARRIAGE_ESCAPE_SEQ);
break;
}
default: default:
stream.write(val); stream.write(val);
} }
@ -347,6 +352,11 @@ public class StompWireFormat implements WireFormat {
case 92: case 92:
decoded.write(Stomp.ESCAPE); decoded.write(Stomp.ESCAPE);
break; break;
case 114:
if(stompVersion.equals(Stomp.V1_2)) {
decoded.write(Stomp.CARRIAGE_RETURN);
break;
}
default: default:
stream.unread(next); stream.unread(next);
decoded.write(value); decoded.write(value);

View File

@ -16,11 +16,6 @@
*/ */
package org.apache.activemq.transport.stomp; package org.apache.activemq.transport.stomp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -43,6 +38,8 @@ import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
public class Stomp11Test extends StompTestSupport { public class Stomp11Test extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(Stomp11Test.class); private static final Logger LOG = LoggerFactory.getLogger(Stomp11Test.class);
@ -1208,6 +1205,44 @@ public class Stomp11Test extends StompTestSupport {
doTestAckMessagesInTransactionOutOfOrderWithTXClientAck("client-individual"); doTestAckMessagesInTransactionOutOfOrderWithTXClientAck("client-individual");
} }
@Test(timeout = 60000)
public void testFrameHeaderEscapes() throws Exception {
String connectFrame = "STOMP\n" +
"login:system\n" +
"passcode:manager\n" +
"accept-version:1.1\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n" +
"ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
"colon:\\c\n" +
"linefeed:\\n\n" +
"backslash:\\\\\n" +
"carriagereturn:\\r\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(message);
frame = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + frame);
assertTrue(frame.startsWith("MESSAGE"));
assertTrue(frame.contains("colon:\\c\n"));
assertTrue(frame.contains("linefeed:\\n\n"));
assertTrue(frame.contains("backslash:\\\\\n"));
assertFalse(frame.contains("carriagereturn:\\r\n"));
}
public void doTestAckMessagesInTransactionOutOfOrderWithTXClientAck(String ackMode) throws Exception { public void doTestAckMessagesInTransactionOutOfOrderWithTXClientAck(String ackMode) throws Exception {
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Message 1")); producer.send(session.createTextMessage("Message 1"));

View File

@ -1066,6 +1066,44 @@ public class Stomp12Test extends StompTestSupport {
doTestMixedAckNackWithMessageAckIds(true); doTestMixedAckNackWithMessageAckIds(true);
} }
@Test(timeout = 60000)
public void testFrameHeaderEscapes() throws Exception {
String connectFrame = "STOMP\n" +
"login:system\n" +
"passcode:manager\n" +
"accept-version:1.2\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n" +
"ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
"colon:\\c\n" +
"linefeed:\\n\n" +
"backslash:\\\\\n" +
"carriagereturn:\\r\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(message);
frame = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + frame);
assertTrue(frame.startsWith("MESSAGE"));
assertTrue(frame.contains("colon:\\c\n"));
assertTrue(frame.contains("linefeed:\\n\n"));
assertTrue(frame.contains("backslash:\\\\\n"));
assertTrue(frame.contains("carriagereturn:\\r\n"));
}
public void doTestMixedAckNackWithMessageAckIds(boolean individual) throws Exception { public void doTestMixedAckNackWithMessageAckIds(boolean individual) throws Exception {
final int MESSAGE_COUNT = 20; final int MESSAGE_COUNT = 20;

View File

@ -2285,6 +2285,45 @@ public class StompTest extends StompTestSupport {
doTestAckInTransaction(false); doTestAckInTransaction(false);
} }
@Test(timeout = 60000)
public void testFrameHeaderEscapes() throws Exception {
String connectFrame = "STOMP\n" +
"login:system\n" +
"passcode:manager\n" +
"accept-version:1.0\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n" +
"ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
"colon:\\c\n" +
"linefeed:\\n\n" +
"backslash:\\\\\n" +
"carriagereturn:\\r\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(message);
frame = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + frame);
assertTrue(frame.startsWith("MESSAGE"));
assertFalse(frame.contains("colon:\\c\n"));
assertFalse(frame.contains("linefeed:\\n\n"));
assertFalse(frame.contains("backslash:\\\\\n"));
assertFalse(frame.contains("carriagereturn:\\\\r\n"));
}
public void doTestAckInTransaction(boolean topic) throws Exception { public void doTestAckInTransaction(boolean topic) throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL; String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;