diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 83ff6d6bea..54f3c9948d 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -123,10 +123,10 @@ public class OpenWireMessageConverter implements MessageConverter map = MarshallingSupport.unmarshalPrimitiveMap(mdataIn); - mdataIn.close(); - TypedProperties props = new TypedProperties(); - loadMapIntoProperties(props, map); - props.encode(body.byteBuf()); + writeMapType(contents, messageCompressed, body); break; case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE: - if (messageCompressed) { - try (InputStream ois = new InflaterInputStream(new ByteArrayInputStream(contents)); - org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) { - byte[] buf = new byte[1024]; - int n = ois.read(buf); - while (n != -1) { - decompressed.write(buf, 0, n); - n = ois.read(); - } - //read done - contents = decompressed.toByteSequence(); - } - } - body.writeInt(contents.length); - body.writeBytes(contents.data, contents.offset, contents.length); + writeObjectType(contents, messageCompressed, body); break; case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE: - InputStream sis = new ByteArrayInputStream(contents); - if (messageCompressed) { - sis = new InflaterInputStream(sis); - } - DataInputStream sdis = new DataInputStream(sis); - int stype = sdis.read(); - while (stype != -1) { - switch (stype) { - case MarshallingSupport.BOOLEAN_TYPE: - body.writeByte(DataConstants.BOOLEAN); - body.writeBoolean(sdis.readBoolean()); - break; - case MarshallingSupport.BYTE_TYPE: - body.writeByte(DataConstants.BYTE); - body.writeByte(sdis.readByte()); - break; - case MarshallingSupport.BYTE_ARRAY_TYPE: - body.writeByte(DataConstants.BYTES); - int slen = sdis.readInt(); - byte[] sbytes = new byte[slen]; - sdis.read(sbytes); - body.writeInt(slen); - body.writeBytes(sbytes); - break; - case MarshallingSupport.CHAR_TYPE: - body.writeByte(DataConstants.CHAR); - char schar = sdis.readChar(); - body.writeShort((short) schar); - break; - case MarshallingSupport.DOUBLE_TYPE: - body.writeByte(DataConstants.DOUBLE); - double sdouble = sdis.readDouble(); - body.writeLong(Double.doubleToLongBits(sdouble)); - break; - case MarshallingSupport.FLOAT_TYPE: - body.writeByte(DataConstants.FLOAT); - float sfloat = sdis.readFloat(); - body.writeInt(Float.floatToIntBits(sfloat)); - break; - case MarshallingSupport.INTEGER_TYPE: - body.writeByte(DataConstants.INT); - body.writeInt(sdis.readInt()); - break; - case MarshallingSupport.LONG_TYPE: - body.writeByte(DataConstants.LONG); - body.writeLong(sdis.readLong()); - break; - case MarshallingSupport.SHORT_TYPE: - body.writeByte(DataConstants.SHORT); - body.writeShort(sdis.readShort()); - break; - case MarshallingSupport.STRING_TYPE: - body.writeByte(DataConstants.STRING); - String sstring = sdis.readUTF(); - body.writeNullableString(sstring); - break; - case MarshallingSupport.BIG_STRING_TYPE: - body.writeByte(DataConstants.STRING); - String sbigString = MarshallingSupport.readUTF8(sdis); - body.writeNullableString(sbigString); - break; - case MarshallingSupport.NULL: - body.writeByte(DataConstants.STRING); - body.writeNullableString(null); - break; - default: - //something we don't know, ignore - break; - } - stype = sdis.read(); - } - sdis.close(); + writeStreamType(contents, messageCompressed, body); break; case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE: - if (messageCompressed) { - Inflater inflater = new Inflater(); - try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) { - int length = ByteSequenceData.readIntBig(contents); - contents.offset = 0; - byte[] data = Arrays.copyOfRange(contents.getData(), 4, contents.getLength()); - - inflater.setInput(data); - byte[] buffer = new byte[length]; - int count = inflater.inflate(buffer); - decompressed.write(buffer, 0, count); - contents = decompressed.toByteSequence(); - } catch (Exception e) { - throw new IOException(e); - } finally { - inflater.end(); - } - } - body.writeBytes(contents.data, contents.offset, contents.length); + writeBytesType(contents, messageCompressed, body); break; default: - if (messageCompressed) { - try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream(); - OutputStream os = new InflaterOutputStream(decompressed)) { - os.write(contents.data, contents.offset, contents.getLength()); - contents = decompressed.toByteSequence(); - } catch (Exception e) { - throw new IOException(e); - } - } - body.writeBytes(contents.data, contents.offset, contents.length); + writeDefaultType(contents, messageCompressed, body); break; } } //amq specific coreMessage.putLongProperty(AMQ_MSG_ARRIVAL, messageSend.getArrival()); coreMessage.putLongProperty(AMQ_MSG_BROKER_IN_TIME, messageSend.getBrokerInTime()); - BrokerId[] brokers = messageSend.getBrokerPath(); + final BrokerId[] brokers = messageSend.getBrokerPath(); if (brokers != null) { - StringBuilder builder = new StringBuilder(); - for (int i = 0; i < brokers.length; i++) { - builder.append(brokers[i].getValue()); - if (i != (brokers.length - 1)) { - builder.append(","); //is this separator safe? - } - } - coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, builder.toString()); + putMsgBrokerPath(brokers, coreMessage); } - BrokerId[] cluster = messageSend.getCluster(); + final BrokerId[] cluster = messageSend.getCluster(); if (cluster != null) { - StringBuilder builder = new StringBuilder(); - for (int i = 0; i < cluster.length; i++) { - builder.append(cluster[i].getValue()); - if (i != (cluster.length - 1)) { - builder.append(","); //is this separator safe? - } - } - coreMessage.putStringProperty(AMQ_MSG_CLUSTER, builder.toString()); + putMsgCluster(cluster, coreMessage); } coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId()); - String corrId = messageSend.getCorrelationId(); + final String corrId = messageSend.getCorrelationId(); if (corrId != null) { coreMessage.putStringProperty("JMSCorrelationID", corrId); } - DataStructure ds = messageSend.getDataStructure(); + final DataStructure ds = messageSend.getDataStructure(); if (ds != null) { - ByteSequence dsBytes = marshaller.marshal(ds); - dsBytes.compact(); - coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data); + putMsgDataStructure(ds, marshaller, coreMessage); } - String groupId = messageSend.getGroupID(); + final String groupId = messageSend.getGroupID(); if (groupId != null) { coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, groupId); } coreMessage.putIntProperty(AMQ_MSG_GROUP_SEQUENCE, messageSend.getGroupSequence()); - MessageId messageId = messageSend.getMessageId(); + final MessageId messageId = messageSend.getMessageId(); - ByteSequence midBytes = marshaller.marshal(messageId); + final ByteSequence midBytes = marshaller.marshal(messageId); midBytes.compact(); coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data); - ProducerId producerId = messageSend.getProducerId(); + final ProducerId producerId = messageSend.getProducerId(); if (producerId != null) { - ByteSequence producerIdBytes = marshaller.marshal(producerId); - producerIdBytes.compact(); - coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data); + putMsgProducerId(producerId, marshaller, coreMessage); } - ByteSequence propBytes = messageSend.getMarshalledProperties(); + final ByteSequence propBytes = messageSend.getMarshalledProperties(); if (propBytes != null) { - propBytes.compact(); - coreMessage.putBytesProperty(AMQ_MSG_MARSHALL_PROP, propBytes.data); - //unmarshall properties to core so selector will work - Map props = messageSend.getProperties(); - //Map props = MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(propBytes))); - for (Entry ent : props.entrySet()) { - Object value = ent.getValue(); - try { - coreMessage.putObjectProperty(ent.getKey(), value); - } catch (ActiveMQPropertyConversionException e) { - coreMessage.putStringProperty(ent.getKey(), value.toString()); - } - } + putMsgMarshalledProperties(propBytes, messageSend, coreMessage); } - ActiveMQDestination replyTo = messageSend.getReplyTo(); + final ActiveMQDestination replyTo = messageSend.getReplyTo(); if (replyTo != null) { - ByteSequence replyToBytes = marshaller.marshal(replyTo); - replyToBytes.compact(); - coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data); + putMsgReplyTo(replyTo, marshaller, coreMessage); } - String userId = messageSend.getUserID(); + final String userId = messageSend.getUserID(); if (userId != null) { coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId); } coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable()); - ActiveMQDestination origDest = messageSend.getOriginalDestination(); + final ActiveMQDestination origDest = messageSend.getOriginalDestination(); if (origDest != null) { - ByteSequence origDestBytes = marshaller.marshal(origDest); - origDestBytes.compact(); - coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data); + putMsgOriginalDestination(origDest, marshaller, coreMessage); } return coreMessage; } + private static void writeTextType(final ByteSequence contents, + final boolean messageCompressed, + final ActiveMQBuffer body) throws IOException { + InputStream tis = new ByteArrayInputStream(contents); + if (messageCompressed) { + tis = new InflaterInputStream(tis); + } + DataInputStream tdataIn = new DataInputStream(tis); + String text = MarshallingSupport.readUTF8(tdataIn); + tdataIn.close(); + body.writeNullableSimpleString(new SimpleString(text)); + } + + private static void writeMapType(final ByteSequence contents, + final boolean messageCompressed, + final ActiveMQBuffer body) throws IOException { + InputStream mis = new ByteArrayInputStream(contents); + if (messageCompressed) { + mis = new InflaterInputStream(mis); + } + DataInputStream mdataIn = new DataInputStream(mis); + Map map = MarshallingSupport.unmarshalPrimitiveMap(mdataIn); + mdataIn.close(); + TypedProperties props = new TypedProperties(); + loadMapIntoProperties(props, map); + props.encode(body.byteBuf()); + } + + private static void writeObjectType(ByteSequence contents, + final boolean messageCompressed, + final ActiveMQBuffer body) throws IOException { + if (messageCompressed) { + contents = writeCompressedObjectType(contents); + } + body.writeInt(contents.length); + body.writeBytes(contents.data, contents.offset, contents.length); + } + + private static ByteSequence writeCompressedObjectType(final ByteSequence contents) throws IOException { + try (InputStream ois = new InflaterInputStream(new ByteArrayInputStream(contents)); + org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) { + byte[] buf = new byte[1024]; + int n = ois.read(buf); + while (n != -1) { + decompressed.write(buf, 0, n); + n = ois.read(); + } + //read done + return decompressed.toByteSequence(); + } + } + + private static void writeStreamType(final ByteSequence contents, + final boolean messageCompressed, + final ActiveMQBuffer body) throws IOException { + InputStream sis = new ByteArrayInputStream(contents); + if (messageCompressed) { + sis = new InflaterInputStream(sis); + } + DataInputStream sdis = new DataInputStream(sis); + int stype = sdis.read(); + while (stype != -1) { + switch (stype) { + case MarshallingSupport.BOOLEAN_TYPE: + body.writeByte(DataConstants.BOOLEAN); + body.writeBoolean(sdis.readBoolean()); + break; + case MarshallingSupport.BYTE_TYPE: + body.writeByte(DataConstants.BYTE); + body.writeByte(sdis.readByte()); + break; + case MarshallingSupport.BYTE_ARRAY_TYPE: + body.writeByte(DataConstants.BYTES); + int slen = sdis.readInt(); + byte[] sbytes = new byte[slen]; + sdis.read(sbytes); + body.writeInt(slen); + body.writeBytes(sbytes); + break; + case MarshallingSupport.CHAR_TYPE: + body.writeByte(DataConstants.CHAR); + char schar = sdis.readChar(); + body.writeShort((short) schar); + break; + case MarshallingSupport.DOUBLE_TYPE: + body.writeByte(DataConstants.DOUBLE); + double sdouble = sdis.readDouble(); + body.writeLong(Double.doubleToLongBits(sdouble)); + break; + case MarshallingSupport.FLOAT_TYPE: + body.writeByte(DataConstants.FLOAT); + float sfloat = sdis.readFloat(); + body.writeInt(Float.floatToIntBits(sfloat)); + break; + case MarshallingSupport.INTEGER_TYPE: + body.writeByte(DataConstants.INT); + body.writeInt(sdis.readInt()); + break; + case MarshallingSupport.LONG_TYPE: + body.writeByte(DataConstants.LONG); + body.writeLong(sdis.readLong()); + break; + case MarshallingSupport.SHORT_TYPE: + body.writeByte(DataConstants.SHORT); + body.writeShort(sdis.readShort()); + break; + case MarshallingSupport.STRING_TYPE: + body.writeByte(DataConstants.STRING); + String sstring = sdis.readUTF(); + body.writeNullableString(sstring); + break; + case MarshallingSupport.BIG_STRING_TYPE: + body.writeByte(DataConstants.STRING); + String sbigString = MarshallingSupport.readUTF8(sdis); + body.writeNullableString(sbigString); + break; + case MarshallingSupport.NULL: + body.writeByte(DataConstants.STRING); + body.writeNullableString(null); + break; + default: + //something we don't know, ignore + break; + } + stype = sdis.read(); + } + sdis.close(); + } + + private static void writeBytesType(ByteSequence contents, + final boolean messageCompressed, + final ActiveMQBuffer body) throws IOException { + if (messageCompressed) { + contents = writeCompressedBytesType(contents); + } + body.writeBytes(contents.data, contents.offset, contents.length); + } + + private static ByteSequence writeCompressedBytesType(final ByteSequence contents) throws IOException { + Inflater inflater = new Inflater(); + try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) { + int length = ByteSequenceData.readIntBig(contents); + contents.offset = 0; + byte[] data = Arrays.copyOfRange(contents.getData(), 4, contents.getLength()); + + inflater.setInput(data); + byte[] buffer = new byte[length]; + int count = inflater.inflate(buffer); + decompressed.write(buffer, 0, count); + return decompressed.toByteSequence(); + } catch (Exception e) { + throw new IOException(e); + } finally { + inflater.end(); + } + } + + private static void writeDefaultType(ByteSequence contents, + final boolean messageCompressed, + final ActiveMQBuffer body) throws IOException { + if (messageCompressed) { + contents = writeCompressedDefaultType(contents); + } + body.writeBytes(contents.data, contents.offset, contents.length); + } + + private static ByteSequence writeCompressedDefaultType(final ByteSequence contents) throws IOException { + try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream(); + OutputStream os = new InflaterOutputStream(decompressed)) { + os.write(contents.data, contents.offset, contents.getLength()); + return decompressed.toByteSequence(); + } catch (Exception e) { + throw new IOException(e); + } + } + + private static void putMsgBrokerPath(final BrokerId[] brokers, final CoreMessage coreMessage) { + final StringBuilder builder = new StringBuilder(); + for (int i = 0, size = brokers.length; i < size; i++) { + builder.append(brokers[i].getValue()); + if (i != (size - 1)) { + builder.append(','); //is this separator safe? + } + } + coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, builder.toString()); + } + + private static void putMsgCluster(final BrokerId[] cluster, final CoreMessage coreMessage) { + final StringBuilder builder = new StringBuilder(); + for (int i = 0, size = cluster.length; i < size; i++) { + builder.append(cluster[i].getValue()); + if (i != (size - 1)) { + builder.append(','); //is this separator safe? + } + } + coreMessage.putStringProperty(AMQ_MSG_CLUSTER, builder.toString()); + } + + private static void putMsgDataStructure(final DataStructure ds, + final WireFormat marshaller, + final CoreMessage coreMessage) throws IOException { + final ByteSequence dsBytes = marshaller.marshal(ds); + dsBytes.compact(); + coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data); + } + + private static void putMsgProducerId(final ProducerId producerId, + final WireFormat marshaller, + final CoreMessage coreMessage) throws IOException { + final ByteSequence producerIdBytes = marshaller.marshal(producerId); + producerIdBytes.compact(); + coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data); + } + + private static void putMsgMarshalledProperties(final ByteSequence propBytes, + final Message messageSend, + final CoreMessage coreMessage) throws IOException { + propBytes.compact(); + coreMessage.putBytesProperty(AMQ_MSG_MARSHALL_PROP, propBytes.data); + //unmarshall properties to core so selector will work + final Map props = messageSend.getProperties(); + if (!props.isEmpty()) { + props.forEach((key, value) -> { + try { + coreMessage.putObjectProperty(key, value); + } catch (ActiveMQPropertyConversionException e) { + coreMessage.putStringProperty(key, value.toString()); + } + }); + } + } + + private static void putMsgReplyTo(final ActiveMQDestination replyTo, + final WireFormat marshaller, + final CoreMessage coreMessage) throws IOException { + final ByteSequence replyToBytes = marshaller.marshal(replyTo); + replyToBytes.compact(); + coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data); + } + + private static void putMsgOriginalDestination(final ActiveMQDestination origDest, + final WireFormat marshaller, + final CoreMessage coreMessage) throws IOException { + final ByteSequence origDestBytes = marshaller.marshal(origDest); + origDestBytes.compact(); + coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data); + } + private static void loadMapIntoProperties(TypedProperties props, Map map) { for (Entry entry : map.entrySet()) { SimpleString key = new SimpleString(entry.getKey());