ARTEMIS-1616 OpenWire improvements

Refactored OpenWireMessageConverter::inbound into smaller methods
This commit is contained in:
Francesco Nigro 2018-01-16 17:01:54 +01:00 committed by Clebert Suconic
parent 2db4eafc4d
commit 54d0161850
1 changed files with 280 additions and 194 deletions

View File

@ -123,10 +123,10 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
// @Override // @Override
public org.apache.activemq.artemis.api.core.Message inbound(Object message, CoreMessageObjectPools coreMessageObjectPools) throws Exception { public org.apache.activemq.artemis.api.core.Message inbound(Object message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
Message messageSend = (Message) message; final Message messageSend = (Message) message;
CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools); final CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools);
String type = messageSend.getType(); final String type = messageSend.getType();
if (type != null) { if (type != null) {
coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type)); coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type));
} }
@ -135,22 +135,105 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
coreMessage.setPriority(messageSend.getPriority()); coreMessage.setPriority(messageSend.getPriority());
coreMessage.setTimestamp(messageSend.getTimestamp()); coreMessage.setTimestamp(messageSend.getTimestamp());
byte coreType = toCoreType(messageSend.getDataStructureType()); final byte coreType = toCoreType(messageSend.getDataStructureType());
coreMessage.setType(coreType); coreMessage.setType(coreType);
ActiveMQBuffer body = coreMessage.getBodyBuffer(); final ActiveMQBuffer body = coreMessage.getBodyBuffer();
ByteSequence contents = messageSend.getContent(); final ByteSequence contents = messageSend.getContent();
if (contents == null && coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) { if (contents == null && coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
body.writeNullableString(null); body.writeNullableString(null);
} else if (contents != null) { } else if (contents != null) {
boolean messageCompressed = messageSend.isCompressed(); final boolean messageCompressed = messageSend.isCompressed();
if (messageCompressed) { if (messageCompressed) {
coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageCompressed); coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageCompressed);
} }
switch (coreType) { switch (coreType) {
case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE: case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
writeTextType(contents, messageCompressed, body);
break;
case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
writeMapType(contents, messageCompressed, body);
break;
case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
writeObjectType(contents, messageCompressed, body);
break;
case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE:
writeStreamType(contents, messageCompressed, body);
break;
case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
writeBytesType(contents, messageCompressed, body);
break;
default:
writeDefaultType(contents, messageCompressed, body);
break;
}
}
//amq specific
coreMessage.putLongProperty(AMQ_MSG_ARRIVAL, messageSend.getArrival());
coreMessage.putLongProperty(AMQ_MSG_BROKER_IN_TIME, messageSend.getBrokerInTime());
final BrokerId[] brokers = messageSend.getBrokerPath();
if (brokers != null) {
putMsgBrokerPath(brokers, coreMessage);
}
final BrokerId[] cluster = messageSend.getCluster();
if (cluster != null) {
putMsgCluster(cluster, coreMessage);
}
coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId());
final String corrId = messageSend.getCorrelationId();
if (corrId != null) {
coreMessage.putStringProperty("JMSCorrelationID", corrId);
}
final DataStructure ds = messageSend.getDataStructure();
if (ds != null) {
putMsgDataStructure(ds, marshaller, coreMessage);
}
final String groupId = messageSend.getGroupID();
if (groupId != null) {
coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, groupId);
}
coreMessage.putIntProperty(AMQ_MSG_GROUP_SEQUENCE, messageSend.getGroupSequence());
final MessageId messageId = messageSend.getMessageId();
final ByteSequence midBytes = marshaller.marshal(messageId);
midBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data);
final ProducerId producerId = messageSend.getProducerId();
if (producerId != null) {
putMsgProducerId(producerId, marshaller, coreMessage);
}
final ByteSequence propBytes = messageSend.getMarshalledProperties();
if (propBytes != null) {
putMsgMarshalledProperties(propBytes, messageSend, coreMessage);
}
final ActiveMQDestination replyTo = messageSend.getReplyTo();
if (replyTo != null) {
putMsgReplyTo(replyTo, marshaller, coreMessage);
}
final String userId = messageSend.getUserID();
if (userId != null) {
coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);
}
coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
final ActiveMQDestination origDest = messageSend.getOriginalDestination();
if (origDest != null) {
putMsgOriginalDestination(origDest, marshaller, coreMessage);
}
return coreMessage;
}
private static void writeTextType(final ByteSequence contents,
final boolean messageCompressed,
final ActiveMQBuffer body) throws IOException {
InputStream tis = new ByteArrayInputStream(contents); InputStream tis = new ByteArrayInputStream(contents);
if (messageCompressed) { if (messageCompressed) {
tis = new InflaterInputStream(tis); tis = new InflaterInputStream(tis);
@ -159,8 +242,11 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
String text = MarshallingSupport.readUTF8(tdataIn); String text = MarshallingSupport.readUTF8(tdataIn);
tdataIn.close(); tdataIn.close();
body.writeNullableSimpleString(new SimpleString(text)); body.writeNullableSimpleString(new SimpleString(text));
break; }
case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
private static void writeMapType(final ByteSequence contents,
final boolean messageCompressed,
final ActiveMQBuffer body) throws IOException {
InputStream mis = new ByteArrayInputStream(contents); InputStream mis = new ByteArrayInputStream(contents);
if (messageCompressed) { if (messageCompressed) {
mis = new InflaterInputStream(mis); mis = new InflaterInputStream(mis);
@ -171,9 +257,19 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
TypedProperties props = new TypedProperties(); TypedProperties props = new TypedProperties();
loadMapIntoProperties(props, map); loadMapIntoProperties(props, map);
props.encode(body.byteBuf()); props.encode(body.byteBuf());
break; }
case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
private static void writeObjectType(ByteSequence contents,
final boolean messageCompressed,
final ActiveMQBuffer body) throws IOException {
if (messageCompressed) { if (messageCompressed) {
contents = writeCompressedObjectType(contents);
}
body.writeInt(contents.length);
body.writeBytes(contents.data, contents.offset, contents.length);
}
private static ByteSequence writeCompressedObjectType(final ByteSequence contents) throws IOException {
try (InputStream ois = new InflaterInputStream(new ByteArrayInputStream(contents)); try (InputStream ois = new InflaterInputStream(new ByteArrayInputStream(contents));
org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) { org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
byte[] buf = new byte[1024]; byte[] buf = new byte[1024];
@ -183,13 +279,13 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
n = ois.read(); n = ois.read();
} }
//read done //read done
contents = decompressed.toByteSequence(); return decompressed.toByteSequence();
} }
} }
body.writeInt(contents.length);
body.writeBytes(contents.data, contents.offset, contents.length); private static void writeStreamType(final ByteSequence contents,
break; final boolean messageCompressed,
case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE: final ActiveMQBuffer body) throws IOException {
InputStream sis = new ByteArrayInputStream(contents); InputStream sis = new ByteArrayInputStream(contents);
if (messageCompressed) { if (messageCompressed) {
sis = new InflaterInputStream(sis); sis = new InflaterInputStream(sis);
@ -262,9 +358,18 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
stype = sdis.read(); stype = sdis.read();
} }
sdis.close(); sdis.close();
break; }
case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
private static void writeBytesType(ByteSequence contents,
final boolean messageCompressed,
final ActiveMQBuffer body) throws IOException {
if (messageCompressed) { if (messageCompressed) {
contents = writeCompressedBytesType(contents);
}
body.writeBytes(contents.data, contents.offset, contents.length);
}
private static ByteSequence writeCompressedBytesType(final ByteSequence contents) throws IOException {
Inflater inflater = new Inflater(); Inflater inflater = new Inflater();
try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) { try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
int length = ByteSequenceData.readIntBig(contents); int length = ByteSequenceData.readIntBig(contents);
@ -275,124 +380,105 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
byte[] buffer = new byte[length]; byte[] buffer = new byte[length];
int count = inflater.inflate(buffer); int count = inflater.inflate(buffer);
decompressed.write(buffer, 0, count); decompressed.write(buffer, 0, count);
contents = decompressed.toByteSequence(); return decompressed.toByteSequence();
} catch (Exception e) { } catch (Exception e) {
throw new IOException(e); throw new IOException(e);
} finally { } finally {
inflater.end(); inflater.end();
} }
} }
body.writeBytes(contents.data, contents.offset, contents.length);
break; private static void writeDefaultType(ByteSequence contents,
default: final boolean messageCompressed,
final ActiveMQBuffer body) throws IOException {
if (messageCompressed) { if (messageCompressed) {
contents = writeCompressedDefaultType(contents);
}
body.writeBytes(contents.data, contents.offset, contents.length);
}
private static ByteSequence writeCompressedDefaultType(final ByteSequence contents) throws IOException {
try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream(); try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
OutputStream os = new InflaterOutputStream(decompressed)) { OutputStream os = new InflaterOutputStream(decompressed)) {
os.write(contents.data, contents.offset, contents.getLength()); os.write(contents.data, contents.offset, contents.getLength());
contents = decompressed.toByteSequence(); return decompressed.toByteSequence();
} catch (Exception e) { } catch (Exception e) {
throw new IOException(e); throw new IOException(e);
} }
} }
body.writeBytes(contents.data, contents.offset, contents.length);
break; private static void putMsgBrokerPath(final BrokerId[] brokers, final CoreMessage coreMessage) {
} final StringBuilder builder = new StringBuilder();
} for (int i = 0, size = brokers.length; i < size; i++) {
//amq specific
coreMessage.putLongProperty(AMQ_MSG_ARRIVAL, messageSend.getArrival());
coreMessage.putLongProperty(AMQ_MSG_BROKER_IN_TIME, messageSend.getBrokerInTime());
BrokerId[] brokers = messageSend.getBrokerPath();
if (brokers != null) {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < brokers.length; i++) {
builder.append(brokers[i].getValue()); builder.append(brokers[i].getValue());
if (i != (brokers.length - 1)) { if (i != (size - 1)) {
builder.append(","); //is this separator safe? builder.append(','); //is this separator safe?
} }
} }
coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, builder.toString()); coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, builder.toString());
} }
BrokerId[] cluster = messageSend.getCluster();
if (cluster != null) { private static void putMsgCluster(final BrokerId[] cluster, final CoreMessage coreMessage) {
StringBuilder builder = new StringBuilder(); final StringBuilder builder = new StringBuilder();
for (int i = 0; i < cluster.length; i++) { for (int i = 0, size = cluster.length; i < size; i++) {
builder.append(cluster[i].getValue()); builder.append(cluster[i].getValue());
if (i != (cluster.length - 1)) { if (i != (size - 1)) {
builder.append(","); //is this separator safe? builder.append(','); //is this separator safe?
} }
} }
coreMessage.putStringProperty(AMQ_MSG_CLUSTER, builder.toString()); coreMessage.putStringProperty(AMQ_MSG_CLUSTER, builder.toString());
} }
coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId()); private static void putMsgDataStructure(final DataStructure ds,
String corrId = messageSend.getCorrelationId(); final WireFormat marshaller,
if (corrId != null) { final CoreMessage coreMessage) throws IOException {
coreMessage.putStringProperty("JMSCorrelationID", corrId); final ByteSequence dsBytes = marshaller.marshal(ds);
}
DataStructure ds = messageSend.getDataStructure();
if (ds != null) {
ByteSequence dsBytes = marshaller.marshal(ds);
dsBytes.compact(); dsBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data); coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data);
} }
String groupId = messageSend.getGroupID();
if (groupId != null) {
coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, groupId);
}
coreMessage.putIntProperty(AMQ_MSG_GROUP_SEQUENCE, messageSend.getGroupSequence());
MessageId messageId = messageSend.getMessageId(); private static void putMsgProducerId(final ProducerId producerId,
final WireFormat marshaller,
ByteSequence midBytes = marshaller.marshal(messageId); final CoreMessage coreMessage) throws IOException {
midBytes.compact(); final ByteSequence producerIdBytes = marshaller.marshal(producerId);
coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data);
ProducerId producerId = messageSend.getProducerId();
if (producerId != null) {
ByteSequence producerIdBytes = marshaller.marshal(producerId);
producerIdBytes.compact(); producerIdBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data); coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data);
} }
ByteSequence propBytes = messageSend.getMarshalledProperties();
if (propBytes != null) { private static void putMsgMarshalledProperties(final ByteSequence propBytes,
final Message messageSend,
final CoreMessage coreMessage) throws IOException {
propBytes.compact(); propBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_MARSHALL_PROP, propBytes.data); coreMessage.putBytesProperty(AMQ_MSG_MARSHALL_PROP, propBytes.data);
//unmarshall properties to core so selector will work //unmarshall properties to core so selector will work
Map<String, Object> props = messageSend.getProperties(); final Map<String, Object> props = messageSend.getProperties();
//Map<String, Object> props = MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(propBytes))); if (!props.isEmpty()) {
for (Entry<String, Object> ent : props.entrySet()) { props.forEach((key, value) -> {
Object value = ent.getValue();
try { try {
coreMessage.putObjectProperty(ent.getKey(), value); coreMessage.putObjectProperty(key, value);
} catch (ActiveMQPropertyConversionException e) { } catch (ActiveMQPropertyConversionException e) {
coreMessage.putStringProperty(ent.getKey(), value.toString()); coreMessage.putStringProperty(key, value.toString());
} }
});
} }
} }
ActiveMQDestination replyTo = messageSend.getReplyTo(); private static void putMsgReplyTo(final ActiveMQDestination replyTo,
if (replyTo != null) { final WireFormat marshaller,
ByteSequence replyToBytes = marshaller.marshal(replyTo); final CoreMessage coreMessage) throws IOException {
final ByteSequence replyToBytes = marshaller.marshal(replyTo);
replyToBytes.compact(); replyToBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data); coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data);
} }
String userId = messageSend.getUserID(); private static void putMsgOriginalDestination(final ActiveMQDestination origDest,
if (userId != null) { final WireFormat marshaller,
coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId); final CoreMessage coreMessage) throws IOException {
} final ByteSequence origDestBytes = marshaller.marshal(origDest);
coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
ActiveMQDestination origDest = messageSend.getOriginalDestination();
if (origDest != null) {
ByteSequence origDestBytes = marshaller.marshal(origDest);
origDestBytes.compact(); origDestBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data); coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data);
} }
return coreMessage;
}
private static void loadMapIntoProperties(TypedProperties props, Map<String, Object> map) { private static void loadMapIntoProperties(TypedProperties props, Map<String, Object> map) {
for (Entry<String, Object> entry : map.entrySet()) { for (Entry<String, Object> entry : map.entrySet()) {
SimpleString key = new SimpleString(entry.getKey()); SimpleString key = new SimpleString(entry.getKey());