diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 60aae4cfb0..653ee5f77a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -72,6 +72,7 @@ public class AMQPMessage extends RefCountMessage { private DeliveryAnnotations _deliveryAnnotations; private MessageAnnotations _messageAnnotations; private Properties _properties; + private int appLocation = -1; private ApplicationProperties applicationProperties; private long scheduledTime = -1; private String connectionID; @@ -93,7 +94,7 @@ public class AMQPMessage extends RefCountMessage { public AMQPMessage(long messageFormat, Message message) { this.messageFormat = messageFormat; - this.protonMessage = (MessageImpl)message; + this.protonMessage = (MessageImpl) message; } @@ -124,7 +125,7 @@ public class AMQPMessage extends RefCountMessage { _deliveryAnnotations = new DeliveryAnnotations(new HashMap<>()); _properties = new Properties(); this.applicationProperties = new ApplicationProperties(new HashMap<>()); - this.protonMessage = (MessageImpl)Message.Factory.create(); + this.protonMessage = (MessageImpl) Message.Factory.create(); this.protonMessage.setApplicationProperties(applicationProperties); this.protonMessage.setDeliveryAnnotations(_deliveryAnnotations); } @@ -148,6 +149,20 @@ public class AMQPMessage extends RefCountMessage { private ApplicationProperties getApplicationProperties() { parseHeaders(); + + if (applicationProperties == null && appLocation >= 0) { + ByteBuffer buffer = getBuffer().nioBuffer(); + buffer.position(appLocation); + TLSEncode.getDecoder().setByteBuffer(buffer); + Object section = TLSEncode.getDecoder().readObject(); + if (section instanceof ApplicationProperties) { + this.applicationProperties = (ApplicationProperties) section; + } + this.appLocation = -1; + TLSEncode.getDecoder().setByteBuffer(null); + + } + return applicationProperties; } @@ -161,6 +176,7 @@ public class AMQPMessage extends RefCountMessage { parsedHeaders = true; } } + @Override public org.apache.activemq.artemis.api.core.Message setConnectionID(String connectionID) { this.connectionID = connectionID; @@ -172,7 +188,6 @@ public class AMQPMessage extends RefCountMessage { return connectionID; } - public MessageAnnotations getMessageAnnotations() { parseHeaders(); return _messageAnnotations; @@ -202,7 +217,6 @@ public class AMQPMessage extends RefCountMessage { return null; } - private void setSymbol(String symbol, Object value) { setSymbol(Symbol.getSymbol(symbol), value); } @@ -231,11 +245,9 @@ public class AMQPMessage extends RefCountMessage { return null; } */ - return null; } - @Override public SimpleString getGroupID() { parseHeaders(); @@ -247,7 +259,6 @@ public class AMQPMessage extends RefCountMessage { } } - @Override public Long getScheduledDeliveryTime() { @@ -339,15 +350,19 @@ public class AMQPMessage extends RefCountMessage { this.expiration = _properties.getAbsoluteExpiryTime().getTime(); } - if (buffer.hasRemaining()) { - section = (Section) decoder.readObject(); - } else { - section = null; - } + // We don't read the next section on purpose, as we will parse ApplicationProperties + // lazily + section = null; } if (section instanceof ApplicationProperties) { applicationProperties = (ApplicationProperties) section; + } else { + if (buffer.hasRemaining()) { + this.appLocation = buffer.position(); + } else { + this.appLocation = -1; + } } } finally { decoder.setByteBuffer(null); @@ -446,13 +461,11 @@ public class AMQPMessage extends RefCountMessage { } } - @Override public Object getDuplicateProperty() { return null; } - @Override public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) { return null; @@ -463,7 +476,7 @@ public class AMQPMessage extends RefCountMessage { if (address == null) { Properties properties = getProtonMessage().getProperties(); if (properties != null) { - return properties.getTo(); + return properties.getTo(); } else { return null; } @@ -539,7 +552,7 @@ public class AMQPMessage extends RefCountMessage { header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount - 1)); TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer)); TLSEncode.getEncoder().writeObject(header); - TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null); + TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null); } } buffer.writeBytes(data, sendFrom, data.writerIndex() - sendFrom); @@ -676,27 +689,27 @@ public class AMQPMessage extends RefCountMessage { @Override public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException { - return (Boolean)getApplicationPropertiesMap().get(key); + return (Boolean) getApplicationPropertiesMap().get(key); } @Override public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException { - return (Byte)getApplicationPropertiesMap().get(key); + return (Byte) getApplicationPropertiesMap().get(key); } @Override public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException { - return (Double)getApplicationPropertiesMap().get(key); + return (Double) getApplicationPropertiesMap().get(key); } @Override public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException { - return (Integer)getApplicationPropertiesMap().get(key); + return (Integer) getApplicationPropertiesMap().get(key); } @Override public Long getLongProperty(String key) throws ActiveMQPropertyConversionException { - return (Long)getApplicationPropertiesMap().get(key); + return (Long) getApplicationPropertiesMap().get(key); } @Override @@ -712,12 +725,12 @@ public class AMQPMessage extends RefCountMessage { @Override public Short getShortProperty(String key) throws ActiveMQPropertyConversionException { - return (Short)getApplicationPropertiesMap().get(key); + return (Short) getApplicationPropertiesMap().get(key); } @Override public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException { - return (Float)getApplicationPropertiesMap().get(key); + return (Float) getApplicationPropertiesMap().get(key); } @Override @@ -727,7 +740,7 @@ public class AMQPMessage extends RefCountMessage { } else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) { return getConnectionID(); } else { - return (String)getApplicationPropertiesMap().get(key); + return (String) getApplicationPropertiesMap().get(key); } } @@ -747,7 +760,7 @@ public class AMQPMessage extends RefCountMessage { @Override public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException { - return SimpleString.toSimpleString((String)getApplicationPropertiesMap().get(key)); + return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key)); } @Override @@ -842,8 +855,7 @@ public class AMQPMessage extends RefCountMessage { @Override public int getMemoryEstimate() { if (memoryEstimate == -1) { - memoryEstimate = memoryOffset + - (data != null ? data.capacity() : 0); + memoryEstimate = memoryOffset + (data != null ? data.capacity() : 0); } return memoryEstimate; @@ -858,7 +870,6 @@ public class AMQPMessage extends RefCountMessage { } } - @Override public SimpleString getReplyTo() { if (getProperties() != null) { @@ -877,7 +888,6 @@ public class AMQPMessage extends RefCountMessage { return this; } - @Override public int getPersistSize() { checkBuffer(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 962110e255..0e0447fbe2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -42,7 +42,6 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFound import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl; -import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.impl.SelectorParser; @@ -89,7 +88,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private boolean multicast; //todo get this from somewhere private RoutingType defaultRoutingType = RoutingType.ANYCAST; - protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0); private RoutingType routingTypeToUse = defaultRoutingType; private boolean shared = false; private boolean global = false; @@ -110,7 +108,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr @Override public void onFlow(int currentCredits, boolean drain) { - this.creditsSemaphore.setCredits(currentCredits); sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain); } @@ -590,16 +587,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return 0; } - if (!creditsSemaphore.tryAcquire()) { - try { - creditsSemaphore.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - // nothing to be done here.. we just keep going - throw new IllegalStateException(e.getMessage(), e); - } - } - // presettle means we can settle the message on the dealer side before we send it, i.e. // for browsers boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;