From 64724c3520586c6cd1bc0aec9942ae5bb5562459 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Tue, 16 Jan 2018 22:24:08 +0100 Subject: [PATCH] ARTEMIS-1616 OpenWire improvements Used SimpleString on OpenWireMessageConverter to avoid translations on CoreMessage --- .../openwire/OpenWireMessageConverter.java | 337 +++++++++--------- .../protocol/openwire/amq/AMQConsumer.java | 2 +- 2 files changed, 170 insertions(+), 169 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 54f3c9948d..457593d76a 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -71,29 +71,31 @@ import org.fusesource.hawtbuf.UTF8Buffer; public class OpenWireMessageConverter implements MessageConverter { - public static final String AMQ_PREFIX = "__HDR_"; - public static final String AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = AMQ_PREFIX + "dlqDeliveryFailureCause"; + private static final SimpleString JMS_TYPE_PROPERTY = new SimpleString("JMSType"); + private static final SimpleString JMS_CORRELATION_ID_PROPERTY = new SimpleString("JMSCorrelationID"); + private static final SimpleString AMQ_PREFIX = new SimpleString("__HDR_"); + public static final SimpleString AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = new SimpleString(AMQ_PREFIX + "dlqDeliveryFailureCause"); - private static final String AMQ_MSG_ARRIVAL = AMQ_PREFIX + "ARRIVAL"; - private static final String AMQ_MSG_BROKER_IN_TIME = AMQ_PREFIX + "BROKER_IN_TIME"; + private static final SimpleString AMQ_MSG_ARRIVAL = new SimpleString(AMQ_PREFIX + "ARRIVAL"); + private static final SimpleString AMQ_MSG_BROKER_IN_TIME = new SimpleString(AMQ_PREFIX + "BROKER_IN_TIME"); - private static final String AMQ_MSG_BROKER_PATH = AMQ_PREFIX + "BROKER_PATH"; - private static final String AMQ_MSG_CLUSTER = AMQ_PREFIX + "CLUSTER"; - private static final String AMQ_MSG_COMMAND_ID = AMQ_PREFIX + "COMMAND_ID"; - private static final String AMQ_MSG_DATASTRUCTURE = AMQ_PREFIX + "DATASTRUCTURE"; - private static final String AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID.toString(); - private static final String AMQ_MSG_GROUP_SEQUENCE = AMQ_PREFIX + "GROUP_SEQUENCE"; - private static final String AMQ_MSG_MESSAGE_ID = AMQ_PREFIX + "MESSAGE_ID"; - private static final String AMQ_MSG_ORIG_DESTINATION = AMQ_PREFIX + "ORIG_DESTINATION"; - private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID"; - private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID"; - private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP"; - private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO"; + private static final SimpleString AMQ_MSG_BROKER_PATH = new SimpleString(AMQ_PREFIX + "BROKER_PATH"); + private static final SimpleString AMQ_MSG_CLUSTER = new SimpleString(AMQ_PREFIX + "CLUSTER"); + private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID"); + private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE"); + private static final SimpleString AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID; + private static final SimpleString AMQ_MSG_GROUP_SEQUENCE = new SimpleString(AMQ_PREFIX + "GROUP_SEQUENCE"); + private static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID"); + private static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION"); + private static final SimpleString AMQ_MSG_ORIG_TXID = new SimpleString(AMQ_PREFIX + "ORIG_TXID"); + private static final SimpleString AMQ_MSG_PRODUCER_ID = new SimpleString(AMQ_PREFIX + "PRODUCER_ID"); + private static final SimpleString AMQ_MSG_MARSHALL_PROP = new SimpleString(AMQ_PREFIX + "MARSHALL_PROP"); + private static final SimpleString AMQ_MSG_REPLY_TO = new SimpleString(AMQ_PREFIX + "REPLY_TO"); - private static final String AMQ_MSG_USER_ID = AMQ_PREFIX + "USER_ID"; + private static final SimpleString AMQ_MSG_USER_ID = new SimpleString(AMQ_PREFIX + "USER_ID"); - private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE"; - private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED"; + private static final SimpleString AMQ_MSG_DROPPABLE = new SimpleString(AMQ_PREFIX + "DROPPABLE"); + private static final SimpleString AMQ_MSG_COMPRESSED = new SimpleString(AMQ_PREFIX + "COMPRESSED"); private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications"; @@ -128,7 +130,7 @@ public class OpenWireMessageConverter implements MessageConverter 0) { - mapData.decode(buffer.byteBuf()); - Map map = mapData.getMap(); - ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize()); - OutputStream os = out; - if (isCompressed) { - os = new DeflaterOutputStream(os, true); - } - try (DataOutputStream dataOut = new DataOutputStream(os)) { - MarshallingSupport.marshalPrimitiveMap(map, dataOut); - dataOut.flush(); - } - bytes = out.toByteArray(); - } - } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) { - if (buffer.readableBytes() > 0) { - int len = buffer.readInt(); - bytes = new byte[len]; - buffer.readBytes(bytes); - if (isCompressed) { - ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { - out.write(bytes); - out.flush(); - } - bytes = bytesOut.toByteArray(); - } - } - } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) { - org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream(); + if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) { + SimpleString text = buffer.readNullableSimpleString(); + if (text != null) { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4); OutputStream out = bytesOut; if (isCompressed) { - out = new DeflaterOutputStream(bytesOut, true); + out = new DeflaterOutputStream(out, true); } try (DataOutputStream dataOut = new DataOutputStream(out)) { - - boolean stop = false; - while (!stop && buffer.readable()) { - byte primitiveType = buffer.readByte(); - switch (primitiveType) { - case DataConstants.BOOLEAN: - MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean()); - break; - case DataConstants.BYTE: - MarshallingSupport.marshalByte(dataOut, buffer.readByte()); - break; - case DataConstants.BYTES: - int len = buffer.readInt(); - byte[] bytesData = new byte[len]; - buffer.readBytes(bytesData); - MarshallingSupport.marshalByteArray(dataOut, bytesData); - break; - case DataConstants.CHAR: - char ch = (char) buffer.readShort(); - MarshallingSupport.marshalChar(dataOut, ch); - break; - case DataConstants.DOUBLE: - double doubleVal = Double.longBitsToDouble(buffer.readLong()); - MarshallingSupport.marshalDouble(dataOut, doubleVal); - break; - case DataConstants.FLOAT: - Float floatVal = Float.intBitsToFloat(buffer.readInt()); - MarshallingSupport.marshalFloat(dataOut, floatVal); - break; - case DataConstants.INT: - MarshallingSupport.marshalInt(dataOut, buffer.readInt()); - break; - case DataConstants.LONG: - MarshallingSupport.marshalLong(dataOut, buffer.readLong()); - break; - case DataConstants.SHORT: - MarshallingSupport.marshalShort(dataOut, buffer.readShort()); - break; - case DataConstants.STRING: - String string = buffer.readNullableString(); - if (string == null) { - MarshallingSupport.marshalNull(dataOut); - } else { - MarshallingSupport.marshalString(dataOut, string); - } - break; - default: - //now we stop - stop = true; - break; - } - dataOut.flush(); - } - } - bytes = bytesOut.toByteArray(); - } else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) { - int n = buffer.readableBytes(); - bytes = new byte[n]; - buffer.readBytes(bytes); - if (isCompressed) { - int length = bytes.length; - Deflater deflater = new Deflater(); - try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) { - compressed.write(new byte[4]); - deflater.setInput(bytes); - deflater.finish(); - byte[] bytesBuf = new byte[1024]; - while (!deflater.finished()) { - int count = deflater.deflate(bytesBuf); - compressed.write(bytesBuf, 0, count); - } - compressed.flush(); - ByteSequence byteSeq = compressed.toByteSequence(); - ByteSequenceData.writeIntBig(byteSeq, length); - bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length); - } finally { - deflater.end(); - } - } - } else { - int n = buffer.readableBytes(); - bytes = new byte[n]; - buffer.readBytes(bytes); - if (isCompressed) { - try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { - out.write(bytes); - out.flush(); - bytes = bytesOut.toByteArray(); - } + MarshallingSupport.writeUTF8(dataOut, text.toString()); + dataOut.flush(); + bytes = bytesOut.toByteArray(); } } + } else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) { + TypedProperties mapData = new TypedProperties(); + //it could be a null map + if (buffer.readableBytes() > 0) { + mapData.decode(buffer.byteBuf()); + Map map = mapData.getMap(); + ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize()); + OutputStream os = out; + if (isCompressed) { + os = new DeflaterOutputStream(os, true); + } + try (DataOutputStream dataOut = new DataOutputStream(os)) { + MarshallingSupport.marshalPrimitiveMap(map, dataOut); + dataOut.flush(); + } + bytes = out.toByteArray(); + } - buffer.resetReaderIndex();// this is important for topics as the buffer - // may be read multiple times + } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) { + if (buffer.readableBytes() > 0) { + int len = buffer.readInt(); + bytes = new byte[len]; + buffer.readBytes(bytes); + if (isCompressed) { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { + out.write(bytes); + out.flush(); + } + bytes = bytesOut.toByteArray(); + } + } + } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) { + org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream(); + OutputStream out = bytesOut; + if (isCompressed) { + out = new DeflaterOutputStream(bytesOut, true); + } + try (DataOutputStream dataOut = new DataOutputStream(out)) { + + boolean stop = false; + while (!stop && buffer.readable()) { + byte primitiveType = buffer.readByte(); + switch (primitiveType) { + case DataConstants.BOOLEAN: + MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean()); + break; + case DataConstants.BYTE: + MarshallingSupport.marshalByte(dataOut, buffer.readByte()); + break; + case DataConstants.BYTES: + int len = buffer.readInt(); + byte[] bytesData = new byte[len]; + buffer.readBytes(bytesData); + MarshallingSupport.marshalByteArray(dataOut, bytesData); + break; + case DataConstants.CHAR: + char ch = (char) buffer.readShort(); + MarshallingSupport.marshalChar(dataOut, ch); + break; + case DataConstants.DOUBLE: + double doubleVal = Double.longBitsToDouble(buffer.readLong()); + MarshallingSupport.marshalDouble(dataOut, doubleVal); + break; + case DataConstants.FLOAT: + Float floatVal = Float.intBitsToFloat(buffer.readInt()); + MarshallingSupport.marshalFloat(dataOut, floatVal); + break; + case DataConstants.INT: + MarshallingSupport.marshalInt(dataOut, buffer.readInt()); + break; + case DataConstants.LONG: + MarshallingSupport.marshalLong(dataOut, buffer.readLong()); + break; + case DataConstants.SHORT: + MarshallingSupport.marshalShort(dataOut, buffer.readShort()); + break; + case DataConstants.STRING: + String string = buffer.readNullableString(); + if (string == null) { + MarshallingSupport.marshalNull(dataOut); + } else { + MarshallingSupport.marshalString(dataOut, string); + } + break; + default: + //now we stop + stop = true; + break; + } + dataOut.flush(); + } + } + bytes = bytesOut.toByteArray(); + } else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) { + int n = buffer.readableBytes(); + bytes = new byte[n]; + buffer.readBytes(bytes); + if (isCompressed) { + int length = bytes.length; + Deflater deflater = new Deflater(); + try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) { + compressed.write(new byte[4]); + deflater.setInput(bytes); + deflater.finish(); + byte[] bytesBuf = new byte[1024]; + while (!deflater.finished()) { + int count = deflater.deflate(bytesBuf); + compressed.write(bytesBuf, 0, count); + } + compressed.flush(); + ByteSequence byteSeq = compressed.toByteSequence(); + ByteSequenceData.writeIntBig(byteSeq, length); + bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length); + } finally { + deflater.end(); + } + } + } else { + int n = buffer.readableBytes(); + bytes = new byte[n]; + buffer.readBytes(bytes); + if (isCompressed) { + try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { + out.write(bytes); + out.flush(); + bytes = bytesOut.toByteArray(); + } + } } + + buffer.resetReaderIndex();// this is important for topics as the buffer + // may be read multiple times } //we need check null because messages may come from other clients @@ -767,7 +768,7 @@ public class OpenWireMessageConverter implements MessageConverter