ARTEMIS-1616 OpenWire improvements

Cached Notification Destination check on AMQConsumer to avoid expensive ActiveMQDestination::toString
This commit is contained in:
Francesco Nigro 2018-01-17 00:37:24 +01:00 committed by Clebert Suconic
parent 64724c3520
commit 9650c80ba7
2 changed files with 12 additions and 7 deletions

View File

@ -97,8 +97,6 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
private static final SimpleString AMQ_MSG_DROPPABLE = new SimpleString(AMQ_PREFIX + "DROPPABLE");
private static final SimpleString AMQ_MSG_COMPRESSED = new SimpleString(AMQ_PREFIX + "COMPRESSED");
private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
private final WireFormat marshaller;
public OpenWireMessageConverter(WireFormat marshaller) {
@ -516,7 +514,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
public MessageDispatch createMessageDispatch(MessageReference reference,
ICoreMessage message,
AMQConsumer consumer) throws IOException, JMSException {
ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getOpenwireDestination());
ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer);
//we can use core message id for sequenceId
amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
@ -533,7 +531,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
private ActiveMQMessage toAMQMessage(MessageReference reference,
ICoreMessage coreMessage,
ActiveMQDestination actualDestination) throws IOException {
AMQConsumer consumer) throws IOException {
ActiveMQMessage amqMsg = null;
byte coreType = coreMessage.getType();
switch (coreType) {
@ -779,7 +777,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
DataStructure ds = (DataStructure) marshaller.unmarshal(seq);
amqMsg.setDataStructure(ds);
}
final ActiveMQDestination actualDestination = consumer.getOpenwireDestination();
amqMsg.setDestination(OpenWireUtil.toAMQAddress(coreMessage, actualDestination));
Object value = coreMessage.getGroupID();
@ -869,7 +867,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
for (SimpleString s : props) {
String keyStr = s.toString();
if ((keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) &&
!(actualDestination.toString().contains(AMQ_NOTIFICATIONS_DESTINATION))) {
!consumer.hasNotificationDestination()) {
continue;
}
Object prop = coreMessage.getObjectProperty(s);

View File

@ -53,8 +53,10 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveInfo;
public class AMQConsumer {
private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
private AMQSession session;
private org.apache.activemq.command.ActiveMQDestination openwireDestination;
private final org.apache.activemq.command.ActiveMQDestination openwireDestination;
private final boolean hasNotificationDestination;
private ConsumerInfo info;
private final ScheduledExecutorService scheduledPool;
private ServerConsumer serverConsumer;
@ -74,6 +76,7 @@ public class AMQConsumer {
boolean internalAddress) {
this.session = amqSession;
this.openwireDestination = d;
this.hasNotificationDestination = d.toString().contains(AMQ_NOTIFICATIONS_DESTINATION);
this.info = info;
this.scheduledPool = scheduledPool;
this.prefetchSize = info.getPrefetchSize();
@ -331,6 +334,10 @@ public class AMQConsumer {
serverConsumer.close(false);
}
public boolean hasNotificationDestination() {
return hasNotificationDestination;
}
public org.apache.activemq.command.ActiveMQDestination getOpenwireDestination() {
return openwireDestination;
}