ARTEMIS-1009 AMQP shouldn't use application properties
As part of my refactoring on AMQP, the broker shouldn't rely on Application properties for any broker semantic changes on delivery. I am removing any access to those now, so we can properly deal with this post 2.0.0.
This commit is contained in:
parent
78e935b184
commit
a41a1930ef
|
@ -164,25 +164,15 @@ public interface Message {
|
||||||
|
|
||||||
byte STREAM_TYPE = 6;
|
byte STREAM_TYPE = 6;
|
||||||
|
|
||||||
|
|
||||||
default SimpleString getDeliveryAnnotationPropertyString(SimpleString property) {
|
|
||||||
Object obj = getDeliveryAnnotationProperty(property);
|
|
||||||
if (obj == null) {
|
|
||||||
return null;
|
|
||||||
} else if (obj instanceof SimpleString) {
|
|
||||||
return (SimpleString)obj;
|
|
||||||
} else {
|
|
||||||
return SimpleString.toSimpleString(obj.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
default void cleanupInternalProperties() {
|
default void cleanupInternalProperties() {
|
||||||
// only on core
|
// only on core
|
||||||
}
|
}
|
||||||
|
|
||||||
RoutingType getRouteType();
|
RoutingType getRouteType();
|
||||||
|
|
||||||
boolean containsDeliveryAnnotationProperty(SimpleString property);
|
default SimpleString getLastValueProperty() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @deprecated do not use this, use through ICoreMessage or ClientMessage
|
* @deprecated do not use this, use through ICoreMessage or ClientMessage
|
||||||
|
@ -417,15 +407,10 @@ public interface Message {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* it will translate a property named HDR_DUPLICATE_DETECTION_ID.
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
default Object getDuplicateProperty() {
|
default Object getDuplicateProperty() {
|
||||||
return getDeliveryAnnotationProperty(Message.HDR_DUPLICATE_DETECTION_ID);
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
Message putBooleanProperty(String key, boolean value);
|
Message putBooleanProperty(String key, boolean value);
|
||||||
|
|
||||||
Message putByteProperty(String key, byte value);
|
Message putByteProperty(String key, byte value);
|
||||||
|
|
|
@ -132,12 +132,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean containsDeliveryAnnotationProperty(SimpleString property) {
|
|
||||||
checkProperties();
|
|
||||||
return properties.containsProperty(property);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Persister<Message> getPersister() {
|
public Persister<Message> getPersister() {
|
||||||
return CoreMessagePersister.getInstance();
|
return CoreMessagePersister.getInstance();
|
||||||
|
@ -225,13 +219,17 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
||||||
return ((Number) property).longValue();
|
return ((Number) property).longValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
return null;
|
return 0L;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CoreMessage setScheduledDeliveryTime(Long time) {
|
public CoreMessage setScheduledDeliveryTime(Long time) {
|
||||||
checkProperties();
|
checkProperties();
|
||||||
putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
|
if (time == null || time == 0) {
|
||||||
|
removeProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
|
||||||
|
} else {
|
||||||
|
putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
|
||||||
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -568,6 +566,15 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
||||||
/* PropertySize and Properties */checkProperties().getEncodeSize();
|
/* PropertySize and Properties */checkProperties().getEncodeSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object getDuplicateProperty() {
|
||||||
|
return getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SimpleString getLastValueProperty() {
|
||||||
|
return getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getEncodeSize() {
|
public int getEncodeSize() {
|
||||||
|
|
|
@ -726,23 +726,12 @@ public class ActiveMQMessage implements javax.jms.Message {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getJMSDeliveryTime() throws JMSException {
|
public long getJMSDeliveryTime() throws JMSException {
|
||||||
Long value;
|
return message.getScheduledDeliveryTime();
|
||||||
try {
|
|
||||||
value = message.getLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME);
|
|
||||||
} catch (Exception e) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (value == null) {
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
return value.longValue();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setJMSDeliveryTime(long deliveryTime) throws JMSException {
|
public void setJMSDeliveryTime(long deliveryTime) throws JMSException {
|
||||||
message.putLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME, deliveryTime);
|
message.setScheduledDeliveryTime(deliveryTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -262,7 +262,7 @@ public class AMQPMessage extends RefCountMessage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return scheduledTime == 0 ? null : scheduledTime;
|
return scheduledTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -442,6 +442,13 @@ public class AMQPMessage extends RefCountMessage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object getDuplicateProperty() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) {
|
public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) {
|
||||||
return null;
|
return null;
|
||||||
|
@ -720,15 +727,6 @@ public class AMQPMessage extends RefCountMessage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean containsDeliveryAnnotationProperty(SimpleString key) {
|
|
||||||
parseHeaders();
|
|
||||||
if (_deliveryAnnotations == null || _deliveryAnnotations.getValue() == null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return _deliveryAnnotations.getValue().containsKey(key.toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object removeDeliveryAnnotationProperty(SimpleString key) {
|
public Object removeDeliveryAnnotationProperty(SimpleString key) {
|
||||||
parseHeaders();
|
parseHeaders();
|
||||||
|
|
|
@ -524,7 +524,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
try {
|
try {
|
||||||
sessionSPI.ack(null, brokerConsumer, message);
|
sessionSPI.ack(null, brokerConsumer, message);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
log.warn(e.toString(), e);
|
||||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
|
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
|
||||||
}
|
}
|
||||||
} else if (remoteState instanceof Released) {
|
} else if (remoteState instanceof Released) {
|
||||||
|
|
|
@ -57,11 +57,6 @@ public class OpenwireMessage implements Message {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean containsDeliveryAnnotationProperty(SimpleString property) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object removeDeliveryAnnotationProperty(SimpleString key) {
|
public Object removeDeliveryAnnotationProperty(SimpleString key) {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -134,11 +134,7 @@ public class PagedReferenceImpl implements PagedReference {
|
||||||
if (deliveryTime == null) {
|
if (deliveryTime == null) {
|
||||||
try {
|
try {
|
||||||
Message msg = getMessage();
|
Message msg = getMessage();
|
||||||
if (msg.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
|
return msg.getScheduledDeliveryTime();
|
||||||
deliveryTime = getMessage().getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
|
|
||||||
} else {
|
|
||||||
deliveryTime = 0L;
|
|
||||||
}
|
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||||
return 0L;
|
return 0L;
|
||||||
|
|
|
@ -1121,7 +1121,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
|
storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (message.containsDeliveryAnnotationProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
|
|
||||||
|
if (deliveryTime > 0) {
|
||||||
if (tx != null) {
|
if (tx != null) {
|
||||||
storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
|
storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -37,7 +37,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A queue that will discard messages if a newer message with the same
|
* A queue that will discard messages if a newer message with the same
|
||||||
* {@link org.apache.activemq.artemis.core.message.impl.MessageImpl#HDR_LAST_VALUE_NAME} property value. In other words it only retains the last
|
* {@link org.apache.activemq.artemis.core.message.impl.CoreMessage#HDR_LAST_VALUE_NAME} property value. In other words it only retains the last
|
||||||
* value
|
* value
|
||||||
* <p>
|
* <p>
|
||||||
* This is useful for example, for stock prices, where you're only interested in the latest value
|
* This is useful for example, for stock prices, where you're only interested in the latest value
|
||||||
|
@ -73,7 +73,7 @@ public class LastValueQueue extends QueueImpl {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME.toString());
|
SimpleString prop = ref.getMessage().getLastValueProperty();
|
||||||
|
|
||||||
if (prop != null) {
|
if (prop != null) {
|
||||||
HolderReference hr = map.get(prop);
|
HolderReference hr = map.get(prop);
|
||||||
|
@ -97,10 +97,11 @@ public class LastValueQueue extends QueueImpl {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void addHead(final MessageReference ref, boolean scheduling) {
|
public synchronized void addHead(final MessageReference ref, boolean scheduling) {
|
||||||
SimpleString prop = ref.getMessage().getDeliveryAnnotationPropertyString(Message.HDR_LAST_VALUE_NAME);
|
|
||||||
|
|
||||||
if (prop != null) {
|
SimpleString lastValueProp = ref.getMessage().getLastValueProperty();
|
||||||
HolderReference hr = map.get(prop);
|
|
||||||
|
if (lastValueProp != null) {
|
||||||
|
HolderReference hr = map.get(lastValueProp);
|
||||||
|
|
||||||
if (hr != null) {
|
if (hr != null) {
|
||||||
if (scheduling) {
|
if (scheduling) {
|
||||||
|
@ -119,9 +120,9 @@ public class LastValueQueue extends QueueImpl {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
hr = new HolderReference(prop, ref);
|
hr = new HolderReference(lastValueProp, ref);
|
||||||
|
|
||||||
map.put(prop, hr);
|
map.put(lastValueProp, hr);
|
||||||
|
|
||||||
super.addHead(hr, scheduling);
|
super.addHead(hr, scheduling);
|
||||||
}
|
}
|
||||||
|
@ -147,7 +148,7 @@ public class LastValueQueue extends QueueImpl {
|
||||||
@Override
|
@Override
|
||||||
protected void refRemoved(MessageReference ref) {
|
protected void refRemoved(MessageReference ref) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME.toString());
|
SimpleString prop = ref.getMessage().getLastValueProperty();
|
||||||
|
|
||||||
if (prop != null) {
|
if (prop != null) {
|
||||||
map.remove(prop);
|
map.remove(prop);
|
||||||
|
|
|
@ -213,11 +213,11 @@ public class PostOfficeJournalLoader implements JournalLoader {
|
||||||
|
|
||||||
if (scheduledDeliveryTime != 0 && scheduledDeliveryTime <= currentTime) {
|
if (scheduledDeliveryTime != 0 && scheduledDeliveryTime <= currentTime) {
|
||||||
scheduledDeliveryTime = 0;
|
scheduledDeliveryTime = 0;
|
||||||
record.getMessage().removeProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
|
record.getMessage().setScheduledDeliveryTime(0L);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (scheduledDeliveryTime != 0) {
|
if (scheduledDeliveryTime != 0) {
|
||||||
record.getMessage().putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, scheduledDeliveryTime);
|
record.getMessage().setScheduledDeliveryTime(scheduledDeliveryTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
MessageReference ref = postOffice.reroute(record.getMessage(), queue, null);
|
MessageReference ref = postOffice.reroute(record.getMessage(), queue, null);
|
||||||
|
@ -225,7 +225,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
|
||||||
ref.setDeliveryCount(record.getDeliveryCount());
|
ref.setDeliveryCount(record.getDeliveryCount());
|
||||||
|
|
||||||
if (scheduledDeliveryTime != 0) {
|
if (scheduledDeliveryTime != 0) {
|
||||||
record.getMessage().removeProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
|
record.getMessage().setScheduledDeliveryTime(0L);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -298,11 +298,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean containsDeliveryAnnotationProperty(SimpleString property) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object removeDeliveryAnnotationProperty(SimpleString key) {
|
public Object removeDeliveryAnnotationProperty(SimpleString key) {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -356,11 +356,6 @@ public class AcknowledgeTest extends ActiveMQTestBase {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean containsDeliveryAnnotationProperty(SimpleString property) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object removeDeliveryAnnotationProperty(SimpleString key) {
|
public Object removeDeliveryAnnotationProperty(SimpleString key) {
|
||||||
return null;
|
return null;
|
||||||
|
|
Loading…
Reference in New Issue