From 65ac7f700ba0a9409ecd345b77a991c5aedd53b9 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 20 Mar 2017 12:24:42 -0400 Subject: [PATCH] ARTEMIS-1052 Proper Expiry over AMQP --- .../activemq/artemis/api/core/Message.java | 44 ++++-- .../core/message/impl/CoreMessage.java | 4 +- .../protocol/amqp/broker/AMQPMessage.java | 105 +++++++++++---- .../proton/ProtonServerSenderContext.java | 2 +- .../protocol/openwire/OpenwireMessage.java | 4 +- .../impl/journal/LargeServerMessageImpl.java | 2 +- .../core/postoffice/impl/BindingsImpl.java | 4 +- .../core/postoffice/impl/PostOfficeImpl.java | 4 +- .../artemis/core/server/impl/DivertImpl.java | 2 + .../artemis/core/server/impl/QueueImpl.java | 18 ++- .../impl/ScheduledDeliveryHandlerTest.java | 4 +- .../transport/amqp/client/AmqpMessage.java | 2 +- .../amqp/AmqpClientTestSupport.java | 88 +----------- .../amqp/AmqpNettyFailoverTest.java | 1 - .../integration/amqp/AmqpTestSupport.java | 127 ++++++++++++++++++ .../amqp/SendingAndReceivingTest.java | 94 +++++++++++-- .../integration/client/AcknowledgeTest.java | 4 +- 17 files changed, 359 insertions(+), 150 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index ec0a2db5c1..56097aee16 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -364,23 +364,30 @@ public interface Message { } setBuffer(null); } + + default void reencode() { + // only valid probably on AMQP + } + default void referenceOriginalMessage(final Message original, String originalQueue) { - String queueOnMessage = original.getStringProperty(Message.HDR_ORIGINAL_QUEUE.toString()); + String queueOnMessage = original.getAnnotationString(Message.HDR_ORIGINAL_QUEUE); if (queueOnMessage != null) { - putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), queueOnMessage); + setAnnotation(Message.HDR_ORIGINAL_QUEUE, queueOnMessage); } else if (originalQueue != null) { - putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), originalQueue); + setAnnotation(Message.HDR_ORIGINAL_QUEUE, originalQueue); } - if (original.containsProperty(Message.HDR_ORIG_MESSAGE_ID.toString())) { - putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString())); + Object originalID = original.getAnnotation(Message.HDR_ORIG_MESSAGE_ID); - putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString())); + if (originalID != null) { + setAnnotation(Message.HDR_ORIGINAL_ADDRESS, original.getAnnotationString(Message.HDR_ORIGINAL_ADDRESS)); + + setAnnotation(Message.HDR_ORIG_MESSAGE_ID, originalID); } else { - putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getAddress()); + setAnnotation(Message.HDR_ORIGINAL_ADDRESS, original.getAddress()); - putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getMessageID()); + setAnnotation(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID()); } // reset expiry @@ -503,9 +510,26 @@ public interface Message { Object getObjectProperty(SimpleString key); - Object removeDeliveryAnnotationProperty(SimpleString key); + default Object removeAnnotation(SimpleString key) { + return removeProperty(key); + } - Object getDeliveryAnnotationProperty(SimpleString key); + default String getAnnotationString(SimpleString key) { + Object value = getAnnotation(key); + if (value != null) { + return value.toString(); + } else { + return null; + } + } + + Object getAnnotation(SimpleString key); + + /** Callers must call {@link #reencode()} in order to be sent to clients */ + default Message setAnnotation(SimpleString key, Object value) { + putObjectProperty(key, value); + return this; + } Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index ce1ea96a2f..f0a8715fdc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -98,13 +98,13 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { /** On core there's no delivery annotation */ @Override - public Object getDeliveryAnnotationProperty(SimpleString key) { + public Object getAnnotation(SimpleString key) { return getObjectProperty(key); } /** On core there's no delivery annotation */ @Override - public Object removeDeliveryAnnotationProperty(SimpleString key) { + public Object removeAnnotation(SimpleString key) { return removeProperty(key); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index c1c676cf19..e01d430306 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -62,14 +63,14 @@ public class AMQPMessage extends RefCountMessage { final long messageFormat; ByteBuf data; boolean bufferValid; - byte type; + boolean durable; long messageID; String address; MessageImpl protonMessage; private volatile int memoryEstimate = -1; private long expiration = 0; // this is to store where to start sending bytes, ignoring header and delivery annotations. - private int sendFrom = -1; + private int sendFrom = 0; private boolean parsedHeaders = false; private Header _header; private DeliveryAnnotations _deliveryAnnotations; @@ -123,7 +124,7 @@ public class AMQPMessage extends RefCountMessage { private void initalizeObjects() { if (protonMessage == null) { if (data == null) { - this.sendFrom = -1; + this.sendFrom = 0; _header = new Header(); _deliveryAnnotations = new DeliveryAnnotations(new HashMap<>()); _properties = new Properties(); @@ -220,12 +221,27 @@ public class AMQPMessage extends RefCountMessage { return null; } + private Object removeSymbol(Symbol symbol) { + MessageAnnotations annotations = getMessageAnnotations(); + Map mapAnnotations = annotations != null ? annotations.getValue() : null; + if (mapAnnotations != null) { + return mapAnnotations.remove(symbol); + } + + return null; + } + + private void setSymbol(String symbol, Object value) { setSymbol(Symbol.getSymbol(symbol), value); } private void setSymbol(Symbol symbol, Object value) { MessageAnnotations annotations = getMessageAnnotations(); + if (annotations == null) { + _messageAnnotations = new MessageAnnotations(new HashMap<>()); + annotations = _messageAnnotations; + } Map mapAnnotations = annotations != null ? annotations.getValue() : null; if (mapAnnotations != null) { mapAnnotations.put(symbol, value); @@ -408,7 +424,14 @@ public class AMQPMessage extends RefCountMessage { @Override public org.apache.activemq.artemis.api.core.Message copy() { checkBuffer(); - AMQPMessage newEncode = new AMQPMessage(this.messageFormat, data.array()); + + byte[] origin = data.array(); + byte[] newData = new byte[data.array().length - sendFrom]; + for (int i = 0; i < newData.length; i++) { + newData[i] = origin[i + sendFrom]; + } + AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData); + newEncode.setDurable(isDurable()); return newEncode; } @@ -436,6 +459,16 @@ public class AMQPMessage extends RefCountMessage { @Override public AMQPMessage setExpiration(long expiration) { + + Properties properties = getProperties(); + + if (properties != null) { + if (expiration <= 0) { + properties.setAbsoluteExpiryTime(null); + } else { + properties.setAbsoluteExpiryTime(new Date(expiration)); + } + } this.expiration = expiration; return this; } @@ -460,7 +493,7 @@ public class AMQPMessage extends RefCountMessage { if (getHeader() != null && getHeader().getDurable() != null) { return getHeader().getDurable().booleanValue(); } else { - return false; + return durable; } } @@ -471,7 +504,8 @@ public class AMQPMessage extends RefCountMessage { @Override public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) { - return null; + this.durable = durable; + return this; } @Override @@ -543,12 +577,20 @@ public class AMQPMessage extends RefCountMessage { } } + @Override + public int getEncodeSize() { + checkBuffer(); + // + 20checkBuffer is an estimate for the Header with the deliveryCount + return data.array().length - sendFrom + 20; + } + @Override public void sendBuffer(ByteBuf buffer, int deliveryCount) { checkBuffer(); Header header = getHeader(); if (header == null && deliveryCount > 0) { header = new Header(); + header.setDurable(durable); } if (header != null) { synchronized (header) { @@ -756,19 +798,36 @@ public class AMQPMessage extends RefCountMessage { } @Override - public Object removeDeliveryAnnotationProperty(SimpleString key) { - parseHeaders(); - if (_deliveryAnnotations == null || _deliveryAnnotations.getValue() == null) { - return null; - } - return _deliveryAnnotations.getValue().remove(key.toString()); + public Object removeAnnotation(SimpleString key) { + return removeSymbol(Symbol.getSymbol(key.toString())); } @Override - public Object getDeliveryAnnotationProperty(SimpleString key) { - return null; + public Object getAnnotation(SimpleString key) { + return getSymbol(key.toString()); } + @Override + public AMQPMessage setAnnotation(SimpleString key, Object value) { + setSymbol(key.toString(), value); + return this; + } + + + @Override + public void reencode() { + parseHeaders(); + ApplicationProperties properties = getApplicationProperties(); + getProtonMessage().setDeliveryAnnotations(_deliveryAnnotations); + getProtonMessage().setMessageAnnotations(_messageAnnotations); + getProtonMessage().setApplicationProperties(properties); + getProtonMessage().setProperties(this._properties); + bufferValid = false; + checkBuffer(); + } + + + @Override public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException { return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key)); @@ -849,11 +908,6 @@ public class AMQPMessage extends RefCountMessage { return putStringProperty(key.toString(), value.toString()); } - @Override - public int getEncodeSize() { - return 0; - } - @Override public Set getPropertyNames() { HashSet values = new HashSet<>(); @@ -901,15 +955,18 @@ public class AMQPMessage extends RefCountMessage { @Override public int getPersistSize() { - checkBuffer(); - return data.array().length + DataConstants.SIZE_INT; + return DataConstants.SIZE_INT + internalPersistSize(); + } + + private int internalPersistSize() { + return data.array().length - sendFrom; } @Override public void persist(ActiveMQBuffer targetRecord) { checkBuffer(); - targetRecord.writeInt(data.array().length); - targetRecord.writeBytes(data.array()); + targetRecord.writeInt(internalPersistSize()); + targetRecord.writeBytes(data.array(), sendFrom, data.array().length - sendFrom); } @Override @@ -917,8 +974,10 @@ public class AMQPMessage extends RefCountMessage { int size = record.readInt(); byte[] recordArray = new byte[size]; record.readBytes(recordArray); + this.sendFrom = 0; // whatever was persisted will be sent this.data = Unpooled.wrappedBuffer(recordArray); this.bufferValid = true; + this.durable = true; // it's coming from the journal, so it's durable parseHeaders(); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 5a97c02da3..d24464c054 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -594,7 +594,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // we only need a tag if we are going to settle later byte[] tag = preSettle ? new byte[0] : protonSession.getTag(); - ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); + ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(message.getEncodeSize()); try { message.sendBuffer(nettyBuffer, deliveryCount); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java index 9fb6eb911c..3bd95f4aa5 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java @@ -58,12 +58,12 @@ public class OpenwireMessage implements Message { } @Override - public Object removeDeliveryAnnotationProperty(SimpleString key) { + public Object removeAnnotation(SimpleString key) { return null; } @Override - public Object getDeliveryAnnotationProperty(SimpleString key) { + public Object getAnnotation(SimpleString key) { return null; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 46bd335882..2c3e01cffb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -242,7 +242,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe LargeServerMessageImpl otherLM = (LargeServerMessageImpl) original; this.paged = otherLM.paged; if (this.paged) { - this.removeProperty(Message.HDR_ORIG_MESSAGE_ID); + this.removeAnnotation(Message.HDR_ORIG_MESSAGE_ID); } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index c7b6024111..377223b97e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -238,7 +238,7 @@ public final class BindingsImpl implements Bindings { /* This is a special treatment for scaled-down messages involving SnF queues. * See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property */ - byte[] ids = (byte[]) message.removeDeliveryAnnotationProperty(Message.HDR_SCALEDOWN_TO_IDS); + byte[] ids = (byte[]) message.removeAnnotation(Message.HDR_SCALEDOWN_TO_IDS); if (ids != null) { ByteBuffer buffer = ByteBuffer.wrap(ids); @@ -268,7 +268,7 @@ public final class BindingsImpl implements Bindings { if (!routed) { // Remove the ids now, in order to avoid double check - ids = (byte[]) message.removeDeliveryAnnotationProperty(Message.HDR_ROUTE_TO_IDS); + ids = (byte[]) message.removeAnnotation(Message.HDR_ROUTE_TO_IDS); // Fetch the groupId now, in order to avoid double checking SimpleString groupId = message.getGroupID(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 1f51210a8d..2ef76571dc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -765,6 +765,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding message.setAddress(dlaAddress); + message.reencode(); + route(message, context.getTransaction(), false); result = RoutingStatus.NO_BINDINGS_DLA; } @@ -1221,7 +1223,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding AtomicBoolean startedTX) throws Exception { // Check the DuplicateCache for the Bridge first - Object bridgeDup = message.removeDeliveryAnnotationProperty(Message.HDR_BRIDGE_DUPLICATE_ID); + Object bridgeDup = message.removeAnnotation(Message.HDR_BRIDGE_DUPLICATE_ID); if (bridgeDup != null) { // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one byte[] bridgeDupBytes = (byte[]) bridgeDup; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java index 5b0d406133..c73fd80a6c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java @@ -104,6 +104,8 @@ public class DivertImpl implements Divert { copy.setExpiration(message.getExpiration()); + copy.reencode(); + switch (routingType) { case ANYCAST: copy.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index fc655f682d..406ba5d2f2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1718,14 +1718,14 @@ public class QueueImpl implements Queue { @Override public int retryMessages(Filter filter) throws Exception { - final HashMap queues = new HashMap<>(); + final HashMap queues = new HashMap<>(); return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() { @Override public void actMessage(Transaction tx, MessageReference ref) throws Exception { - SimpleString originalMessageAddress = ref.getMessage().getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS); - SimpleString originalMessageQueue = ref.getMessage().getSimpleStringProperty(Message.HDR_ORIGINAL_QUEUE); + String originalMessageAddress = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_ADDRESS); + String originalMessageQueue = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_QUEUE); if (originalMessageAddress != null) { @@ -1735,7 +1735,7 @@ public class QueueImpl implements Queue { if (originalMessageQueue != null && !originalMessageQueue.equals(originalMessageAddress)) { targetQueue = queues.get(originalMessageQueue); if (targetQueue == null) { - Binding binding = postOffice.getBinding(originalMessageQueue); + Binding binding = postOffice.getBinding(SimpleString.toSimpleString(originalMessageQueue)); if (binding != null && binding instanceof LocalQueueBinding) { targetQueue = ((LocalQueueBinding) binding).getID(); @@ -1745,9 +1745,9 @@ public class QueueImpl implements Queue { } if (targetQueue != null) { - move(originalMessageAddress, tx, ref, false, false, targetQueue.longValue()); + move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false, targetQueue.longValue()); } else { - move(originalMessageAddress, tx, ref, false, false); + move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false); } @@ -2495,10 +2495,14 @@ public class QueueImpl implements Queue { copy.referenceOriginalMessage(message, ref != null ? ref.getQueue().getName().toString() : null); } + copy.setExpiration(0); + if (expiry) { - copy.putLongProperty(Message.HDR_ACTUAL_EXPIRY_TIME.toString(), System.currentTimeMillis()); + copy.setAnnotation(Message.HDR_ACTUAL_EXPIRY_TIME, System.currentTimeMillis()); } + copy.reencode(); + return copy; } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 8d0628618f..dd48b58ffe 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -299,12 +299,12 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public Object removeDeliveryAnnotationProperty(SimpleString key) { + public Object removeAnnotation(SimpleString key) { return null; } @Override - public Object getDeliveryAnnotationProperty(SimpleString key) { + public Object getAnnotation(SimpleString key) { return null; } diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index 8599fa9656..bf9e0b5117 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -471,7 +471,7 @@ public class AmqpMessage { /** * Sets the creation time property on the message. * - * @param absoluteExpiryTime the expiration time value to set. + * @param creationTime the time value to set. */ public void setCreationTime(long creationTime) { checkReadOnly(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java index d9b45d3fea..99ce4dbee5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -17,8 +17,6 @@ package org.apache.activemq.artemis.tests.integration.amqp; -import java.net.URI; -import java.util.LinkedList; import java.util.Set; import org.apache.activemq.artemis.api.core.SimpleString; @@ -32,7 +30,6 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.server.JMSServerManager; import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; @@ -46,23 +43,14 @@ import org.junit.Before; * Test support class for tests that will be using the AMQP Proton wrapper client. * This is to make it easier to migrate tests from ActiveMQ5 */ -public class AmqpClientTestSupport extends ActiveMQTestBase { +public class AmqpClientTestSupport extends AmqpTestSupport { protected static Symbol SHARED = Symbol.getSymbol("shared"); protected static Symbol GLOBAL = Symbol.getSymbol("global"); - private boolean useSSL; - protected JMSServerManager serverManager; protected ActiveMQServer server; - protected LinkedList connections = new LinkedList<>(); - - protected AmqpConnection addConnection(AmqpConnection connection) { - connections.add(connection); - return connection; - } - @Before @Override public void setUp() throws Exception { @@ -80,6 +68,7 @@ public class AmqpClientTestSupport extends ActiveMQTestBase { ignored.printStackTrace(); } } + connections.clear(); if (serverManager != null) { try { @@ -149,79 +138,6 @@ public class AmqpClientTestSupport extends ActiveMQTestBase { this.useSSL = useSSL; } - public boolean isUseSSL() { - return useSSL; - } - - public String getAmqpConnectionURIOptions() { - return ""; - } - - public URI getBrokerAmqpConnectionURI() { - boolean webSocket = false; - - try { - int port = 61616; - - String uri = null; - - if (isUseSSL()) { - if (webSocket) { - uri = "wss://127.0.0.1:" + port; - } else { - uri = "ssl://127.0.0.1:" + port; - } - } else { - if (webSocket) { - uri = "ws://127.0.0.1:" + port; - } else { - uri = "tcp://127.0.0.1:" + port; - } - } - - if (!getAmqpConnectionURIOptions().isEmpty()) { - uri = uri + "?" + getAmqpConnectionURIOptions(); - } - - return new URI(uri); - } catch (Exception e) { - throw new RuntimeException(); - } - } - - public AmqpConnection createAmqpConnection() throws Exception { - return createAmqpConnection(getBrokerAmqpConnectionURI()); - } - - public AmqpConnection createAmqpConnection(String username, String password) throws Exception { - return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password); - } - - public AmqpConnection createAmqpConnection(URI brokerURI) throws Exception { - return createAmqpConnection(brokerURI, null, null); - } - - public AmqpConnection createAmqpConnection(URI brokerURI, String username, String password) throws Exception { - return createAmqpClient(brokerURI, username, password).connect(); - } - - public AmqpClient createAmqpClient() throws Exception { - return createAmqpClient(getBrokerAmqpConnectionURI(), null, null); - } - - public AmqpClient createAmqpClient(URI brokerURI) throws Exception { - return createAmqpClient(brokerURI, null, null); - } - - public AmqpClient createAmqpClient(String username, String password) throws Exception { - return createAmqpClient(getBrokerAmqpConnectionURI(), username, password); - } - - public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception { - return new AmqpClient(brokerURI, username, password); - } - - protected void sendMessages(int numMessages, String address) throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java index 5496883404..5fb4e35415 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.amqp; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; -import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTest; import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase; import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.Assert; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java new file mode 100644 index 0000000000..8f8945213a --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.net.URI; +import java.util.LinkedList; + +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.junit.After; + +/** This will only add methods to support AMQP Testing without creating servers or anything */ +public class AmqpTestSupport extends ActiveMQTestBase { + protected LinkedList connections = new LinkedList<>(); + + protected boolean useSSL; + + protected AmqpConnection addConnection(AmqpConnection connection) { + connections.add(connection); + return connection; + } + + @After + @Override + public void tearDown() throws Exception { + for (AmqpConnection conn : connections) { + try { + conn.close(); + } catch (Throwable ignored) { + ignored.printStackTrace(); + } + } + + super.tearDown(); + } + + public boolean isUseSSL() { + return useSSL; + } + + public String getAmqpConnectionURIOptions() { + return ""; + } + + public URI getBrokerAmqpConnectionURI() { + boolean webSocket = false; + + try { + int port = 61616; + + String uri = null; + + if (isUseSSL()) { + if (webSocket) { + uri = "wss://127.0.0.1:" + port; + } else { + uri = "ssl://127.0.0.1:" + port; + } + } else { + if (webSocket) { + uri = "ws://127.0.0.1:" + port; + } else { + uri = "tcp://127.0.0.1:" + port; + } + } + + if (!getAmqpConnectionURIOptions().isEmpty()) { + uri = uri + "?" + getAmqpConnectionURIOptions(); + } + + return new URI(uri); + } catch (Exception e) { + throw new RuntimeException(); + } + } + + public AmqpConnection createAmqpConnection() throws Exception { + return createAmqpConnection(getBrokerAmqpConnectionURI()); + } + + public AmqpConnection createAmqpConnection(String username, String password) throws Exception { + return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password); + } + + public AmqpConnection createAmqpConnection(URI brokerURI) throws Exception { + return createAmqpConnection(brokerURI, null, null); + } + + public AmqpConnection createAmqpConnection(URI brokerURI, String username, String password) throws Exception { + return createAmqpClient(brokerURI, username, password).connect(); + } + + public AmqpClient createAmqpClient() throws Exception { + return createAmqpClient(getBrokerAmqpConnectionURI(), null, null); + } + + public AmqpClient createAmqpClient(URI brokerURI) throws Exception { + return createAmqpClient(brokerURI, null, null); + } + + public AmqpClient createAmqpClient(String username, String password) throws Exception { + return createAmqpClient(getBrokerAmqpConnectionURI(), username, password); + } + + public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception { + return new AmqpClient(brokerURI, username, password); + } + + + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java index 3c4e915943..87a4710813 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java @@ -26,20 +26,28 @@ import javax.jms.Session; import javax.jms.TextMessage; import java.util.Random; import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -public class SendingAndReceivingTest extends ActiveMQTestBase { +public class SendingAndReceivingTest extends AmqpTestSupport { private ActiveMQServer server; @@ -55,7 +63,15 @@ public class SendingAndReceivingTest extends ActiveMQTestBase { tc.getExtraParams().put("multicastPrefix", "multicast://"); } } + server.getConfiguration().setMessageExpiryScanPeriod(1); server.start(); + server.createQueue(SimpleString.toSimpleString("exampleQueue"), RoutingType.ANYCAST, SimpleString.toSimpleString("exampleQueue"), null, true, false, -1, false, true); + server.createQueue(SimpleString.toSimpleString("DLQ"), RoutingType.ANYCAST, SimpleString.toSimpleString("DLQ"), null, true, false, -1, false, true); + + AddressSettings as = new AddressSettings(); + as.setExpiryAddress(SimpleString.toSimpleString("DLQ")); + HierarchicalRepository repos = server.getAddressSettingsRepository(); + repos.addMatch("#", as); } @After @@ -112,21 +128,24 @@ public class SendingAndReceivingTest extends ActiveMQTestBase { Queue queue = session.createQueue(address); MessageProducer sender = session.createProducer(queue); - sender.setTimeToLive(10); + sender.setTimeToLive(1); Message message = session.createMessage(); sender.send(message); connection.start(); - MessageConsumer consumer = session.createConsumer(queue); - Message m = consumer.receive(5000); + MessageConsumer consumer = session.createConsumer(session.createQueue("DLQ")); + Message m = consumer.receive(10000); + Assert.assertNotNull(m); + consumer.close(); + + + consumer = session.createConsumer(queue); + m = consumer.receiveNoWait(); Assert.assertNull(m); consumer.close(); - consumer = session.createConsumer(session.createQueue("DLQ")); - m = consumer.receive(5000); - Assert.assertNotNull(m); - consumer.close(); + } finally { if (connection != null) { connection.close(); @@ -134,6 +153,63 @@ public class SendingAndReceivingTest extends ActiveMQTestBase { } } + @Test(timeout = 60000) + public void testSendExpiry() throws Throwable { + internalSendExpiry(false); + } + + @Test(timeout = 60000) + public void testSendExpiryRestartServer() throws Throwable { + internalSendExpiry(true); + } + + public void internalSendExpiry(boolean restartServer) throws Throwable { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + try { + + // Normal Session which won't create an TXN itself + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender("exampleQueue"); + + AmqpMessage message = new AmqpMessage(); + message.setDurable(true); + message.setText("Test-Message"); + message.setDeliveryAnnotation("shouldDisappear", 1); + message.setAbsoluteExpiryTime(System.currentTimeMillis() + 1000); + sender.send(message); + + org.apache.activemq.artemis.core.server.Queue dlq = server.locateQueue(SimpleString.toSimpleString("DLQ")); + + Wait.waitFor(() -> dlq.getMessageCount() > 0, 5000, 500); + + connection.close(); + + if (restartServer) { + server.stop(); + server.start(); + } + + connection = client.connect(); + session = connection.createSession(); + + // Read all messages from the Queue, do not accept them yet. + AmqpReceiver receiver = session.createReceiver("DLQ"); + receiver.flow(20); + message = receiver.receive(5, TimeUnit.SECONDS); + Assert.assertNotNull(message); + Assert.assertEquals("exampleQueue", message.getMessageAnnotation(org.apache.activemq.artemis.api.core.Message.HDR_ORIGINAL_ADDRESS.toString())); + Assert.assertNull(message.getDeliveryAnnotation("shouldDisappear")); + Assert.assertNull(receiver.receiveNoWait()); + + } finally { + connection.close(); + } + } + + + private static String createMessage(int messageSize) { final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"; Random rnd = new Random(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java index 43dad847b4..31e26e38e8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java @@ -357,12 +357,12 @@ public class AcknowledgeTest extends ActiveMQTestBase { } @Override - public Object removeDeliveryAnnotationProperty(SimpleString key) { + public Object removeAnnotation(SimpleString key) { return null; } @Override - public Object getDeliveryAnnotationProperty(SimpleString key) { + public Object getAnnotation(SimpleString key) { return null; }