mirror of https://github.com/apache/activemq.git
[AMQ-8409] Fixed Unexpected \\r instead of \r in header property in incoming messages
This commit is contained in:
parent
5a16145c32
commit
d9450cdd74
|
@ -25,9 +25,11 @@ public interface Stomp {
|
||||||
byte BREAK = '\n';
|
byte BREAK = '\n';
|
||||||
byte COLON = ':';
|
byte COLON = ':';
|
||||||
byte ESCAPE = '\\';
|
byte ESCAPE = '\\';
|
||||||
|
byte CARRIAGE_RETURN = '\r';
|
||||||
byte[] ESCAPE_ESCAPE_SEQ = { 92, 92 };
|
byte[] ESCAPE_ESCAPE_SEQ = { 92, 92 };
|
||||||
byte[] COLON_ESCAPE_SEQ = { 92, 99 };
|
byte[] COLON_ESCAPE_SEQ = { 92, 99 };
|
||||||
byte[] NEWLINE_ESCAPE_SEQ = { 92, 110 };
|
byte[] NEWLINE_ESCAPE_SEQ = { 92, 110 };
|
||||||
|
byte[] CARRIAGE_ESCAPE_SEQ = { 92, 114 };
|
||||||
|
|
||||||
String COMMA = ",";
|
String COMMA = ",";
|
||||||
String V1_0 = "1.0";
|
String V1_0 = "1.0";
|
||||||
|
|
|
@ -316,6 +316,11 @@ public class StompWireFormat implements WireFormat {
|
||||||
case Stomp.COLON:
|
case Stomp.COLON:
|
||||||
stream.write(Stomp.COLON_ESCAPE_SEQ);
|
stream.write(Stomp.COLON_ESCAPE_SEQ);
|
||||||
break;
|
break;
|
||||||
|
case Stomp.CARRIAGE_RETURN:
|
||||||
|
if(stompVersion.equals(Stomp.V1_2)) {
|
||||||
|
stream.write(Stomp.CARRIAGE_ESCAPE_SEQ);
|
||||||
|
break;
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
stream.write(val);
|
stream.write(val);
|
||||||
}
|
}
|
||||||
|
@ -347,6 +352,11 @@ public class StompWireFormat implements WireFormat {
|
||||||
case 92:
|
case 92:
|
||||||
decoded.write(Stomp.ESCAPE);
|
decoded.write(Stomp.ESCAPE);
|
||||||
break;
|
break;
|
||||||
|
case 114:
|
||||||
|
if(stompVersion.equals(Stomp.V1_2)) {
|
||||||
|
decoded.write(Stomp.CARRIAGE_RETURN);
|
||||||
|
break;
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
stream.unread(next);
|
stream.unread(next);
|
||||||
decoded.write(value);
|
decoded.write(value);
|
||||||
|
|
|
@ -16,11 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.transport.stomp;
|
package org.apache.activemq.transport.stomp;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
@ -43,6 +38,8 @@ import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
public class Stomp11Test extends StompTestSupport {
|
public class Stomp11Test extends StompTestSupport {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(Stomp11Test.class);
|
private static final Logger LOG = LoggerFactory.getLogger(Stomp11Test.class);
|
||||||
|
@ -1208,6 +1205,44 @@ public class Stomp11Test extends StompTestSupport {
|
||||||
doTestAckMessagesInTransactionOutOfOrderWithTXClientAck("client-individual");
|
doTestAckMessagesInTransactionOutOfOrderWithTXClientAck("client-individual");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testFrameHeaderEscapes() throws Exception {
|
||||||
|
String connectFrame = "STOMP\n" +
|
||||||
|
"login:system\n" +
|
||||||
|
"passcode:manager\n" +
|
||||||
|
"accept-version:1.1\n" +
|
||||||
|
"host:localhost\n" +
|
||||||
|
"\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(connectFrame);
|
||||||
|
|
||||||
|
String f = stompConnection.receiveFrame();
|
||||||
|
LOG.debug("Broker sent: " + f);
|
||||||
|
|
||||||
|
assertTrue(f.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
|
||||||
|
"id:12345\n" +
|
||||||
|
"ack:auto\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
|
||||||
|
"colon:\\c\n" +
|
||||||
|
"linefeed:\\n\n" +
|
||||||
|
"backslash:\\\\\n" +
|
||||||
|
"carriagereturn:\\r\n" +
|
||||||
|
"\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(message);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
|
||||||
|
LOG.debug("Broker sent: " + frame);
|
||||||
|
assertTrue(frame.startsWith("MESSAGE"));
|
||||||
|
assertTrue(frame.contains("colon:\\c\n"));
|
||||||
|
assertTrue(frame.contains("linefeed:\\n\n"));
|
||||||
|
assertTrue(frame.contains("backslash:\\\\\n"));
|
||||||
|
assertFalse(frame.contains("carriagereturn:\\r\n"));
|
||||||
|
}
|
||||||
|
|
||||||
public void doTestAckMessagesInTransactionOutOfOrderWithTXClientAck(String ackMode) throws Exception {
|
public void doTestAckMessagesInTransactionOutOfOrderWithTXClientAck(String ackMode) throws Exception {
|
||||||
MessageProducer producer = session.createProducer(queue);
|
MessageProducer producer = session.createProducer(queue);
|
||||||
producer.send(session.createTextMessage("Message 1"));
|
producer.send(session.createTextMessage("Message 1"));
|
||||||
|
|
|
@ -1066,6 +1066,44 @@ public class Stomp12Test extends StompTestSupport {
|
||||||
doTestMixedAckNackWithMessageAckIds(true);
|
doTestMixedAckNackWithMessageAckIds(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testFrameHeaderEscapes() throws Exception {
|
||||||
|
String connectFrame = "STOMP\n" +
|
||||||
|
"login:system\n" +
|
||||||
|
"passcode:manager\n" +
|
||||||
|
"accept-version:1.2\n" +
|
||||||
|
"host:localhost\n" +
|
||||||
|
"\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(connectFrame);
|
||||||
|
|
||||||
|
String f = stompConnection.receiveFrame();
|
||||||
|
LOG.debug("Broker sent: " + f);
|
||||||
|
|
||||||
|
assertTrue(f.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
|
||||||
|
"id:12345\n" +
|
||||||
|
"ack:auto\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
|
||||||
|
"colon:\\c\n" +
|
||||||
|
"linefeed:\\n\n" +
|
||||||
|
"backslash:\\\\\n" +
|
||||||
|
"carriagereturn:\\r\n" +
|
||||||
|
"\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(message);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
|
||||||
|
LOG.debug("Broker sent: " + frame);
|
||||||
|
assertTrue(frame.startsWith("MESSAGE"));
|
||||||
|
assertTrue(frame.contains("colon:\\c\n"));
|
||||||
|
assertTrue(frame.contains("linefeed:\\n\n"));
|
||||||
|
assertTrue(frame.contains("backslash:\\\\\n"));
|
||||||
|
assertTrue(frame.contains("carriagereturn:\\r\n"));
|
||||||
|
}
|
||||||
|
|
||||||
public void doTestMixedAckNackWithMessageAckIds(boolean individual) throws Exception {
|
public void doTestMixedAckNackWithMessageAckIds(boolean individual) throws Exception {
|
||||||
|
|
||||||
final int MESSAGE_COUNT = 20;
|
final int MESSAGE_COUNT = 20;
|
||||||
|
|
|
@ -2285,6 +2285,45 @@ public class StompTest extends StompTestSupport {
|
||||||
doTestAckInTransaction(false);
|
doTestAckInTransaction(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testFrameHeaderEscapes() throws Exception {
|
||||||
|
String connectFrame = "STOMP\n" +
|
||||||
|
"login:system\n" +
|
||||||
|
"passcode:manager\n" +
|
||||||
|
"accept-version:1.0\n" +
|
||||||
|
"host:localhost\n" +
|
||||||
|
"\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(connectFrame);
|
||||||
|
|
||||||
|
String f = stompConnection.receiveFrame();
|
||||||
|
LOG.debug("Broker sent: " + f);
|
||||||
|
|
||||||
|
assertTrue(f.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
|
||||||
|
"id:12345\n" +
|
||||||
|
"ack:auto\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
|
||||||
|
"colon:\\c\n" +
|
||||||
|
"linefeed:\\n\n" +
|
||||||
|
"backslash:\\\\\n" +
|
||||||
|
"carriagereturn:\\r\n" +
|
||||||
|
"\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(message);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
|
||||||
|
LOG.debug("Broker sent: " + frame);
|
||||||
|
assertTrue(frame.startsWith("MESSAGE"));
|
||||||
|
assertFalse(frame.contains("colon:\\c\n"));
|
||||||
|
assertFalse(frame.contains("linefeed:\\n\n"));
|
||||||
|
assertFalse(frame.contains("backslash:\\\\\n"));
|
||||||
|
assertFalse(frame.contains("carriagereturn:\\\\r\n"));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public void doTestAckInTransaction(boolean topic) throws Exception {
|
public void doTestAckInTransaction(boolean topic) throws Exception {
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
|
||||||
|
|
Loading…
Reference in New Issue