diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 0948f8a926..3dc4a4ea7d 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -120,7 +120,7 @@ public class OpenWireMessageConverter implements MessageConverter 0) { - mapData.decode(buffer.byteBuf()); - Map map = mapData.getMap(); - ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize()); - OutputStream os = out; - if (isCompressed) { - os = new DeflaterOutputStream(os, true); - } - try (DataOutputStream dataOut = new DataOutputStream(os)) { - MarshallingSupport.marshalPrimitiveMap(map, dataOut); - dataOut.flush(); - } - bytes = out.toByteArray(); - } - - } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) { - if (buffer.readableBytes() > 0) { - int len = buffer.readInt(); - bytes = new byte[len]; - buffer.readBytes(bytes); - if (isCompressed) { - ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { - out.write(bytes); - out.flush(); - } - bytes = bytesOut.toByteArray(); - } - } - } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) { - org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream(); - OutputStream out = bytesOut; - if (isCompressed) { - out = new DeflaterOutputStream(bytesOut, true); - } - try (DataOutputStream dataOut = new DataOutputStream(out)) { - - boolean stop = false; - while (!stop && buffer.readable()) { - byte primitiveType = buffer.readByte(); - switch (primitiveType) { - case DataConstants.BOOLEAN: - MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean()); - break; - case DataConstants.BYTE: - MarshallingSupport.marshalByte(dataOut, buffer.readByte()); - break; - case DataConstants.BYTES: - int len = buffer.readInt(); - byte[] bytesData = new byte[len]; - buffer.readBytes(bytesData); - MarshallingSupport.marshalByteArray(dataOut, bytesData); - break; - case DataConstants.CHAR: - char ch = (char) buffer.readShort(); - MarshallingSupport.marshalChar(dataOut, ch); - break; - case DataConstants.DOUBLE: - double doubleVal = Double.longBitsToDouble(buffer.readLong()); - MarshallingSupport.marshalDouble(dataOut, doubleVal); - break; - case DataConstants.FLOAT: - Float floatVal = Float.intBitsToFloat(buffer.readInt()); - MarshallingSupport.marshalFloat(dataOut, floatVal); - break; - case DataConstants.INT: - MarshallingSupport.marshalInt(dataOut, buffer.readInt()); - break; - case DataConstants.LONG: - MarshallingSupport.marshalLong(dataOut, buffer.readLong()); - break; - case DataConstants.SHORT: - MarshallingSupport.marshalShort(dataOut, buffer.readShort()); - break; - case DataConstants.STRING: - String string = buffer.readNullableString(); - if (string == null) { - MarshallingSupport.marshalNull(dataOut); - } else { - MarshallingSupport.marshalString(dataOut, string); - } - break; - default: - //now we stop - stop = true; - break; - } - dataOut.flush(); - } - } - bytes = bytesOut.toByteArray(); - } else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) { - int n = buffer.readableBytes(); - bytes = new byte[n]; - buffer.readBytes(bytes); - if (isCompressed) { - int length = bytes.length; - Deflater deflater = new Deflater(); - try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) { - compressed.write(new byte[4]); - deflater.setInput(bytes); - deflater.finish(); - byte[] bytesBuf = new byte[1024]; - while (!deflater.finished()) { - int count = deflater.deflate(bytesBuf); - compressed.write(bytesBuf, 0, count); - } - compressed.flush(); - ByteSequence byteSeq = compressed.toByteSequence(); - ByteSequenceData.writeIntBig(byteSeq, length); - bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length); - } finally { - deflater.end(); - } - } - } else { - int n = buffer.readableBytes(); - bytes = new byte[n]; - buffer.readBytes(bytes); - if (isCompressed) { - try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { - out.write(bytes); - out.flush(); - bytes = bytesOut.toByteArray(); - } - } - } - - buffer.resetReaderIndex();// this is important for topics as the buffer - // may be read multiple times - } - //we need check null because messages may come from other clients //and those amq specific attribute may not be set. Long arrival = (Long) coreMessage.getObjectProperty(AMQ_MSG_ARRIVAL); @@ -740,24 +590,14 @@ public class OpenWireMessageConverter implements MessageConverter props = coreMessage.getPropertyNames(); + final Set props = coreMessage.getPropertyNames(); if (props != null) { - for (SimpleString s : props) { - String keyStr = s.toString(); - if ((keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) && - !consumer.hasNotificationDestination()) { - continue; - } - Object prop = coreMessage.getObjectProperty(s); - try { - if (prop instanceof SimpleString) { - amqMsg.setObjectProperty(s.toString(), prop.toString()); - } else { - if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) { - Long l = (Long) prop; - amqMsg.setObjectProperty(s.toString(), l.intValue()); - } else { - amqMsg.setObjectProperty(s.toString(), prop); - } - } - } catch (JMSException e) { - throw new IOException("exception setting property " + s + " : " + prop, e); - } - } + setAMQMsgObjectProperties(amqMsg, coreMessage, props, consumer); } - amqMsg.setCompressed(isCompressed); if (bytes != null) { ByteSequence content = new ByteSequence(bytes); amqMsg.setContent(content); @@ -896,4 +701,269 @@ public class OpenWireMessageConverter implements MessageConverter 0) { + TypedProperties mapData = new TypedProperties(); + mapData.decode(buffer.byteBuf()); + Map map = mapData.getMap(); + ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize()); + OutputStream os = out; + if (isCompressed) { + os = new DeflaterOutputStream(os, true); + } + try (DataOutputStream dataOut = new DataOutputStream(os)) { + MarshallingSupport.marshalPrimitiveMap(map, dataOut); + dataOut.flush(); + } + bytes = out.toByteArray(); + } + return bytes; + } + + private static byte[] toAMQMessageObjectType(final ActiveMQBuffer buffer, + final boolean isCompressed) throws IOException { + byte[] bytes = null; + if (buffer.readableBytes() > 0) { + int len = buffer.readInt(); + bytes = new byte[len]; + buffer.readBytes(bytes); + if (isCompressed) { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { + out.write(bytes); + out.flush(); + } + bytes = bytesOut.toByteArray(); + } + } + return bytes; + } + + private static byte[] toAMQMessageStreamType(final ActiveMQBuffer buffer, + final boolean isCompressed) throws IOException { + byte[] bytes; + org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream(); + OutputStream out = bytesOut; + if (isCompressed) { + out = new DeflaterOutputStream(bytesOut, true); + } + try (DataOutputStream dataOut = new DataOutputStream(out)) { + + boolean stop = false; + while (!stop && buffer.readable()) { + byte primitiveType = buffer.readByte(); + switch (primitiveType) { + case DataConstants.BOOLEAN: + MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean()); + break; + case DataConstants.BYTE: + MarshallingSupport.marshalByte(dataOut, buffer.readByte()); + break; + case DataConstants.BYTES: + int len = buffer.readInt(); + byte[] bytesData = new byte[len]; + buffer.readBytes(bytesData); + MarshallingSupport.marshalByteArray(dataOut, bytesData); + break; + case DataConstants.CHAR: + char ch = (char) buffer.readShort(); + MarshallingSupport.marshalChar(dataOut, ch); + break; + case DataConstants.DOUBLE: + double doubleVal = Double.longBitsToDouble(buffer.readLong()); + MarshallingSupport.marshalDouble(dataOut, doubleVal); + break; + case DataConstants.FLOAT: + Float floatVal = Float.intBitsToFloat(buffer.readInt()); + MarshallingSupport.marshalFloat(dataOut, floatVal); + break; + case DataConstants.INT: + MarshallingSupport.marshalInt(dataOut, buffer.readInt()); + break; + case DataConstants.LONG: + MarshallingSupport.marshalLong(dataOut, buffer.readLong()); + break; + case DataConstants.SHORT: + MarshallingSupport.marshalShort(dataOut, buffer.readShort()); + break; + case DataConstants.STRING: + String string = buffer.readNullableString(); + if (string == null) { + MarshallingSupport.marshalNull(dataOut); + } else { + MarshallingSupport.marshalString(dataOut, string); + } + break; + default: + //now we stop + stop = true; + break; + } + dataOut.flush(); + } + } + bytes = bytesOut.toByteArray(); + return bytes; + } + + private static byte[] toAMQMessageBytesType(final ActiveMQBuffer buffer, + final boolean isCompressed) throws IOException { + int n = buffer.readableBytes(); + byte[] bytes = new byte[n]; + buffer.readBytes(bytes); + if (isCompressed) { + bytes = toAMQMessageCompressedBytesType(bytes); + } + return bytes; + } + + private static byte[] toAMQMessageCompressedBytesType(final byte[] bytes) throws IOException { + int length = bytes.length; + Deflater deflater = new Deflater(); + try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) { + compressed.write(new byte[4]); + deflater.setInput(bytes); + deflater.finish(); + byte[] bytesBuf = new byte[1024]; + while (!deflater.finished()) { + int count = deflater.deflate(bytesBuf); + compressed.write(bytesBuf, 0, count); + } + compressed.flush(); + ByteSequence byteSeq = compressed.toByteSequence(); + ByteSequenceData.writeIntBig(byteSeq, length); + return Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length); + } finally { + deflater.end(); + } + } + + private static byte[] toAMQMessageDefaultType(final ActiveMQBuffer buffer, + final boolean isCompressed) throws IOException { + int n = buffer.readableBytes(); + byte[] bytes = new byte[n]; + buffer.readBytes(bytes); + if (isCompressed) { + try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { + out.write(bytes); + out.flush(); + bytes = bytesOut.toByteArray(); + } + } + return bytes; + } + + private static void setAMQMsgBrokerPath(final ActiveMQMessage amqMsg, final String brokerPath) { + String[] brokers = brokerPath.split(","); + BrokerId[] bids = new BrokerId[brokers.length]; + for (int i = 0; i < bids.length; i++) { + bids[i] = new BrokerId(brokers[i]); + } + amqMsg.setBrokerPath(bids); + } + + private static void setAMQMsgClusterPath(final ActiveMQMessage amqMsg, final String clusterPath) { + String[] cluster = clusterPath.split(","); + BrokerId[] bids = new BrokerId[cluster.length]; + for (int i = 0; i < bids.length; i++) { + bids[i] = new BrokerId(cluster[i]); + } + amqMsg.setCluster(bids); + } + + private static void setAMQMsgDataStructure(final ActiveMQMessage amqMsg, + final WireFormat marshaller, + final byte[] dsBytes) throws IOException { + ByteSequence seq = new ByteSequence(dsBytes); + DataStructure ds = (DataStructure) marshaller.unmarshal(seq); + amqMsg.setDataStructure(ds); + } + + private static void setAMQMsgOriginalDestination(final ActiveMQMessage amqMsg, + final WireFormat marshaller, + final byte[] origDestBytes) throws IOException { + ActiveMQDestination origDest = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(origDestBytes)); + amqMsg.setOriginalDestination(origDest); + } + + private static void setAMQMsgOriginalTransactionId(final ActiveMQMessage amqMsg, + final WireFormat marshaller, + final byte[] origTxIdBytes) throws IOException { + TransactionId origTxId = (TransactionId) marshaller.unmarshal(new ByteSequence(origTxIdBytes)); + amqMsg.setOriginalTransactionId(origTxId); + } + + private static void setAMQMsgReplyTo(final ActiveMQMessage amqMsg, + final WireFormat marshaller, + final byte[] replyToBytes) throws IOException { + ActiveMQDestination replyTo = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(replyToBytes)); + amqMsg.setReplyTo(replyTo); + } + + private static void setAMQMsgDlqDeliveryFailureCause(final ActiveMQMessage amqMsg, + final SimpleString dlqCause) throws IOException { + try { + amqMsg.setStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, dlqCause.toString()); + } catch (JMSException e) { + throw new IOException("failure to set dlq property " + dlqCause, e); + } + } + + private static void setAMQMsgHdrLastValueName(final ActiveMQMessage amqMsg, + final SimpleString lastValueProperty) throws IOException { + try { + amqMsg.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(), lastValueProperty.toString()); + } catch (JMSException e) { + throw new IOException("failure to set lvq property " + lastValueProperty, e); + } + } + + private static void setAMQMsgObjectProperties(final ActiveMQMessage amqMsg, + final ICoreMessage coreMessage, + final Set props, + final AMQConsumer consumer) throws IOException { + for (SimpleString s : props) { + final String keyStr = s.toString(); + if (!consumer.hasNotificationDestination() && (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) { + continue; + } + final Object prop = coreMessage.getObjectProperty(s); + try { + if (prop instanceof SimpleString) { + amqMsg.setObjectProperty(keyStr, prop.toString()); + } else { + if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) { + Long l = (Long) prop; + amqMsg.setObjectProperty(keyStr, l.intValue()); + } else { + amqMsg.setObjectProperty(keyStr, prop); + } + } + } catch (JMSException e) { + throw new IOException("exception setting property " + s + " : " + prop, e); + } + } + } }