ARTEMIS-1616 OpenWire improvements
Avoided copy of CoreMessage when not needed and cached lambda on hot path
This commit is contained in:
parent
04a9884d30
commit
2db4eafc4d
|
@ -85,6 +85,8 @@ public class AMQSession implements SessionCallback {
|
|||
|
||||
private final OpenWireProtocolManager protocolManager;
|
||||
|
||||
private final Runnable enableAutoReadAndTtl;
|
||||
|
||||
private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
|
||||
|
||||
public AMQSession(ConnectionInfo connInfo,
|
||||
|
@ -102,6 +104,7 @@ public class AMQSession implements SessionCallback {
|
|||
OpenWireFormat marshaller = (OpenWireFormat) connection.getMarshaller();
|
||||
|
||||
this.converter = new OpenWireMessageConverter(marshaller.copy());
|
||||
this.enableAutoReadAndTtl = this::enableAutoReadAndTtl;
|
||||
}
|
||||
|
||||
public boolean isClosed() {
|
||||
|
@ -325,7 +328,7 @@ public class AMQSession implements SessionCallback {
|
|||
boolean sendProducerAck) throws Exception {
|
||||
messageSend.setBrokerInTime(System.currentTimeMillis());
|
||||
|
||||
ActiveMQDestination destination = messageSend.getDestination();
|
||||
final ActiveMQDestination destination = messageSend.getDestination();
|
||||
|
||||
ActiveMQDestination[] actualDestinations = null;
|
||||
if (destination.isComposite()) {
|
||||
|
@ -335,7 +338,7 @@ public class AMQSession implements SessionCallback {
|
|||
actualDestinations = new ActiveMQDestination[]{destination};
|
||||
}
|
||||
|
||||
org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools);
|
||||
final org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools);
|
||||
|
||||
originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getClientId());
|
||||
|
||||
|
@ -356,14 +359,15 @@ public class AMQSession implements SessionCallback {
|
|||
connection.getContext().setDontSendReponse(true);
|
||||
}
|
||||
|
||||
for (int i = 0; i < actualDestinations.length; i++) {
|
||||
ActiveMQDestination dest = actualDestinations[i];
|
||||
SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool());
|
||||
org.apache.activemq.artemis.api.core.Message coreMsg = originalCoreMsg.copy();
|
||||
for (int i = 0, actualDestinationsCount = actualDestinations.length; i < actualDestinationsCount; i++) {
|
||||
final ActiveMQDestination dest = actualDestinations[i];
|
||||
final SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool());
|
||||
//the last coreMsg could be directly the original one -> it avoid 1 copy if actualDestinations > 1 and ANY copy if actualDestinations == 1
|
||||
final org.apache.activemq.artemis.api.core.Message coreMsg = (i == actualDestinationsCount - 1) ? originalCoreMsg : originalCoreMsg.copy();
|
||||
coreMsg.setAddress(address);
|
||||
|
||||
if (actualDestinations[i].isQueue()) {
|
||||
checkAutoCreateQueue(SimpleString.toSimpleString(actualDestinations[i].getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool()), actualDestinations[i].isTemporary());
|
||||
if (dest.isQueue()) {
|
||||
checkAutoCreateQueue(address, dest.isTemporary());
|
||||
coreMsg.setRoutingType(RoutingType.ANYCAST);
|
||||
} else {
|
||||
coreMsg.setRoutingType(RoutingType.MULTICAST);
|
||||
|
@ -424,12 +428,8 @@ public class AMQSession implements SessionCallback {
|
|||
//non-persistent messages goes here, by default we stop reading from
|
||||
//transport
|
||||
connection.getTransportConnection().setAutoRead(false);
|
||||
if (!store.checkMemory(() -> {
|
||||
connection.getTransportConnection().setAutoRead(true);
|
||||
connection.enableTtl();
|
||||
})) {
|
||||
connection.getTransportConnection().setAutoRead(true);
|
||||
connection.enableTtl();
|
||||
if (!store.checkMemory(enableAutoReadAndTtl)) {
|
||||
enableAutoReadAndTtl();
|
||||
throw new ResourceAllocationException("Queue is full " + address);
|
||||
}
|
||||
|
||||
|
@ -448,6 +448,11 @@ public class AMQSession implements SessionCallback {
|
|||
}
|
||||
}
|
||||
|
||||
private void enableAutoReadAndTtl() {
|
||||
connection.getTransportConnection().setAutoRead(true);
|
||||
connection.enableTtl();
|
||||
}
|
||||
|
||||
public String convertWildcard(String physicalName) {
|
||||
return OPENWIRE_WILDCARD.convert(physicalName, server.getConfiguration().getWildcardConfiguration());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue