ARTEMIS-1045 Performance improvements on AMQP

This commit is contained in:
Clebert Suconic 2017-03-17 11:14:18 -04:00
parent 861c231551
commit 291a4719b6
2 changed files with 39 additions and 42 deletions

View File

@ -72,6 +72,7 @@ public class AMQPMessage extends RefCountMessage {
private DeliveryAnnotations _deliveryAnnotations; private DeliveryAnnotations _deliveryAnnotations;
private MessageAnnotations _messageAnnotations; private MessageAnnotations _messageAnnotations;
private Properties _properties; private Properties _properties;
private int appLocation = -1;
private ApplicationProperties applicationProperties; private ApplicationProperties applicationProperties;
private long scheduledTime = -1; private long scheduledTime = -1;
private String connectionID; private String connectionID;
@ -93,7 +94,7 @@ public class AMQPMessage extends RefCountMessage {
public AMQPMessage(long messageFormat, Message message) { public AMQPMessage(long messageFormat, Message message) {
this.messageFormat = messageFormat; this.messageFormat = messageFormat;
this.protonMessage = (MessageImpl)message; this.protonMessage = (MessageImpl) message;
} }
@ -124,7 +125,7 @@ public class AMQPMessage extends RefCountMessage {
_deliveryAnnotations = new DeliveryAnnotations(new HashMap<>()); _deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
_properties = new Properties(); _properties = new Properties();
this.applicationProperties = new ApplicationProperties(new HashMap<>()); this.applicationProperties = new ApplicationProperties(new HashMap<>());
this.protonMessage = (MessageImpl)Message.Factory.create(); this.protonMessage = (MessageImpl) Message.Factory.create();
this.protonMessage.setApplicationProperties(applicationProperties); this.protonMessage.setApplicationProperties(applicationProperties);
this.protonMessage.setDeliveryAnnotations(_deliveryAnnotations); this.protonMessage.setDeliveryAnnotations(_deliveryAnnotations);
} }
@ -148,6 +149,20 @@ public class AMQPMessage extends RefCountMessage {
private ApplicationProperties getApplicationProperties() { private ApplicationProperties getApplicationProperties() {
parseHeaders(); parseHeaders();
if (applicationProperties == null && appLocation >= 0) {
ByteBuffer buffer = getBuffer().nioBuffer();
buffer.position(appLocation);
TLSEncode.getDecoder().setByteBuffer(buffer);
Object section = TLSEncode.getDecoder().readObject();
if (section instanceof ApplicationProperties) {
this.applicationProperties = (ApplicationProperties) section;
}
this.appLocation = -1;
TLSEncode.getDecoder().setByteBuffer(null);
}
return applicationProperties; return applicationProperties;
} }
@ -161,6 +176,7 @@ public class AMQPMessage extends RefCountMessage {
parsedHeaders = true; parsedHeaders = true;
} }
} }
@Override @Override
public org.apache.activemq.artemis.api.core.Message setConnectionID(String connectionID) { public org.apache.activemq.artemis.api.core.Message setConnectionID(String connectionID) {
this.connectionID = connectionID; this.connectionID = connectionID;
@ -172,7 +188,6 @@ public class AMQPMessage extends RefCountMessage {
return connectionID; return connectionID;
} }
public MessageAnnotations getMessageAnnotations() { public MessageAnnotations getMessageAnnotations() {
parseHeaders(); parseHeaders();
return _messageAnnotations; return _messageAnnotations;
@ -202,7 +217,6 @@ public class AMQPMessage extends RefCountMessage {
return null; return null;
} }
private void setSymbol(String symbol, Object value) { private void setSymbol(String symbol, Object value) {
setSymbol(Symbol.getSymbol(symbol), value); setSymbol(Symbol.getSymbol(symbol), value);
} }
@ -231,11 +245,9 @@ public class AMQPMessage extends RefCountMessage {
return null; return null;
} */ } */
return null; return null;
} }
@Override @Override
public SimpleString getGroupID() { public SimpleString getGroupID() {
parseHeaders(); parseHeaders();
@ -247,7 +259,6 @@ public class AMQPMessage extends RefCountMessage {
} }
} }
@Override @Override
public Long getScheduledDeliveryTime() { public Long getScheduledDeliveryTime() {
@ -339,15 +350,19 @@ public class AMQPMessage extends RefCountMessage {
this.expiration = _properties.getAbsoluteExpiryTime().getTime(); this.expiration = _properties.getAbsoluteExpiryTime().getTime();
} }
if (buffer.hasRemaining()) { // We don't read the next section on purpose, as we will parse ApplicationProperties
section = (Section) decoder.readObject(); // lazily
} else { section = null;
section = null;
}
} }
if (section instanceof ApplicationProperties) { if (section instanceof ApplicationProperties) {
applicationProperties = (ApplicationProperties) section; applicationProperties = (ApplicationProperties) section;
} else {
if (buffer.hasRemaining()) {
this.appLocation = buffer.position();
} else {
this.appLocation = -1;
}
} }
} finally { } finally {
decoder.setByteBuffer(null); decoder.setByteBuffer(null);
@ -446,13 +461,11 @@ public class AMQPMessage extends RefCountMessage {
} }
} }
@Override @Override
public Object getDuplicateProperty() { public Object getDuplicateProperty() {
return null; return null;
} }
@Override @Override
public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) { public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) {
return null; return null;
@ -463,7 +476,7 @@ public class AMQPMessage extends RefCountMessage {
if (address == null) { if (address == null) {
Properties properties = getProtonMessage().getProperties(); Properties properties = getProtonMessage().getProperties();
if (properties != null) { if (properties != null) {
return properties.getTo(); return properties.getTo();
} else { } else {
return null; return null;
} }
@ -539,7 +552,7 @@ public class AMQPMessage extends RefCountMessage {
header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount - 1)); header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount - 1));
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer)); TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer));
TLSEncode.getEncoder().writeObject(header); TLSEncode.getEncoder().writeObject(header);
TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null); TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
} }
} }
buffer.writeBytes(data, sendFrom, data.writerIndex() - sendFrom); buffer.writeBytes(data, sendFrom, data.writerIndex() - sendFrom);
@ -676,27 +689,27 @@ public class AMQPMessage extends RefCountMessage {
@Override @Override
public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException { public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException {
return (Boolean)getApplicationPropertiesMap().get(key); return (Boolean) getApplicationPropertiesMap().get(key);
} }
@Override @Override
public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException { public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException {
return (Byte)getApplicationPropertiesMap().get(key); return (Byte) getApplicationPropertiesMap().get(key);
} }
@Override @Override
public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException { public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException {
return (Double)getApplicationPropertiesMap().get(key); return (Double) getApplicationPropertiesMap().get(key);
} }
@Override @Override
public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException { public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException {
return (Integer)getApplicationPropertiesMap().get(key); return (Integer) getApplicationPropertiesMap().get(key);
} }
@Override @Override
public Long getLongProperty(String key) throws ActiveMQPropertyConversionException { public Long getLongProperty(String key) throws ActiveMQPropertyConversionException {
return (Long)getApplicationPropertiesMap().get(key); return (Long) getApplicationPropertiesMap().get(key);
} }
@Override @Override
@ -712,12 +725,12 @@ public class AMQPMessage extends RefCountMessage {
@Override @Override
public Short getShortProperty(String key) throws ActiveMQPropertyConversionException { public Short getShortProperty(String key) throws ActiveMQPropertyConversionException {
return (Short)getApplicationPropertiesMap().get(key); return (Short) getApplicationPropertiesMap().get(key);
} }
@Override @Override
public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException { public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException {
return (Float)getApplicationPropertiesMap().get(key); return (Float) getApplicationPropertiesMap().get(key);
} }
@Override @Override
@ -727,7 +740,7 @@ public class AMQPMessage extends RefCountMessage {
} else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) { } else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) {
return getConnectionID(); return getConnectionID();
} else { } else {
return (String)getApplicationPropertiesMap().get(key); return (String) getApplicationPropertiesMap().get(key);
} }
} }
@ -747,7 +760,7 @@ public class AMQPMessage extends RefCountMessage {
@Override @Override
public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException { public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
return SimpleString.toSimpleString((String)getApplicationPropertiesMap().get(key)); return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key));
} }
@Override @Override
@ -842,8 +855,7 @@ public class AMQPMessage extends RefCountMessage {
@Override @Override
public int getMemoryEstimate() { public int getMemoryEstimate() {
if (memoryEstimate == -1) { if (memoryEstimate == -1) {
memoryEstimate = memoryOffset + memoryEstimate = memoryOffset + (data != null ? data.capacity() : 0);
(data != null ? data.capacity() : 0);
} }
return memoryEstimate; return memoryEstimate;
@ -858,7 +870,6 @@ public class AMQPMessage extends RefCountMessage {
} }
} }
@Override @Override
public SimpleString getReplyTo() { public SimpleString getReplyTo() {
if (getProperties() != null) { if (getProperties() != null) {
@ -877,7 +888,6 @@ public class AMQPMessage extends RefCountMessage {
return this; return this;
} }
@Override @Override
public int getPersistSize() { public int getPersistSize() {
checkBuffer(); checkBuffer();

View File

@ -42,7 +42,6 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFound
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl; import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser; import org.apache.activemq.artemis.selector.impl.SelectorParser;
@ -89,7 +88,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private boolean multicast; private boolean multicast;
//todo get this from somewhere //todo get this from somewhere
private RoutingType defaultRoutingType = RoutingType.ANYCAST; private RoutingType defaultRoutingType = RoutingType.ANYCAST;
protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);
private RoutingType routingTypeToUse = defaultRoutingType; private RoutingType routingTypeToUse = defaultRoutingType;
private boolean shared = false; private boolean shared = false;
private boolean global = false; private boolean global = false;
@ -110,7 +108,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
@Override @Override
public void onFlow(int currentCredits, boolean drain) { public void onFlow(int currentCredits, boolean drain) {
this.creditsSemaphore.setCredits(currentCredits);
sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain); sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain);
} }
@ -590,16 +587,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return 0; return 0;
} }
if (!creditsSemaphore.tryAcquire()) {
try {
creditsSemaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// nothing to be done here.. we just keep going
throw new IllegalStateException(e.getMessage(), e);
}
}
// presettle means we can settle the message on the dealer side before we send it, i.e. // presettle means we can settle the message on the dealer side before we send it, i.e.
// for browsers // for browsers
boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;