ARTEMIS-2844 Save additional copies and use pooled direct ByteBufs
This commit is contained in:
parent
0db13ee8ab
commit
70068a0659
|
@ -17,10 +17,9 @@
|
|||
|
||||
package org.apache.activemq.artemis.core.protocol.mqtt;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.ByteBufUtil;
|
||||
import io.netty.buffer.EmptyByteBuf;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
|
||||
|
@ -267,20 +266,21 @@ public class MQTTPublishManager {
|
|||
}
|
||||
|
||||
private void sendServerMessage(int messageId, ICoreMessage message, int deliveryCount, int qos) {
|
||||
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress() == null ? "" : message.getAddress().toString(), session.getWildcardConfiguration());
|
||||
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress() == null ? "" : message.getAddress(), session.getWildcardConfiguration());
|
||||
boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY);
|
||||
|
||||
ByteBuf payload;
|
||||
switch (message.getType()) {
|
||||
case Message.TEXT_TYPE:
|
||||
SimpleString text = message.getDataBuffer().readNullableSimpleString();
|
||||
byte[] stringPayload = text.toString().getBytes(StandardCharsets.UTF_8);
|
||||
payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length);
|
||||
payload.writeBytes(stringPayload);
|
||||
final int utf8Bytes = ByteBufUtil.utf8Bytes(text);
|
||||
payload = ByteBufAllocator.DEFAULT.directBuffer(utf8Bytes);
|
||||
// IMPORTANT: this one won't enlarge ByteBuf by ByteBufUtil.maxUtf8Bytes(text), but just utf8Bytes
|
||||
ByteBufUtil.reserveAndWriteUtf8(payload, text, utf8Bytes);
|
||||
break;
|
||||
default:
|
||||
ActiveMQBuffer bodyBuffer = message.getDataBuffer();
|
||||
payload = ByteBufAllocator.DEFAULT.buffer(bodyBuffer.writerIndex());
|
||||
payload = ByteBufAllocator.DEFAULT.directBuffer(bodyBuffer.writerIndex());
|
||||
payload.writeBytes(bodyBuffer.byteBuf());
|
||||
break;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue