mirror of https://github.com/apache/activemq.git
AMQ-8398 - Fix Stomp to OpenWire UTF-8 translation
This commit fixes conversion of messages that are sent or received using Stomp when the body contains characters that require 4 bytes for encoding using standard UTF-8. ActiveMQ and OpenWire currently use a modified UTF-8 encoding that only uses 3 bytes so the conversion previously was breaking because the body was encoded using standard JDK UTF-8 encoding and set directly on an ActiveMQText message which was leading to decoding errors later when the ActiveMQMessage tried to decode using the modified encoder. The reverse was also was true and was breaking in some cases. The fix now makes sure to correctly decode the Stomp message back to a String first and set that on the ActiveMQ message so it can be re-encoded correctly. The reverse is fixed as well so both conversion from Stomp -> OpenWire and OpenWire -> Stomp work. Tests have been added for Stomp -> OpenWire, OpenWire -> Stomp, and Stomp -> Stomp which is really Stomp -> OpenWire -> Stomp.
This commit is contained in:
parent
e45ee4aae5
commit
3ddf515597
|
@ -18,6 +18,7 @@ package org.apache.activemq.transport.stomp;
|
|||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -53,12 +54,9 @@ public class LegacyFrameTranslator implements FrameTranslator {
|
|||
if(intendedType.equalsIgnoreCase("text")){
|
||||
ActiveMQTextMessage text = new ActiveMQTextMessage();
|
||||
try {
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
|
||||
DataOutputStream data = new DataOutputStream(bytes);
|
||||
data.writeInt(command.getContent().length);
|
||||
data.write(command.getContent());
|
||||
text.setContent(bytes.toByteSequence());
|
||||
data.close();
|
||||
// AMQ-8398 - get the original text back so we decode from standard UTF-8
|
||||
// and set on the message so it will re-encode using AMQ modified UTF-8
|
||||
text.setText(command.getBody());
|
||||
} catch (Throwable e) {
|
||||
throw new ProtocolException("Text could not bet set: " + e, false, e);
|
||||
}
|
||||
|
@ -78,12 +76,9 @@ public class LegacyFrameTranslator implements FrameTranslator {
|
|||
} else {
|
||||
ActiveMQTextMessage text = new ActiveMQTextMessage();
|
||||
try {
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
|
||||
DataOutputStream data = new DataOutputStream(bytes);
|
||||
data.writeInt(command.getContent().length);
|
||||
data.write(command.getContent());
|
||||
text.setContent(bytes.toByteSequence());
|
||||
data.close();
|
||||
// AMQ-8398 - get the original text back so we decode from standard UTF-8
|
||||
// and set on the message so it will re-encode using AMQ modified UTF-8
|
||||
text.setText(command.getBody());
|
||||
} catch (Throwable e) {
|
||||
throw new ProtocolException("Text could not bet set: " + e, false, e);
|
||||
}
|
||||
|
@ -103,22 +98,13 @@ public class LegacyFrameTranslator implements FrameTranslator {
|
|||
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this);
|
||||
|
||||
if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
|
||||
|
||||
if (!message.isCompressed() && message.getContent() != null) {
|
||||
ByteSequence msgContent = message.getContent();
|
||||
if (msgContent.getLength() > 4) {
|
||||
byte[] content = new byte[msgContent.getLength() - 4];
|
||||
System.arraycopy(msgContent.data, 4, content, 0, content.length);
|
||||
command.setContent(content);
|
||||
}
|
||||
} else {
|
||||
ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
|
||||
// AMQ-8398 - get the original text back so we decode from modified UTF-8
|
||||
// and then we can re-encode using the standard JDK encoding
|
||||
String messageText = msg.getText();
|
||||
if (messageText != null) {
|
||||
command.setContent(msg.getText().getBytes("UTF-8"));
|
||||
command.setContent(msg.getText().getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
}
|
||||
|
||||
} else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
|
||||
|
||||
ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.transport.stomp;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Locale;
|
||||
|
@ -75,11 +76,7 @@ public class StompFrame implements Command {
|
|||
}
|
||||
|
||||
public String getBody() {
|
||||
try {
|
||||
return new String(content, "UTF-8");
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
return new String(content);
|
||||
}
|
||||
return new String(content, StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
public void setContent(byte[] data) {
|
||||
|
|
|
@ -219,6 +219,84 @@ public class StompTest extends StompTestSupport {
|
|||
assertTrue(Math.abs(tnow - tmsg) < 1000);
|
||||
}
|
||||
|
||||
// Test that a string that requires 4 bytes to encode using standard
|
||||
// UTF-8 does not break when sent by Stomp and received by JMS/OpenWire
|
||||
// AMQ uses a modified UTF-8 encoding that only uses 3 bytes so conversion
|
||||
// needs to happen so this works.
|
||||
@Test(timeout = 60000)
|
||||
public void testSend4ByteUtf8StompToJms() throws Exception {
|
||||
// Create test string using emojis, requires 4 bytes with standard UTF-8
|
||||
String body = "!®౩\uD83D\uDE42";
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
||||
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
frame = stompConnection.receiveFrame();
|
||||
assertTrue(frame.startsWith("CONNECTED"));
|
||||
|
||||
// publish message with string that requires 4-byte UTF-8 encoding
|
||||
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + body + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
// Verify received message is original sent string
|
||||
TextMessage message = (TextMessage)consumer.receive(2500);
|
||||
assertNotNull(message);
|
||||
assertEquals(body, message.getText());
|
||||
}
|
||||
|
||||
// Test that a string that requires 4 bytes to encode using standard
|
||||
// UTF-8 does not break when sent by JMS/OpenWire and received by Stomp
|
||||
// AMQ uses a modified UTF-8 encoding that only uses 3 bytes so conversion
|
||||
// needs to happen so this works.
|
||||
@Test(timeout = 60000)
|
||||
public void testSend4ByteUtf8JmsToStomp() throws Exception {
|
||||
// Create test string using emojis, requires 4 bytes with standard UTF-8
|
||||
String body = "!®౩\uD83D\uDE42";
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
|
||||
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
frame = stompConnection.receiveFrame();
|
||||
assertTrue(frame.startsWith("CONNECTED"));
|
||||
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
// publish message with string that requires 4-byte UTF-8 encoding
|
||||
producer.send(session.createTextMessage(body));
|
||||
|
||||
// Verify received message is original sent string
|
||||
StompFrame message = stompConnection.receive();
|
||||
assertNotNull(message);
|
||||
assertEquals(body, message.getBody());
|
||||
}
|
||||
|
||||
// Test that a string that requires 4 bytes to encode using standard
|
||||
// UTF-8 does not break when sent by Stomp and received by Stomp
|
||||
@Test(timeout = 60000)
|
||||
public void testSend4ByteUtf8StompToStomp() throws Exception {
|
||||
// Create test string using emojis, requires 4 bytes with standard UTF-8
|
||||
String body = "!®౩\uD83D\uDE42";
|
||||
|
||||
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
// publish message with string that requires 4-byte UTF-8 encoding
|
||||
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + body + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
frame = stompConnection.receiveFrame();
|
||||
assertTrue(frame.startsWith("CONNECTED"));
|
||||
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
// Verify received message is original sent string
|
||||
StompFrame message = stompConnection.receive();
|
||||
assertNotNull(message);
|
||||
assertEquals(body, message.getBody());
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testJMSXGroupIdCanBeSet() throws Exception {
|
||||
|
||||
|
|
Loading…
Reference in New Issue