ARTEMIS-233 Convert TextMessage to UTF8 in MQTT
This commit is contained in:
parent
385bd047fe
commit
e453aae5c9
|
@ -17,9 +17,13 @@
|
||||||
|
|
||||||
package org.apache.activemq.artemis.core.protocol.mqtt;
|
package org.apache.activemq.artemis.core.protocol.mqtt;
|
||||||
|
|
||||||
|
import java.io.UnsupportedEncodingException;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
import io.netty.buffer.EmptyByteBuf;
|
import io.netty.buffer.EmptyByteBuf;
|
||||||
import io.netty.handler.codec.mqtt.MqttMessageType;
|
import io.netty.handler.codec.mqtt.MqttMessageType;
|
||||||
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.Pair;
|
import org.apache.activemq.artemis.api.core.Pair;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||||
|
@ -216,8 +220,24 @@ public class MQTTPublishManager {
|
||||||
private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) {
|
private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) {
|
||||||
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString());
|
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString());
|
||||||
|
|
||||||
ByteBuf payload = message.getBodyBufferDuplicate().byteBuf();
|
ByteBuf payload;
|
||||||
|
switch (message.getType()) {
|
||||||
|
case Message.TEXT_TYPE:
|
||||||
|
try {
|
||||||
|
SimpleString text = message.getBodyBuffer().readNullableSimpleString();
|
||||||
|
byte[] stringPayload = text.toString().getBytes("UTF-8");
|
||||||
|
payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length);
|
||||||
|
payload.writeBytes(stringPayload);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
catch (UnsupportedEncodingException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
// Do nothing default to sending raw bytes.
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
payload = message.getBodyBufferDuplicate().byteBuf();
|
||||||
|
break;
|
||||||
|
}
|
||||||
session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
|
session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue