From 2db4eafc4d81cd18f9394e356ee11b202f5f2301 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Tue, 16 Jan 2018 14:24:32 +0100 Subject: [PATCH] ARTEMIS-1616 OpenWire improvements Avoided copy of CoreMessage when not needed and cached lambda on hot path --- .../protocol/openwire/amq/AMQSession.java | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index d284d6cc65..3ff3ae1b37 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -85,6 +85,8 @@ public class AMQSession implements SessionCallback { private final OpenWireProtocolManager protocolManager; + private final Runnable enableAutoReadAndTtl; + private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools(); public AMQSession(ConnectionInfo connInfo, @@ -102,6 +104,7 @@ public class AMQSession implements SessionCallback { OpenWireFormat marshaller = (OpenWireFormat) connection.getMarshaller(); this.converter = new OpenWireMessageConverter(marshaller.copy()); + this.enableAutoReadAndTtl = this::enableAutoReadAndTtl; } public boolean isClosed() { @@ -325,7 +328,7 @@ public class AMQSession implements SessionCallback { boolean sendProducerAck) throws Exception { messageSend.setBrokerInTime(System.currentTimeMillis()); - ActiveMQDestination destination = messageSend.getDestination(); + final ActiveMQDestination destination = messageSend.getDestination(); ActiveMQDestination[] actualDestinations = null; if (destination.isComposite()) { @@ -335,7 +338,7 @@ public class AMQSession implements SessionCallback { actualDestinations = new ActiveMQDestination[]{destination}; } - org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools); + final org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools); originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getClientId()); @@ -356,14 +359,15 @@ public class AMQSession implements SessionCallback { connection.getContext().setDontSendReponse(true); } - for (int i = 0; i < actualDestinations.length; i++) { - ActiveMQDestination dest = actualDestinations[i]; - SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool()); - org.apache.activemq.artemis.api.core.Message coreMsg = originalCoreMsg.copy(); + for (int i = 0, actualDestinationsCount = actualDestinations.length; i < actualDestinationsCount; i++) { + final ActiveMQDestination dest = actualDestinations[i]; + final SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool()); + //the last coreMsg could be directly the original one -> it avoid 1 copy if actualDestinations > 1 and ANY copy if actualDestinations == 1 + final org.apache.activemq.artemis.api.core.Message coreMsg = (i == actualDestinationsCount - 1) ? originalCoreMsg : originalCoreMsg.copy(); coreMsg.setAddress(address); - if (actualDestinations[i].isQueue()) { - checkAutoCreateQueue(SimpleString.toSimpleString(actualDestinations[i].getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool()), actualDestinations[i].isTemporary()); + if (dest.isQueue()) { + checkAutoCreateQueue(address, dest.isTemporary()); coreMsg.setRoutingType(RoutingType.ANYCAST); } else { coreMsg.setRoutingType(RoutingType.MULTICAST); @@ -424,12 +428,8 @@ public class AMQSession implements SessionCallback { //non-persistent messages goes here, by default we stop reading from //transport connection.getTransportConnection().setAutoRead(false); - if (!store.checkMemory(() -> { - connection.getTransportConnection().setAutoRead(true); - connection.enableTtl(); - })) { - connection.getTransportConnection().setAutoRead(true); - connection.enableTtl(); + if (!store.checkMemory(enableAutoReadAndTtl)) { + enableAutoReadAndTtl(); throw new ResourceAllocationException("Queue is full " + address); } @@ -448,6 +448,11 @@ public class AMQSession implements SessionCallback { } } + private void enableAutoReadAndTtl() { + connection.getTransportConnection().setAutoRead(true); + connection.enableTtl(); + } + public String convertWildcard(String physicalName) { return OPENWIRE_WILDCARD.convert(physicalName, server.getConfiguration().getWildcardConfiguration()); }