ARTEMIS-1616 OpenWire improvements

Used SimpleString on OpenWireMessageConverter to avoid translations on CoreMessage
This commit is contained in:
Francesco Nigro 2018-01-16 22:24:08 +01:00 committed by Clebert Suconic
parent 17c0a331ac
commit 64724c3520
2 changed files with 170 additions and 169 deletions

View File

@ -71,29 +71,31 @@ import org.fusesource.hawtbuf.UTF8Buffer;
public class OpenWireMessageConverter implements MessageConverter<OpenwireMessage> { public class OpenWireMessageConverter implements MessageConverter<OpenwireMessage> {
public static final String AMQ_PREFIX = "__HDR_"; private static final SimpleString JMS_TYPE_PROPERTY = new SimpleString("JMSType");
public static final String AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = AMQ_PREFIX + "dlqDeliveryFailureCause"; private static final SimpleString JMS_CORRELATION_ID_PROPERTY = new SimpleString("JMSCorrelationID");
private static final SimpleString AMQ_PREFIX = new SimpleString("__HDR_");
public static final SimpleString AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = new SimpleString(AMQ_PREFIX + "dlqDeliveryFailureCause");
private static final String AMQ_MSG_ARRIVAL = AMQ_PREFIX + "ARRIVAL"; private static final SimpleString AMQ_MSG_ARRIVAL = new SimpleString(AMQ_PREFIX + "ARRIVAL");
private static final String AMQ_MSG_BROKER_IN_TIME = AMQ_PREFIX + "BROKER_IN_TIME"; private static final SimpleString AMQ_MSG_BROKER_IN_TIME = new SimpleString(AMQ_PREFIX + "BROKER_IN_TIME");
private static final String AMQ_MSG_BROKER_PATH = AMQ_PREFIX + "BROKER_PATH"; private static final SimpleString AMQ_MSG_BROKER_PATH = new SimpleString(AMQ_PREFIX + "BROKER_PATH");
private static final String AMQ_MSG_CLUSTER = AMQ_PREFIX + "CLUSTER"; private static final SimpleString AMQ_MSG_CLUSTER = new SimpleString(AMQ_PREFIX + "CLUSTER");
private static final String AMQ_MSG_COMMAND_ID = AMQ_PREFIX + "COMMAND_ID"; private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID");
private static final String AMQ_MSG_DATASTRUCTURE = AMQ_PREFIX + "DATASTRUCTURE"; private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE");
private static final String AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID.toString(); private static final SimpleString AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID;
private static final String AMQ_MSG_GROUP_SEQUENCE = AMQ_PREFIX + "GROUP_SEQUENCE"; private static final SimpleString AMQ_MSG_GROUP_SEQUENCE = new SimpleString(AMQ_PREFIX + "GROUP_SEQUENCE");
private static final String AMQ_MSG_MESSAGE_ID = AMQ_PREFIX + "MESSAGE_ID"; private static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID");
private static final String AMQ_MSG_ORIG_DESTINATION = AMQ_PREFIX + "ORIG_DESTINATION"; private static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION");
private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID"; private static final SimpleString AMQ_MSG_ORIG_TXID = new SimpleString(AMQ_PREFIX + "ORIG_TXID");
private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID"; private static final SimpleString AMQ_MSG_PRODUCER_ID = new SimpleString(AMQ_PREFIX + "PRODUCER_ID");
private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP"; private static final SimpleString AMQ_MSG_MARSHALL_PROP = new SimpleString(AMQ_PREFIX + "MARSHALL_PROP");
private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO"; private static final SimpleString AMQ_MSG_REPLY_TO = new SimpleString(AMQ_PREFIX + "REPLY_TO");
private static final String AMQ_MSG_USER_ID = AMQ_PREFIX + "USER_ID"; private static final SimpleString AMQ_MSG_USER_ID = new SimpleString(AMQ_PREFIX + "USER_ID");
private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE"; private static final SimpleString AMQ_MSG_DROPPABLE = new SimpleString(AMQ_PREFIX + "DROPPABLE");
private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED"; private static final SimpleString AMQ_MSG_COMPRESSED = new SimpleString(AMQ_PREFIX + "COMPRESSED");
private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications"; private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
@ -128,7 +130,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
final String type = messageSend.getType(); final String type = messageSend.getType();
if (type != null) { if (type != null) {
coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type)); coreMessage.putStringProperty(JMS_TYPE_PROPERTY, new SimpleString(type));
} }
coreMessage.setDurable(messageSend.isPersistent()); coreMessage.setDurable(messageSend.isPersistent());
coreMessage.setExpiration(messageSend.getExpiration()); coreMessage.setExpiration(messageSend.getExpiration());
@ -185,7 +187,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId()); coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId());
final String corrId = messageSend.getCorrelationId(); final String corrId = messageSend.getCorrelationId();
if (corrId != null) { if (corrId != null) {
coreMessage.putStringProperty("JMSCorrelationID", corrId); coreMessage.putStringProperty(JMS_CORRELATION_ID_PROPERTY, new SimpleString(corrId));
} }
final DataStructure ds = messageSend.getDataStructure(); final DataStructure ds = messageSend.getDataStructure();
if (ds != null) { if (ds != null) {
@ -193,7 +195,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
} }
final String groupId = messageSend.getGroupID(); final String groupId = messageSend.getGroupID();
if (groupId != null) { if (groupId != null) {
coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, groupId); coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, new SimpleString(groupId));
} }
coreMessage.putIntProperty(AMQ_MSG_GROUP_SEQUENCE, messageSend.getGroupSequence()); coreMessage.putIntProperty(AMQ_MSG_GROUP_SEQUENCE, messageSend.getGroupSequence());
@ -219,7 +221,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
final String userId = messageSend.getUserID(); final String userId = messageSend.getUserID();
if (userId != null) { if (userId != null) {
coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId); coreMessage.putStringProperty(AMQ_MSG_USER_ID, new SimpleString(userId));
} }
coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable()); coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
@ -415,7 +417,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
builder.append(','); //is this separator safe? builder.append(','); //is this separator safe?
} }
} }
coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, builder.toString()); coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, new SimpleString(builder.toString()));
} }
private static void putMsgCluster(final BrokerId[] cluster, final CoreMessage coreMessage) { private static void putMsgCluster(final BrokerId[] cluster, final CoreMessage coreMessage) {
@ -426,7 +428,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
builder.append(','); //is this separator safe? builder.append(','); //is this separator safe?
} }
} }
coreMessage.putStringProperty(AMQ_MSG_CLUSTER, builder.toString()); coreMessage.putStringProperty(AMQ_MSG_CLUSTER, new SimpleString(builder.toString()));
} }
private static void putMsgDataStructure(final DataStructure ds, private static void putMsgDataStructure(final DataStructure ds,
@ -557,7 +559,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
throw new IllegalStateException("Unknown message type: " + coreMessage.getType()); throw new IllegalStateException("Unknown message type: " + coreMessage.getType());
} }
String type = coreMessage.getStringProperty(new SimpleString("JMSType")); String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY);
if (type != null) { if (type != null) {
amqMsg.setJMSType(type); amqMsg.setJMSType(type);
} }
@ -580,156 +582,155 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
byte[] bytes = null; byte[] bytes = null;
if (buffer != null) { if (buffer != null) {
buffer.resetReaderIndex(); buffer.resetReaderIndex();
synchronized (buffer) {
if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
SimpleString text = buffer.readNullableSimpleString();
if (text != null) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
OutputStream out = bytesOut;
if (isCompressed) {
out = new DeflaterOutputStream(out, true);
}
try (DataOutputStream dataOut = new DataOutputStream(out)) {
MarshallingSupport.writeUTF8(dataOut, text.toString());
dataOut.flush();
bytes = bytesOut.toByteArray();
}
}
} else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
TypedProperties mapData = new TypedProperties();
//it could be a null map
if (buffer.readableBytes() > 0) {
mapData.decode(buffer.byteBuf());
Map<String, Object> map = mapData.getMap();
ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
OutputStream os = out;
if (isCompressed) {
os = new DeflaterOutputStream(os, true);
}
try (DataOutputStream dataOut = new DataOutputStream(os)) {
MarshallingSupport.marshalPrimitiveMap(map, dataOut);
dataOut.flush();
}
bytes = out.toByteArray();
}
} else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) { if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
if (buffer.readableBytes() > 0) { SimpleString text = buffer.readNullableSimpleString();
int len = buffer.readInt(); if (text != null) {
bytes = new byte[len]; ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
buffer.readBytes(bytes);
if (isCompressed) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
out.write(bytes);
out.flush();
}
bytes = bytesOut.toByteArray();
}
}
} else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
OutputStream out = bytesOut; OutputStream out = bytesOut;
if (isCompressed) { if (isCompressed) {
out = new DeflaterOutputStream(bytesOut, true); out = new DeflaterOutputStream(out, true);
} }
try (DataOutputStream dataOut = new DataOutputStream(out)) { try (DataOutputStream dataOut = new DataOutputStream(out)) {
MarshallingSupport.writeUTF8(dataOut, text.toString());
boolean stop = false; dataOut.flush();
while (!stop && buffer.readable()) { bytes = bytesOut.toByteArray();
byte primitiveType = buffer.readByte();
switch (primitiveType) {
case DataConstants.BOOLEAN:
MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
break;
case DataConstants.BYTE:
MarshallingSupport.marshalByte(dataOut, buffer.readByte());
break;
case DataConstants.BYTES:
int len = buffer.readInt();
byte[] bytesData = new byte[len];
buffer.readBytes(bytesData);
MarshallingSupport.marshalByteArray(dataOut, bytesData);
break;
case DataConstants.CHAR:
char ch = (char) buffer.readShort();
MarshallingSupport.marshalChar(dataOut, ch);
break;
case DataConstants.DOUBLE:
double doubleVal = Double.longBitsToDouble(buffer.readLong());
MarshallingSupport.marshalDouble(dataOut, doubleVal);
break;
case DataConstants.FLOAT:
Float floatVal = Float.intBitsToFloat(buffer.readInt());
MarshallingSupport.marshalFloat(dataOut, floatVal);
break;
case DataConstants.INT:
MarshallingSupport.marshalInt(dataOut, buffer.readInt());
break;
case DataConstants.LONG:
MarshallingSupport.marshalLong(dataOut, buffer.readLong());
break;
case DataConstants.SHORT:
MarshallingSupport.marshalShort(dataOut, buffer.readShort());
break;
case DataConstants.STRING:
String string = buffer.readNullableString();
if (string == null) {
MarshallingSupport.marshalNull(dataOut);
} else {
MarshallingSupport.marshalString(dataOut, string);
}
break;
default:
//now we stop
stop = true;
break;
}
dataOut.flush();
}
}
bytes = bytesOut.toByteArray();
} else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
int n = buffer.readableBytes();
bytes = new byte[n];
buffer.readBytes(bytes);
if (isCompressed) {
int length = bytes.length;
Deflater deflater = new Deflater();
try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
compressed.write(new byte[4]);
deflater.setInput(bytes);
deflater.finish();
byte[] bytesBuf = new byte[1024];
while (!deflater.finished()) {
int count = deflater.deflate(bytesBuf);
compressed.write(bytesBuf, 0, count);
}
compressed.flush();
ByteSequence byteSeq = compressed.toByteSequence();
ByteSequenceData.writeIntBig(byteSeq, length);
bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
} finally {
deflater.end();
}
}
} else {
int n = buffer.readableBytes();
bytes = new byte[n];
buffer.readBytes(bytes);
if (isCompressed) {
try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
out.write(bytes);
out.flush();
bytes = bytesOut.toByteArray();
}
} }
} }
} else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
TypedProperties mapData = new TypedProperties();
//it could be a null map
if (buffer.readableBytes() > 0) {
mapData.decode(buffer.byteBuf());
Map<String, Object> map = mapData.getMap();
ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
OutputStream os = out;
if (isCompressed) {
os = new DeflaterOutputStream(os, true);
}
try (DataOutputStream dataOut = new DataOutputStream(os)) {
MarshallingSupport.marshalPrimitiveMap(map, dataOut);
dataOut.flush();
}
bytes = out.toByteArray();
}
buffer.resetReaderIndex();// this is important for topics as the buffer } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
// may be read multiple times if (buffer.readableBytes() > 0) {
int len = buffer.readInt();
bytes = new byte[len];
buffer.readBytes(bytes);
if (isCompressed) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
out.write(bytes);
out.flush();
}
bytes = bytesOut.toByteArray();
}
}
} else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
OutputStream out = bytesOut;
if (isCompressed) {
out = new DeflaterOutputStream(bytesOut, true);
}
try (DataOutputStream dataOut = new DataOutputStream(out)) {
boolean stop = false;
while (!stop && buffer.readable()) {
byte primitiveType = buffer.readByte();
switch (primitiveType) {
case DataConstants.BOOLEAN:
MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
break;
case DataConstants.BYTE:
MarshallingSupport.marshalByte(dataOut, buffer.readByte());
break;
case DataConstants.BYTES:
int len = buffer.readInt();
byte[] bytesData = new byte[len];
buffer.readBytes(bytesData);
MarshallingSupport.marshalByteArray(dataOut, bytesData);
break;
case DataConstants.CHAR:
char ch = (char) buffer.readShort();
MarshallingSupport.marshalChar(dataOut, ch);
break;
case DataConstants.DOUBLE:
double doubleVal = Double.longBitsToDouble(buffer.readLong());
MarshallingSupport.marshalDouble(dataOut, doubleVal);
break;
case DataConstants.FLOAT:
Float floatVal = Float.intBitsToFloat(buffer.readInt());
MarshallingSupport.marshalFloat(dataOut, floatVal);
break;
case DataConstants.INT:
MarshallingSupport.marshalInt(dataOut, buffer.readInt());
break;
case DataConstants.LONG:
MarshallingSupport.marshalLong(dataOut, buffer.readLong());
break;
case DataConstants.SHORT:
MarshallingSupport.marshalShort(dataOut, buffer.readShort());
break;
case DataConstants.STRING:
String string = buffer.readNullableString();
if (string == null) {
MarshallingSupport.marshalNull(dataOut);
} else {
MarshallingSupport.marshalString(dataOut, string);
}
break;
default:
//now we stop
stop = true;
break;
}
dataOut.flush();
}
}
bytes = bytesOut.toByteArray();
} else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
int n = buffer.readableBytes();
bytes = new byte[n];
buffer.readBytes(bytes);
if (isCompressed) {
int length = bytes.length;
Deflater deflater = new Deflater();
try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
compressed.write(new byte[4]);
deflater.setInput(bytes);
deflater.finish();
byte[] bytesBuf = new byte[1024];
while (!deflater.finished()) {
int count = deflater.deflate(bytesBuf);
compressed.write(bytesBuf, 0, count);
}
compressed.flush();
ByteSequence byteSeq = compressed.toByteSequence();
ByteSequenceData.writeIntBig(byteSeq, length);
bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
} finally {
deflater.end();
}
}
} else {
int n = buffer.readableBytes();
bytes = new byte[n];
buffer.readBytes(bytes);
if (isCompressed) {
try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
out.write(bytes);
out.flush();
bytes = bytesOut.toByteArray();
}
}
} }
buffer.resetReaderIndex();// this is important for topics as the buffer
// may be read multiple times
} }
//we need check null because messages may come from other clients //we need check null because messages may come from other clients
@ -767,7 +768,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
} }
amqMsg.setCommandId(commandId); amqMsg.setCommandId(commandId);
SimpleString corrId = (SimpleString) coreMessage.getObjectProperty("JMSCorrelationID"); SimpleString corrId = (SimpleString) coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY);
if (corrId != null) { if (corrId != null) {
amqMsg.setCorrelationId(corrId.toString()); amqMsg.setCorrelationId(corrId.toString());
} }

View File

@ -286,7 +286,7 @@ public class AMQConsumer {
for (MessageReference ref : ackList) { for (MessageReference ref : ackList) {
Throwable poisonCause = ack.getPoisonCause(); Throwable poisonCause = ack.getPoisonCause();
if (poisonCause != null) { if (poisonCause != null) {
ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, poisonCause.toString()); ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString()));
} }
ref.getQueue().sendToDeadLetterAddress(transaction, ref); ref.getQueue().sendToDeadLetterAddress(transaction, ref);
} }