AMQ-8398 - Fix Stomp to OpenWire UTF-8 translation

This commit fixes conversion of messages that are sent or received
using Stomp when the body contains characters that require 4 bytes
for encoding using standard UTF-8.

ActiveMQ and OpenWire currently use a modified UTF-8
encoding that only uses 3 bytes so the conversion previously was
breaking because the body was encoded using standard JDK UTF-8 encoding
and set directly on an ActiveMQText message which was leading to
decoding errors later when the ActiveMQMessage tried to decode using the
modified encoder. The reverse was also was true and was breaking
in some cases.

The fix now makes sure to correctly decode the Stomp message back to a
String first and set that on the ActiveMQ message so it can be
re-encoded correctly. The reverse is fixed as well so both
conversion from Stomp -> OpenWire and OpenWire -> Stomp work. Tests have
been added for Stomp -> OpenWire, OpenWire -> Stomp, and Stomp -> Stomp
which is really Stomp -> OpenWire -> Stomp.

(cherry picked from commit 3ddf515597)
(cherry picked from commit 73a8d5ca67)
This commit is contained in:
Christopher L. Shannon 2024-08-28 12:44:45 -04:00
parent e4ec7c5360
commit 6f2b70d8d5
3 changed files with 93 additions and 32 deletions

View File

@ -18,6 +18,7 @@ package org.apache.activemq.transport.stomp;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@ -53,12 +54,9 @@ public class LegacyFrameTranslator implements FrameTranslator {
if(intendedType.equalsIgnoreCase("text")){
ActiveMQTextMessage text = new ActiveMQTextMessage();
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
DataOutputStream data = new DataOutputStream(bytes);
data.writeInt(command.getContent().length);
data.write(command.getContent());
text.setContent(bytes.toByteSequence());
data.close();
// AMQ-8398 - get the original text back so we decode from standard UTF-8
// and set on the message so it will re-encode using AMQ modified UTF-8
text.setText(command.getBody());
} catch (Throwable e) {
throw new ProtocolException("Text could not bet set: " + e, false, e);
}
@ -78,12 +76,9 @@ public class LegacyFrameTranslator implements FrameTranslator {
} else {
ActiveMQTextMessage text = new ActiveMQTextMessage();
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
DataOutputStream data = new DataOutputStream(bytes);
data.writeInt(command.getContent().length);
data.write(command.getContent());
text.setContent(bytes.toByteSequence());
data.close();
// AMQ-8398 - get the original text back so we decode from standard UTF-8
// and set on the message so it will re-encode using AMQ modified UTF-8
text.setText(command.getBody());
} catch (Throwable e) {
throw new ProtocolException("Text could not bet set: " + e, false, e);
}
@ -103,22 +98,13 @@ public class LegacyFrameTranslator implements FrameTranslator {
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this);
if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
if (!message.isCompressed() && message.getContent() != null) {
ByteSequence msgContent = message.getContent();
if (msgContent.getLength() > 4) {
byte[] content = new byte[msgContent.getLength() - 4];
System.arraycopy(msgContent.data, 4, content, 0, content.length);
command.setContent(content);
}
} else {
ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
String messageText = msg.getText();
if (messageText != null) {
command.setContent(msg.getText().getBytes("UTF-8"));
}
ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
// AMQ-8398 - get the original text back so we decode from modified UTF-8
// and then we can re-encode using the standard JDK encoding
String messageText = msg.getText();
if (messageText != null) {
command.setContent(msg.getText().getBytes(StandardCharsets.UTF_8));
}
} else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.stomp;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
@ -75,11 +76,7 @@ public class StompFrame implements Command {
}
public String getBody() {
try {
return new String(content, "UTF-8");
} catch (UnsupportedEncodingException e) {
return new String(content);
}
return new String(content, StandardCharsets.UTF_8);
}
public void setContent(byte[] data) {

View File

@ -216,6 +216,84 @@ public class StompTest extends StompTestSupport {
assertTrue(Math.abs(tnow - tmsg) < 1000);
}
// Test that a string that requires 4 bytes to encode using standard
// UTF-8 does not break when sent by Stomp and received by JMS/OpenWire
// AMQ uses a modified UTF-8 encoding that only uses 3 bytes so conversion
// needs to happen so this works.
@Test(timeout = 60000)
public void testSend4ByteUtf8StompToJms() throws Exception {
// Create test string using emojis, requires 4 bytes with standard UTF-8
String body = "!®౩\uD83D\uDE42";
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
// publish message with string that requires 4-byte UTF-8 encoding
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + body + Stomp.NULL;
stompConnection.sendFrame(frame);
// Verify received message is original sent string
TextMessage message = (TextMessage)consumer.receive(2500);
assertNotNull(message);
assertEquals(body, message.getText());
}
// Test that a string that requires 4 bytes to encode using standard
// UTF-8 does not break when sent by JMS/OpenWire and received by Stomp
// AMQ uses a modified UTF-8 encoding that only uses 3 bytes so conversion
// needs to happen so this works.
@Test(timeout = 60000)
public void testSend4ByteUtf8JmsToStomp() throws Exception {
// Create test string using emojis, requires 4 bytes with standard UTF-8
String body = "!®౩\uD83D\uDE42";
MessageProducer producer = session.createProducer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// publish message with string that requires 4-byte UTF-8 encoding
producer.send(session.createTextMessage(body));
// Verify received message is original sent string
StompFrame message = stompConnection.receive();
assertNotNull(message);
assertEquals(body, message.getBody());
}
// Test that a string that requires 4 bytes to encode using standard
// UTF-8 does not break when sent by Stomp and received by Stomp
@Test(timeout = 60000)
public void testSend4ByteUtf8StompToStomp() throws Exception {
// Create test string using emojis, requires 4 bytes with standard UTF-8
String body = "!®౩\uD83D\uDE42";
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// publish message with string that requires 4-byte UTF-8 encoding
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + body + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// Verify received message is original sent string
StompFrame message = stompConnection.receive();
assertNotNull(message);
assertEquals(body, message.getBody());
}
@Test(timeout = 60000)
public void testJMSXGroupIdCanBeSet() throws Exception {