From a41a1930ef73c961221dbd213b547e2e38694782 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 9 Mar 2017 12:08:35 -0500 Subject: [PATCH] ARTEMIS-1009 AMQP shouldn't use application properties As part of my refactoring on AMQP, the broker shouldn't rely on Application properties for any broker semantic changes on delivery. I am removing any access to those now, so we can properly deal with this post 2.0.0. --- .../activemq/artemis/api/core/Message.java | 23 ++++--------------- .../core/message/impl/CoreMessage.java | 23 ++++++++++++------- .../artemis/jms/client/ActiveMQMessage.java | 15 ++---------- .../protocol/amqp/broker/AMQPMessage.java | 18 +++++++-------- .../proton/ProtonServerSenderContext.java | 2 +- .../protocol/openwire/OpenwireMessage.java | 5 ---- .../paging/cursor/PagedReferenceImpl.java | 6 +---- .../core/postoffice/impl/PostOfficeImpl.java | 3 ++- .../core/server/impl/LastValueQueue.java | 17 +++++++------- .../server/impl/PostOfficeJournalLoader.java | 6 ++--- .../impl/ScheduledDeliveryHandlerTest.java | 5 ---- .../integration/client/AcknowledgeTest.java | 5 ---- 12 files changed, 45 insertions(+), 83 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 5fd077833f..ec0a2db5c1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -164,25 +164,15 @@ public interface Message { byte STREAM_TYPE = 6; - - default SimpleString getDeliveryAnnotationPropertyString(SimpleString property) { - Object obj = getDeliveryAnnotationProperty(property); - if (obj == null) { - return null; - } else if (obj instanceof SimpleString) { - return (SimpleString)obj; - } else { - return SimpleString.toSimpleString(obj.toString()); - } - } - default void cleanupInternalProperties() { // only on core } RoutingType getRouteType(); - boolean containsDeliveryAnnotationProperty(SimpleString property); + default SimpleString getLastValueProperty() { + return null; + } /** * @deprecated do not use this, use through ICoreMessage or ClientMessage @@ -417,15 +407,10 @@ public interface Message { } } - /** - * it will translate a property named HDR_DUPLICATE_DETECTION_ID. - * @return - */ default Object getDuplicateProperty() { - return getDeliveryAnnotationProperty(Message.HDR_DUPLICATE_DETECTION_ID); + return null; } - Message putBooleanProperty(String key, boolean value); Message putByteProperty(String key, byte value); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 8d1538455d..ce1ea96a2f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -132,12 +132,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } } - @Override - public boolean containsDeliveryAnnotationProperty(SimpleString property) { - checkProperties(); - return properties.containsProperty(property); - } - @Override public Persister getPersister() { return CoreMessagePersister.getInstance(); @@ -225,13 +219,17 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { return ((Number) property).longValue(); } - return null; + return 0L; } @Override public CoreMessage setScheduledDeliveryTime(Long time) { checkProperties(); - putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time); + if (time == null || time == 0) { + removeProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); + } else { + putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time); + } return this; } @@ -568,6 +566,15 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { /* PropertySize and Properties */checkProperties().getEncodeSize(); } + @Override + public Object getDuplicateProperty() { + return getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID); + } + + @Override + public SimpleString getLastValueProperty() { + return getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); + } @Override public int getEncodeSize() { diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java index 80a07ac52a..286bc57538 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java @@ -726,23 +726,12 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public long getJMSDeliveryTime() throws JMSException { - Long value; - try { - value = message.getLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME); - } catch (Exception e) { - return 0; - } - - if (value == null) { - return 0; - } else { - return value.longValue(); - } + return message.getScheduledDeliveryTime(); } @Override public void setJMSDeliveryTime(long deliveryTime) throws JMSException { - message.putLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME, deliveryTime); + message.setScheduledDeliveryTime(deliveryTime); } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 24175d2832..a158799dce 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -262,7 +262,7 @@ public class AMQPMessage extends RefCountMessage { } } - return scheduledTime == 0 ? null : scheduledTime; + return scheduledTime; } @Override @@ -442,6 +442,13 @@ public class AMQPMessage extends RefCountMessage { } } + + @Override + public Object getDuplicateProperty() { + return null; + } + + @Override public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) { return null; @@ -720,15 +727,6 @@ public class AMQPMessage extends RefCountMessage { } } - @Override - public boolean containsDeliveryAnnotationProperty(SimpleString key) { - parseHeaders(); - if (_deliveryAnnotations == null || _deliveryAnnotations.getValue() == null) { - return false; - } - return _deliveryAnnotations.getValue().containsKey(key.toString()); - } - @Override public Object removeDeliveryAnnotationProperty(SimpleString key) { parseHeaders(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index a2a8eda19e..55ad5508b7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -524,7 +524,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr try { sessionSPI.ack(null, brokerConsumer, message); } catch (Exception e) { - e.printStackTrace(); + log.warn(e.toString(), e); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); } } else if (remoteState instanceof Released) { diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java index 2504a7111a..9fb6eb911c 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java @@ -57,11 +57,6 @@ public class OpenwireMessage implements Message { return null; } - @Override - public boolean containsDeliveryAnnotationProperty(SimpleString property) { - return false; - } - @Override public Object removeDeliveryAnnotationProperty(SimpleString key) { return null; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index 823eef4eba..cd4f70a07a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -134,11 +134,7 @@ public class PagedReferenceImpl implements PagedReference { if (deliveryTime == null) { try { Message msg = getMessage(); - if (msg.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) { - deliveryTime = getMessage().getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); - } else { - deliveryTime = 0L; - } + return msg.getScheduledDeliveryTime(); } catch (Throwable e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); return 0L; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index e2e7b24ece..1f51210a8d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1121,7 +1121,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext()); } - if (message.containsDeliveryAnnotationProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) { + + if (deliveryTime > 0) { if (tx != null) { storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference); } else { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index d059d2cdad..ceec92c0ce 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -37,7 +37,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction; /** * A queue that will discard messages if a newer message with the same - * {@link org.apache.activemq.artemis.core.message.impl.MessageImpl#HDR_LAST_VALUE_NAME} property value. In other words it only retains the last + * {@link org.apache.activemq.artemis.core.message.impl.CoreMessage#HDR_LAST_VALUE_NAME} property value. In other words it only retains the last * value *

* This is useful for example, for stock prices, where you're only interested in the latest value @@ -73,7 +73,7 @@ public class LastValueQueue extends QueueImpl { return; } - SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME.toString()); + SimpleString prop = ref.getMessage().getLastValueProperty(); if (prop != null) { HolderReference hr = map.get(prop); @@ -97,10 +97,11 @@ public class LastValueQueue extends QueueImpl { @Override public synchronized void addHead(final MessageReference ref, boolean scheduling) { - SimpleString prop = ref.getMessage().getDeliveryAnnotationPropertyString(Message.HDR_LAST_VALUE_NAME); - if (prop != null) { - HolderReference hr = map.get(prop); + SimpleString lastValueProp = ref.getMessage().getLastValueProperty(); + + if (lastValueProp != null) { + HolderReference hr = map.get(lastValueProp); if (hr != null) { if (scheduling) { @@ -119,9 +120,9 @@ public class LastValueQueue extends QueueImpl { } } } else { - hr = new HolderReference(prop, ref); + hr = new HolderReference(lastValueProp, ref); - map.put(prop, hr); + map.put(lastValueProp, hr); super.addHead(hr, scheduling); } @@ -147,7 +148,7 @@ public class LastValueQueue extends QueueImpl { @Override protected void refRemoved(MessageReference ref) { synchronized (this) { - SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME.toString()); + SimpleString prop = ref.getMessage().getLastValueProperty(); if (prop != null) { map.remove(prop); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 717e2e2703..bac2d3fa98 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -213,11 +213,11 @@ public class PostOfficeJournalLoader implements JournalLoader { if (scheduledDeliveryTime != 0 && scheduledDeliveryTime <= currentTime) { scheduledDeliveryTime = 0; - record.getMessage().removeProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); + record.getMessage().setScheduledDeliveryTime(0L); } if (scheduledDeliveryTime != 0) { - record.getMessage().putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, scheduledDeliveryTime); + record.getMessage().setScheduledDeliveryTime(scheduledDeliveryTime); } MessageReference ref = postOffice.reroute(record.getMessage(), queue, null); @@ -225,7 +225,7 @@ public class PostOfficeJournalLoader implements JournalLoader { ref.setDeliveryCount(record.getDeliveryCount()); if (scheduledDeliveryTime != 0) { - record.getMessage().removeProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); + record.getMessage().setScheduledDeliveryTime(0L); } } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 2a604e7ff3..8d0628618f 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -298,11 +298,6 @@ public class ScheduledDeliveryHandlerTest extends Assert { return null; } - @Override - public boolean containsDeliveryAnnotationProperty(SimpleString property) { - return false; - } - @Override public Object removeDeliveryAnnotationProperty(SimpleString key) { return null; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java index 7181c61823..43dad847b4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java @@ -356,11 +356,6 @@ public class AcknowledgeTest extends ActiveMQTestBase { return null; } - @Override - public boolean containsDeliveryAnnotationProperty(SimpleString property) { - return false; - } - @Override public Object removeDeliveryAnnotationProperty(SimpleString key) { return null;