Merge pull request #1290 from cshannon/AMQ-8398-stomp-fix

AMQ-8398 - Fix Stomp to OpenWire UTF-8 translation
This commit is contained in:
Christopher L. Shannon 2024-08-28 15:04:01 -04:00 committed by GitHub
commit e1bcbb9f3e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 93 additions and 32 deletions

View File

@ -18,6 +18,7 @@ package org.apache.activemq.transport.stomp;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@ -53,12 +54,9 @@ public class LegacyFrameTranslator implements FrameTranslator {
if(intendedType.equalsIgnoreCase("text")){
ActiveMQTextMessage text = new ActiveMQTextMessage();
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
DataOutputStream data = new DataOutputStream(bytes);
data.writeInt(command.getContent().length);
data.write(command.getContent());
text.setContent(bytes.toByteSequence());
data.close();
// AMQ-8398 - get the original text back so we decode from standard UTF-8
// and set on the message so it will re-encode using AMQ modified UTF-8
text.setText(command.getBody());
} catch (Throwable e) {
throw new ProtocolException("Text could not bet set: " + e, false, e);
}
@ -78,12 +76,9 @@ public class LegacyFrameTranslator implements FrameTranslator {
} else {
ActiveMQTextMessage text = new ActiveMQTextMessage();
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
DataOutputStream data = new DataOutputStream(bytes);
data.writeInt(command.getContent().length);
data.write(command.getContent());
text.setContent(bytes.toByteSequence());
data.close();
// AMQ-8398 - get the original text back so we decode from standard UTF-8
// and set on the message so it will re-encode using AMQ modified UTF-8
text.setText(command.getBody());
} catch (Throwable e) {
throw new ProtocolException("Text could not bet set: " + e, false, e);
}
@ -103,22 +98,13 @@ public class LegacyFrameTranslator implements FrameTranslator {
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this);
if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
if (!message.isCompressed() && message.getContent() != null) {
ByteSequence msgContent = message.getContent();
if (msgContent.getLength() > 4) {
byte[] content = new byte[msgContent.getLength() - 4];
System.arraycopy(msgContent.data, 4, content, 0, content.length);
command.setContent(content);
}
} else {
ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
// AMQ-8398 - get the original text back so we decode from modified UTF-8
// and then we can re-encode using the standard JDK encoding
String messageText = msg.getText();
if (messageText != null) {
command.setContent(msg.getText().getBytes("UTF-8"));
command.setContent(msg.getText().getBytes(StandardCharsets.UTF_8));
}
}
} else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.stomp;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
@ -75,11 +76,7 @@ public class StompFrame implements Command {
}
public String getBody() {
try {
return new String(content, "UTF-8");
} catch (UnsupportedEncodingException e) {
return new String(content);
}
return new String(content, StandardCharsets.UTF_8);
}
public void setContent(byte[] data) {

View File

@ -219,6 +219,84 @@ public class StompTest extends StompTestSupport {
assertTrue(Math.abs(tnow - tmsg) < 1000);
}
// Test that a string that requires 4 bytes to encode using standard
// UTF-8 does not break when sent by Stomp and received by JMS/OpenWire
// AMQ uses a modified UTF-8 encoding that only uses 3 bytes so conversion
// needs to happen so this works.
@Test(timeout = 60000)
public void testSend4ByteUtf8StompToJms() throws Exception {
// Create test string using emojis, requires 4 bytes with standard UTF-8
String body = "!®౩\uD83D\uDE42";
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
// publish message with string that requires 4-byte UTF-8 encoding
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + body + Stomp.NULL;
stompConnection.sendFrame(frame);
// Verify received message is original sent string
TextMessage message = (TextMessage)consumer.receive(2500);
assertNotNull(message);
assertEquals(body, message.getText());
}
// Test that a string that requires 4 bytes to encode using standard
// UTF-8 does not break when sent by JMS/OpenWire and received by Stomp
// AMQ uses a modified UTF-8 encoding that only uses 3 bytes so conversion
// needs to happen so this works.
@Test(timeout = 60000)
public void testSend4ByteUtf8JmsToStomp() throws Exception {
// Create test string using emojis, requires 4 bytes with standard UTF-8
String body = "!®౩\uD83D\uDE42";
MessageProducer producer = session.createProducer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// publish message with string that requires 4-byte UTF-8 encoding
producer.send(session.createTextMessage(body));
// Verify received message is original sent string
StompFrame message = stompConnection.receive();
assertNotNull(message);
assertEquals(body, message.getBody());
}
// Test that a string that requires 4 bytes to encode using standard
// UTF-8 does not break when sent by Stomp and received by Stomp
@Test(timeout = 60000)
public void testSend4ByteUtf8StompToStomp() throws Exception {
// Create test string using emojis, requires 4 bytes with standard UTF-8
String body = "!®౩\uD83D\uDE42";
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// publish message with string that requires 4-byte UTF-8 encoding
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + body + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// Verify received message is original sent string
StompFrame message = stompConnection.receive();
assertNotNull(message);
assertEquals(body, message.getBody());
}
@Test(timeout = 60000)
public void testJMSXGroupIdCanBeSet() throws Exception {