ARTEMIS-1616 OpenWire improvements
Refactored OpenWireMessageConverter::toAMQMessage into smaller methods
This commit is contained in:
parent
e7a1dca7b5
commit
c6b6dd95d1
|
@ -120,7 +120,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
|
|||
return null;
|
||||
}
|
||||
|
||||
// @Override
|
||||
// @Override
|
||||
public org.apache.activemq.artemis.api.core.Message inbound(Object message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
|
||||
|
||||
final Message messageSend = (Message) message;
|
||||
|
@ -205,7 +205,9 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
|
|||
|
||||
final ProducerId producerId = messageSend.getProducerId();
|
||||
if (producerId != null) {
|
||||
putMsgProducerId(producerId, marshaller, coreMessage);
|
||||
final ByteSequence producerIdBytes = marshaller.marshal(producerId);
|
||||
producerIdBytes.compact();
|
||||
coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data);
|
||||
}
|
||||
final ByteSequence propBytes = messageSend.getMarshalledProperties();
|
||||
if (propBytes != null) {
|
||||
|
@ -437,14 +439,6 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
|
|||
coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data);
|
||||
}
|
||||
|
||||
private static void putMsgProducerId(final ProducerId producerId,
|
||||
final WireFormat marshaller,
|
||||
final CoreMessage coreMessage) throws IOException {
|
||||
final ByteSequence producerIdBytes = marshaller.marshal(producerId);
|
||||
producerIdBytes.compact();
|
||||
coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data);
|
||||
}
|
||||
|
||||
private static void putMsgMarshalledProperties(final ByteSequence propBytes,
|
||||
final Message messageSend,
|
||||
final CoreMessage coreMessage) throws IOException {
|
||||
|
@ -512,9 +506,9 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
|
|||
}
|
||||
|
||||
public MessageDispatch createMessageDispatch(MessageReference reference,
|
||||
ICoreMessage message,
|
||||
AMQConsumer consumer) throws IOException, JMSException {
|
||||
ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer);
|
||||
ICoreMessage message,
|
||||
AMQConsumer consumer) throws IOException, JMSException {
|
||||
ActiveMQMessage amqMessage = toAMQMessage(reference, message, marshaller, consumer);
|
||||
|
||||
//we can use core message id for sequenceId
|
||||
amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
|
||||
|
@ -529,35 +523,48 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
|
|||
return md;
|
||||
}
|
||||
|
||||
private ActiveMQMessage toAMQMessage(MessageReference reference,
|
||||
private static ActiveMQMessage toAMQMessage(MessageReference reference,
|
||||
ICoreMessage coreMessage,
|
||||
WireFormat marshaller,
|
||||
AMQConsumer consumer) throws IOException {
|
||||
ActiveMQMessage amqMsg = null;
|
||||
byte coreType = coreMessage.getType();
|
||||
final ActiveMQMessage amqMsg;
|
||||
final byte coreType = coreMessage.getType();
|
||||
final Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
|
||||
final boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
|
||||
final byte[] bytes;
|
||||
final ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
|
||||
buffer.resetReaderIndex();
|
||||
|
||||
switch (coreType) {
|
||||
case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
|
||||
amqMsg = new ActiveMQBytesMessage();
|
||||
bytes = toAMQMessageBytesType(buffer, isCompressed);
|
||||
break;
|
||||
case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
|
||||
amqMsg = new ActiveMQMapMessage();
|
||||
bytes = toAMQMessageMapType(buffer, isCompressed);
|
||||
break;
|
||||
case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
|
||||
amqMsg = new ActiveMQObjectMessage();
|
||||
bytes = toAMQMessageObjectType(buffer, isCompressed);
|
||||
break;
|
||||
case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE:
|
||||
amqMsg = new ActiveMQStreamMessage();
|
||||
bytes = toAMQMessageStreamType(buffer, isCompressed);
|
||||
break;
|
||||
case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
|
||||
amqMsg = new ActiveMQTextMessage();
|
||||
bytes = toAMQMessageTextType(buffer, isCompressed);
|
||||
break;
|
||||
case org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE:
|
||||
amqMsg = new ActiveMQMessage();
|
||||
bytes = toAMQMessageDefaultType(buffer, isCompressed);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("Unknown message type: " + coreMessage.getType());
|
||||
}
|
||||
|
||||
String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY);
|
||||
final String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY);
|
||||
if (type != null) {
|
||||
amqMsg.setJMSType(type);
|
||||
}
|
||||
|
@ -572,165 +579,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
|
|||
}
|
||||
amqMsg.setBrokerInTime(brokerInTime);
|
||||
|
||||
ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
|
||||
Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
|
||||
boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
|
||||
amqMsg.setCompressed(isCompressed);
|
||||
|
||||
byte[] bytes = null;
|
||||
if (buffer != null) {
|
||||
buffer.resetReaderIndex();
|
||||
|
||||
if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
|
||||
SimpleString text = buffer.readNullableSimpleString();
|
||||
if (text != null) {
|
||||
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
|
||||
OutputStream out = bytesOut;
|
||||
if (isCompressed) {
|
||||
out = new DeflaterOutputStream(out, true);
|
||||
}
|
||||
try (DataOutputStream dataOut = new DataOutputStream(out)) {
|
||||
MarshallingSupport.writeUTF8(dataOut, text.toString());
|
||||
dataOut.flush();
|
||||
bytes = bytesOut.toByteArray();
|
||||
}
|
||||
}
|
||||
} else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
|
||||
TypedProperties mapData = new TypedProperties();
|
||||
//it could be a null map
|
||||
if (buffer.readableBytes() > 0) {
|
||||
mapData.decode(buffer.byteBuf());
|
||||
Map<String, Object> map = mapData.getMap();
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
|
||||
OutputStream os = out;
|
||||
if (isCompressed) {
|
||||
os = new DeflaterOutputStream(os, true);
|
||||
}
|
||||
try (DataOutputStream dataOut = new DataOutputStream(os)) {
|
||||
MarshallingSupport.marshalPrimitiveMap(map, dataOut);
|
||||
dataOut.flush();
|
||||
}
|
||||
bytes = out.toByteArray();
|
||||
}
|
||||
|
||||
} else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
|
||||
if (buffer.readableBytes() > 0) {
|
||||
int len = buffer.readInt();
|
||||
bytes = new byte[len];
|
||||
buffer.readBytes(bytes);
|
||||
if (isCompressed) {
|
||||
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
|
||||
try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
|
||||
out.write(bytes);
|
||||
out.flush();
|
||||
}
|
||||
bytes = bytesOut.toByteArray();
|
||||
}
|
||||
}
|
||||
} else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
|
||||
org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
|
||||
OutputStream out = bytesOut;
|
||||
if (isCompressed) {
|
||||
out = new DeflaterOutputStream(bytesOut, true);
|
||||
}
|
||||
try (DataOutputStream dataOut = new DataOutputStream(out)) {
|
||||
|
||||
boolean stop = false;
|
||||
while (!stop && buffer.readable()) {
|
||||
byte primitiveType = buffer.readByte();
|
||||
switch (primitiveType) {
|
||||
case DataConstants.BOOLEAN:
|
||||
MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
|
||||
break;
|
||||
case DataConstants.BYTE:
|
||||
MarshallingSupport.marshalByte(dataOut, buffer.readByte());
|
||||
break;
|
||||
case DataConstants.BYTES:
|
||||
int len = buffer.readInt();
|
||||
byte[] bytesData = new byte[len];
|
||||
buffer.readBytes(bytesData);
|
||||
MarshallingSupport.marshalByteArray(dataOut, bytesData);
|
||||
break;
|
||||
case DataConstants.CHAR:
|
||||
char ch = (char) buffer.readShort();
|
||||
MarshallingSupport.marshalChar(dataOut, ch);
|
||||
break;
|
||||
case DataConstants.DOUBLE:
|
||||
double doubleVal = Double.longBitsToDouble(buffer.readLong());
|
||||
MarshallingSupport.marshalDouble(dataOut, doubleVal);
|
||||
break;
|
||||
case DataConstants.FLOAT:
|
||||
Float floatVal = Float.intBitsToFloat(buffer.readInt());
|
||||
MarshallingSupport.marshalFloat(dataOut, floatVal);
|
||||
break;
|
||||
case DataConstants.INT:
|
||||
MarshallingSupport.marshalInt(dataOut, buffer.readInt());
|
||||
break;
|
||||
case DataConstants.LONG:
|
||||
MarshallingSupport.marshalLong(dataOut, buffer.readLong());
|
||||
break;
|
||||
case DataConstants.SHORT:
|
||||
MarshallingSupport.marshalShort(dataOut, buffer.readShort());
|
||||
break;
|
||||
case DataConstants.STRING:
|
||||
String string = buffer.readNullableString();
|
||||
if (string == null) {
|
||||
MarshallingSupport.marshalNull(dataOut);
|
||||
} else {
|
||||
MarshallingSupport.marshalString(dataOut, string);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
//now we stop
|
||||
stop = true;
|
||||
break;
|
||||
}
|
||||
dataOut.flush();
|
||||
}
|
||||
}
|
||||
bytes = bytesOut.toByteArray();
|
||||
} else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
|
||||
int n = buffer.readableBytes();
|
||||
bytes = new byte[n];
|
||||
buffer.readBytes(bytes);
|
||||
if (isCompressed) {
|
||||
int length = bytes.length;
|
||||
Deflater deflater = new Deflater();
|
||||
try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
|
||||
compressed.write(new byte[4]);
|
||||
deflater.setInput(bytes);
|
||||
deflater.finish();
|
||||
byte[] bytesBuf = new byte[1024];
|
||||
while (!deflater.finished()) {
|
||||
int count = deflater.deflate(bytesBuf);
|
||||
compressed.write(bytesBuf, 0, count);
|
||||
}
|
||||
compressed.flush();
|
||||
ByteSequence byteSeq = compressed.toByteSequence();
|
||||
ByteSequenceData.writeIntBig(byteSeq, length);
|
||||
bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
|
||||
} finally {
|
||||
deflater.end();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
int n = buffer.readableBytes();
|
||||
bytes = new byte[n];
|
||||
buffer.readBytes(bytes);
|
||||
if (isCompressed) {
|
||||
try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
|
||||
DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
|
||||
out.write(bytes);
|
||||
out.flush();
|
||||
bytes = bytesOut.toByteArray();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
buffer.resetReaderIndex();// this is important for topics as the buffer
|
||||
// may be read multiple times
|
||||
}
|
||||
|
||||
//we need check null because messages may come from other clients
|
||||
//and those amq specific attribute may not be set.
|
||||
Long arrival = (Long) coreMessage.getObjectProperty(AMQ_MSG_ARRIVAL);
|
||||
|
@ -740,24 +590,14 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
|
|||
}
|
||||
amqMsg.setArrival(arrival);
|
||||
|
||||
String brokerPath = (String) coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH);
|
||||
if (brokerPath != null && brokerPath.isEmpty()) {
|
||||
String[] brokers = brokerPath.split(",");
|
||||
BrokerId[] bids = new BrokerId[brokers.length];
|
||||
for (int i = 0; i < bids.length; i++) {
|
||||
bids[i] = new BrokerId(brokers[i]);
|
||||
}
|
||||
amqMsg.setBrokerPath(bids);
|
||||
final String brokerPath = (String) coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH);
|
||||
if (brokerPath != null && !brokerPath.isEmpty()) {
|
||||
setAMQMsgBrokerPath(amqMsg, brokerPath);
|
||||
}
|
||||
|
||||
String clusterPath = (String) coreMessage.getObjectProperty(AMQ_MSG_CLUSTER);
|
||||
if (clusterPath != null && clusterPath.isEmpty()) {
|
||||
String[] cluster = clusterPath.split(",");
|
||||
BrokerId[] bids = new BrokerId[cluster.length];
|
||||
for (int i = 0; i < bids.length; i++) {
|
||||
bids[i] = new BrokerId(cluster[i]);
|
||||
}
|
||||
amqMsg.setCluster(bids);
|
||||
final String clusterPath = (String) coreMessage.getObjectProperty(AMQ_MSG_CLUSTER);
|
||||
if (clusterPath != null && !clusterPath.isEmpty()) {
|
||||
setAMQMsgClusterPath(amqMsg, clusterPath);
|
||||
}
|
||||
|
||||
Integer commandId = (Integer) coreMessage.getObjectProperty(AMQ_MSG_COMMAND_ID);
|
||||
|
@ -766,21 +606,19 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
|
|||
}
|
||||
amqMsg.setCommandId(commandId);
|
||||
|
||||
SimpleString corrId = (SimpleString) coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY);
|
||||
final SimpleString corrId = (SimpleString) coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY);
|
||||
if (corrId != null) {
|
||||
amqMsg.setCorrelationId(corrId.toString());
|
||||
}
|
||||
|
||||
byte[] dsBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_DATASTRUCTURE);
|
||||
final byte[] dsBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_DATASTRUCTURE);
|
||||
if (dsBytes != null) {
|
||||
ByteSequence seq = new ByteSequence(dsBytes);
|
||||
DataStructure ds = (DataStructure) marshaller.unmarshal(seq);
|
||||
amqMsg.setDataStructure(ds);
|
||||
setAMQMsgDataStructure(amqMsg, marshaller, dsBytes);
|
||||
}
|
||||
final ActiveMQDestination actualDestination = consumer.getOpenwireDestination();
|
||||
amqMsg.setDestination(OpenWireUtil.toAMQAddress(coreMessage, actualDestination));
|
||||
|
||||
Object value = coreMessage.getGroupID();
|
||||
final Object value = coreMessage.getGroupID();
|
||||
if (value != null) {
|
||||
String groupId = value.toString();
|
||||
amqMsg.setGroupID(groupId);
|
||||
|
@ -792,8 +630,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
|
|||
}
|
||||
amqMsg.setGroupSequence(groupSequence);
|
||||
|
||||
MessageId mid = null;
|
||||
byte[] midBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MESSAGE_ID);
|
||||
final MessageId mid;
|
||||
final byte[] midBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MESSAGE_ID);
|
||||
if (midBytes != null) {
|
||||
ByteSequence midSeq = new ByteSequence(midBytes);
|
||||
mid = (MessageId) marshaller.unmarshal(midSeq);
|
||||
|
@ -803,92 +641,59 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
|
|||
|
||||
amqMsg.setMessageId(mid);
|
||||
|
||||
byte[] origDestBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_DESTINATION);
|
||||
final byte[] origDestBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_DESTINATION);
|
||||
if (origDestBytes != null) {
|
||||
ActiveMQDestination origDest = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(origDestBytes));
|
||||
amqMsg.setOriginalDestination(origDest);
|
||||
setAMQMsgOriginalDestination(amqMsg, marshaller, origDestBytes);
|
||||
}
|
||||
|
||||
byte[] origTxIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_TXID);
|
||||
final byte[] origTxIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_TXID);
|
||||
if (origTxIdBytes != null) {
|
||||
TransactionId origTxId = (TransactionId) marshaller.unmarshal(new ByteSequence(origTxIdBytes));
|
||||
amqMsg.setOriginalTransactionId(origTxId);
|
||||
setAMQMsgOriginalTransactionId(amqMsg, marshaller, origTxIdBytes);
|
||||
}
|
||||
|
||||
byte[] producerIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_PRODUCER_ID);
|
||||
final byte[] producerIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_PRODUCER_ID);
|
||||
if (producerIdBytes != null) {
|
||||
ProducerId producerId = (ProducerId) marshaller.unmarshal(new ByteSequence(producerIdBytes));
|
||||
amqMsg.setProducerId(producerId);
|
||||
}
|
||||
|
||||
byte[] marshalledBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MARSHALL_PROP);
|
||||
final byte[] marshalledBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MARSHALL_PROP);
|
||||
if (marshalledBytes != null) {
|
||||
amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes));
|
||||
}
|
||||
|
||||
amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1);
|
||||
|
||||
byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO);
|
||||
final byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO);
|
||||
if (replyToBytes != null) {
|
||||
ActiveMQDestination replyTo = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(replyToBytes));
|
||||
amqMsg.setReplyTo(replyTo);
|
||||
setAMQMsgReplyTo(amqMsg, marshaller, replyToBytes);
|
||||
}
|
||||
|
||||
String userId = (String) coreMessage.getObjectProperty(AMQ_MSG_USER_ID);
|
||||
final String userId = (String) coreMessage.getObjectProperty(AMQ_MSG_USER_ID);
|
||||
if (userId != null) {
|
||||
amqMsg.setUserID(userId);
|
||||
}
|
||||
|
||||
Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE);
|
||||
final Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE);
|
||||
if (isDroppable != null) {
|
||||
amqMsg.setDroppable(isDroppable);
|
||||
}
|
||||
|
||||
SimpleString dlqCause = (SimpleString) coreMessage.getObjectProperty(AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
|
||||
final SimpleString dlqCause = (SimpleString) coreMessage.getObjectProperty(AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
|
||||
if (dlqCause != null) {
|
||||
try {
|
||||
amqMsg.setStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, dlqCause.toString());
|
||||
} catch (JMSException e) {
|
||||
throw new IOException("failure to set dlq property " + dlqCause, e);
|
||||
}
|
||||
setAMQMsgDlqDeliveryFailureCause(amqMsg, dlqCause);
|
||||
}
|
||||
|
||||
SimpleString lastValueProperty = coreMessage.getLastValueProperty();
|
||||
final SimpleString lastValueProperty = coreMessage.getLastValueProperty();
|
||||
if (lastValueProperty != null) {
|
||||
try {
|
||||
amqMsg.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(), lastValueProperty.toString());
|
||||
} catch (JMSException e) {
|
||||
throw new IOException("failure to set lvq property " + dlqCause, e);
|
||||
}
|
||||
setAMQMsgHdrLastValueName(amqMsg, lastValueProperty);
|
||||
}
|
||||
|
||||
Set<SimpleString> props = coreMessage.getPropertyNames();
|
||||
final Set<SimpleString> props = coreMessage.getPropertyNames();
|
||||
if (props != null) {
|
||||
for (SimpleString s : props) {
|
||||
String keyStr = s.toString();
|
||||
if ((keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) &&
|
||||
!consumer.hasNotificationDestination()) {
|
||||
continue;
|
||||
}
|
||||
Object prop = coreMessage.getObjectProperty(s);
|
||||
try {
|
||||
if (prop instanceof SimpleString) {
|
||||
amqMsg.setObjectProperty(s.toString(), prop.toString());
|
||||
} else {
|
||||
if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) {
|
||||
Long l = (Long) prop;
|
||||
amqMsg.setObjectProperty(s.toString(), l.intValue());
|
||||
} else {
|
||||
amqMsg.setObjectProperty(s.toString(), prop);
|
||||
}
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
throw new IOException("exception setting property " + s + " : " + prop, e);
|
||||
}
|
||||
}
|
||||
setAMQMsgObjectProperties(amqMsg, coreMessage, props, consumer);
|
||||
}
|
||||
|
||||
amqMsg.setCompressed(isCompressed);
|
||||
if (bytes != null) {
|
||||
ByteSequence content = new ByteSequence(bytes);
|
||||
amqMsg.setContent(content);
|
||||
|
@ -896,4 +701,269 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
|
|||
return amqMsg;
|
||||
}
|
||||
|
||||
private static byte[] toAMQMessageTextType(final ActiveMQBuffer buffer,
|
||||
final boolean isCompressed) throws IOException {
|
||||
byte[] bytes = null;
|
||||
SimpleString text = buffer.readNullableSimpleString();
|
||||
if (text != null) {
|
||||
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
|
||||
OutputStream out = bytesOut;
|
||||
if (isCompressed) {
|
||||
out = new DeflaterOutputStream(out, true);
|
||||
}
|
||||
try (DataOutputStream dataOut = new DataOutputStream(out)) {
|
||||
MarshallingSupport.writeUTF8(dataOut, text.toString());
|
||||
dataOut.flush();
|
||||
bytes = bytesOut.toByteArray();
|
||||
}
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
private static byte[] toAMQMessageMapType(final ActiveMQBuffer buffer,
|
||||
final boolean isCompressed) throws IOException {
|
||||
byte[] bytes = null;
|
||||
//it could be a null map
|
||||
if (buffer.readableBytes() > 0) {
|
||||
TypedProperties mapData = new TypedProperties();
|
||||
mapData.decode(buffer.byteBuf());
|
||||
Map<String, Object> map = mapData.getMap();
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
|
||||
OutputStream os = out;
|
||||
if (isCompressed) {
|
||||
os = new DeflaterOutputStream(os, true);
|
||||
}
|
||||
try (DataOutputStream dataOut = new DataOutputStream(os)) {
|
||||
MarshallingSupport.marshalPrimitiveMap(map, dataOut);
|
||||
dataOut.flush();
|
||||
}
|
||||
bytes = out.toByteArray();
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
private static byte[] toAMQMessageObjectType(final ActiveMQBuffer buffer,
|
||||
final boolean isCompressed) throws IOException {
|
||||
byte[] bytes = null;
|
||||
if (buffer.readableBytes() > 0) {
|
||||
int len = buffer.readInt();
|
||||
bytes = new byte[len];
|
||||
buffer.readBytes(bytes);
|
||||
if (isCompressed) {
|
||||
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
|
||||
try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
|
||||
out.write(bytes);
|
||||
out.flush();
|
||||
}
|
||||
bytes = bytesOut.toByteArray();
|
||||
}
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
private static byte[] toAMQMessageStreamType(final ActiveMQBuffer buffer,
|
||||
final boolean isCompressed) throws IOException {
|
||||
byte[] bytes;
|
||||
org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
|
||||
OutputStream out = bytesOut;
|
||||
if (isCompressed) {
|
||||
out = new DeflaterOutputStream(bytesOut, true);
|
||||
}
|
||||
try (DataOutputStream dataOut = new DataOutputStream(out)) {
|
||||
|
||||
boolean stop = false;
|
||||
while (!stop && buffer.readable()) {
|
||||
byte primitiveType = buffer.readByte();
|
||||
switch (primitiveType) {
|
||||
case DataConstants.BOOLEAN:
|
||||
MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
|
||||
break;
|
||||
case DataConstants.BYTE:
|
||||
MarshallingSupport.marshalByte(dataOut, buffer.readByte());
|
||||
break;
|
||||
case DataConstants.BYTES:
|
||||
int len = buffer.readInt();
|
||||
byte[] bytesData = new byte[len];
|
||||
buffer.readBytes(bytesData);
|
||||
MarshallingSupport.marshalByteArray(dataOut, bytesData);
|
||||
break;
|
||||
case DataConstants.CHAR:
|
||||
char ch = (char) buffer.readShort();
|
||||
MarshallingSupport.marshalChar(dataOut, ch);
|
||||
break;
|
||||
case DataConstants.DOUBLE:
|
||||
double doubleVal = Double.longBitsToDouble(buffer.readLong());
|
||||
MarshallingSupport.marshalDouble(dataOut, doubleVal);
|
||||
break;
|
||||
case DataConstants.FLOAT:
|
||||
Float floatVal = Float.intBitsToFloat(buffer.readInt());
|
||||
MarshallingSupport.marshalFloat(dataOut, floatVal);
|
||||
break;
|
||||
case DataConstants.INT:
|
||||
MarshallingSupport.marshalInt(dataOut, buffer.readInt());
|
||||
break;
|
||||
case DataConstants.LONG:
|
||||
MarshallingSupport.marshalLong(dataOut, buffer.readLong());
|
||||
break;
|
||||
case DataConstants.SHORT:
|
||||
MarshallingSupport.marshalShort(dataOut, buffer.readShort());
|
||||
break;
|
||||
case DataConstants.STRING:
|
||||
String string = buffer.readNullableString();
|
||||
if (string == null) {
|
||||
MarshallingSupport.marshalNull(dataOut);
|
||||
} else {
|
||||
MarshallingSupport.marshalString(dataOut, string);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
//now we stop
|
||||
stop = true;
|
||||
break;
|
||||
}
|
||||
dataOut.flush();
|
||||
}
|
||||
}
|
||||
bytes = bytesOut.toByteArray();
|
||||
return bytes;
|
||||
}
|
||||
|
||||
private static byte[] toAMQMessageBytesType(final ActiveMQBuffer buffer,
|
||||
final boolean isCompressed) throws IOException {
|
||||
int n = buffer.readableBytes();
|
||||
byte[] bytes = new byte[n];
|
||||
buffer.readBytes(bytes);
|
||||
if (isCompressed) {
|
||||
bytes = toAMQMessageCompressedBytesType(bytes);
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
private static byte[] toAMQMessageCompressedBytesType(final byte[] bytes) throws IOException {
|
||||
int length = bytes.length;
|
||||
Deflater deflater = new Deflater();
|
||||
try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
|
||||
compressed.write(new byte[4]);
|
||||
deflater.setInput(bytes);
|
||||
deflater.finish();
|
||||
byte[] bytesBuf = new byte[1024];
|
||||
while (!deflater.finished()) {
|
||||
int count = deflater.deflate(bytesBuf);
|
||||
compressed.write(bytesBuf, 0, count);
|
||||
}
|
||||
compressed.flush();
|
||||
ByteSequence byteSeq = compressed.toByteSequence();
|
||||
ByteSequenceData.writeIntBig(byteSeq, length);
|
||||
return Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
|
||||
} finally {
|
||||
deflater.end();
|
||||
}
|
||||
}
|
||||
|
||||
private static byte[] toAMQMessageDefaultType(final ActiveMQBuffer buffer,
|
||||
final boolean isCompressed) throws IOException {
|
||||
int n = buffer.readableBytes();
|
||||
byte[] bytes = new byte[n];
|
||||
buffer.readBytes(bytes);
|
||||
if (isCompressed) {
|
||||
try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
|
||||
out.write(bytes);
|
||||
out.flush();
|
||||
bytes = bytesOut.toByteArray();
|
||||
}
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
private static void setAMQMsgBrokerPath(final ActiveMQMessage amqMsg, final String brokerPath) {
|
||||
String[] brokers = brokerPath.split(",");
|
||||
BrokerId[] bids = new BrokerId[brokers.length];
|
||||
for (int i = 0; i < bids.length; i++) {
|
||||
bids[i] = new BrokerId(brokers[i]);
|
||||
}
|
||||
amqMsg.setBrokerPath(bids);
|
||||
}
|
||||
|
||||
private static void setAMQMsgClusterPath(final ActiveMQMessage amqMsg, final String clusterPath) {
|
||||
String[] cluster = clusterPath.split(",");
|
||||
BrokerId[] bids = new BrokerId[cluster.length];
|
||||
for (int i = 0; i < bids.length; i++) {
|
||||
bids[i] = new BrokerId(cluster[i]);
|
||||
}
|
||||
amqMsg.setCluster(bids);
|
||||
}
|
||||
|
||||
private static void setAMQMsgDataStructure(final ActiveMQMessage amqMsg,
|
||||
final WireFormat marshaller,
|
||||
final byte[] dsBytes) throws IOException {
|
||||
ByteSequence seq = new ByteSequence(dsBytes);
|
||||
DataStructure ds = (DataStructure) marshaller.unmarshal(seq);
|
||||
amqMsg.setDataStructure(ds);
|
||||
}
|
||||
|
||||
private static void setAMQMsgOriginalDestination(final ActiveMQMessage amqMsg,
|
||||
final WireFormat marshaller,
|
||||
final byte[] origDestBytes) throws IOException {
|
||||
ActiveMQDestination origDest = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(origDestBytes));
|
||||
amqMsg.setOriginalDestination(origDest);
|
||||
}
|
||||
|
||||
private static void setAMQMsgOriginalTransactionId(final ActiveMQMessage amqMsg,
|
||||
final WireFormat marshaller,
|
||||
final byte[] origTxIdBytes) throws IOException {
|
||||
TransactionId origTxId = (TransactionId) marshaller.unmarshal(new ByteSequence(origTxIdBytes));
|
||||
amqMsg.setOriginalTransactionId(origTxId);
|
||||
}
|
||||
|
||||
private static void setAMQMsgReplyTo(final ActiveMQMessage amqMsg,
|
||||
final WireFormat marshaller,
|
||||
final byte[] replyToBytes) throws IOException {
|
||||
ActiveMQDestination replyTo = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(replyToBytes));
|
||||
amqMsg.setReplyTo(replyTo);
|
||||
}
|
||||
|
||||
private static void setAMQMsgDlqDeliveryFailureCause(final ActiveMQMessage amqMsg,
|
||||
final SimpleString dlqCause) throws IOException {
|
||||
try {
|
||||
amqMsg.setStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, dlqCause.toString());
|
||||
} catch (JMSException e) {
|
||||
throw new IOException("failure to set dlq property " + dlqCause, e);
|
||||
}
|
||||
}
|
||||
|
||||
private static void setAMQMsgHdrLastValueName(final ActiveMQMessage amqMsg,
|
||||
final SimpleString lastValueProperty) throws IOException {
|
||||
try {
|
||||
amqMsg.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(), lastValueProperty.toString());
|
||||
} catch (JMSException e) {
|
||||
throw new IOException("failure to set lvq property " + lastValueProperty, e);
|
||||
}
|
||||
}
|
||||
|
||||
private static void setAMQMsgObjectProperties(final ActiveMQMessage amqMsg,
|
||||
final ICoreMessage coreMessage,
|
||||
final Set<SimpleString> props,
|
||||
final AMQConsumer consumer) throws IOException {
|
||||
for (SimpleString s : props) {
|
||||
final String keyStr = s.toString();
|
||||
if (!consumer.hasNotificationDestination() && (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) {
|
||||
continue;
|
||||
}
|
||||
final Object prop = coreMessage.getObjectProperty(s);
|
||||
try {
|
||||
if (prop instanceof SimpleString) {
|
||||
amqMsg.setObjectProperty(keyStr, prop.toString());
|
||||
} else {
|
||||
if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) {
|
||||
Long l = (Long) prop;
|
||||
amqMsg.setObjectProperty(keyStr, l.intValue());
|
||||
} else {
|
||||
amqMsg.setObjectProperty(keyStr, prop);
|
||||
}
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
throw new IOException("exception setting property " + s + " : " + prop, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue