From 1f4473e8d77ccc724a16921b5e1207b565ab7c4c Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 28 Mar 2017 17:59:41 -0400 Subject: [PATCH] ARTEMIS-1081 Implementing AMQP UndeliverableHere --- .../activemq/artemis/api/core/Message.java | 7 ++++++ .../protocol/amqp/broker/AMQPMessage.java | 22 +++++++++++++++++++ .../proton/ProtonServerSenderContext.java | 10 +++++++-- .../artemis/core/server/Consumer.java | 3 +++ .../core/server/cluster/impl/BridgeImpl.java | 9 ++++++++ .../server/cluster/impl/Redistributor.java | 9 ++++++++ .../core/server/impl/ServerConsumerImpl.java | 14 ++++++++++++ .../integration/cli/DummyServerConsumer.java | 5 +++++ .../core/server/impl/fakes/FakeConsumer.java | 5 +++++ 9 files changed, 82 insertions(+), 2 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 9cd3fa744c..856e8653ba 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -251,6 +251,13 @@ public interface Message { /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */ Message copy(long newID); + default boolean acceptsConsumer(long uniqueConsumerID) { + return true; + } + + default void rejectConsumer(long uniqueConsumerID) { + } + /** * Returns the messageID. *
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index d241958212..08953a29ce 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -81,6 +81,8 @@ public class AMQPMessage extends RefCountMessage { private long scheduledTime = -1; private String connectionID; + Set rejectedConsumers; + public AMQPMessage(long messageFormat, byte[] data) { this.data = Unpooled.wrappedBuffer(data); this.messageFormat = messageFormat; @@ -323,6 +325,26 @@ public class AMQPMessage extends RefCountMessage { return AMQPMessagePersister.getInstance(); } + @Override + public synchronized boolean acceptsConsumer(long consumer) { + + if (rejectedConsumers == null) { + return true; + } else { + return !rejectedConsumers.contains(consumer); + } + } + + @Override + public synchronized void rejectConsumer(long consumer) { + if (rejectedConsumers == null) { + rejectedConsumers = new HashSet<>(); + } + + rejectedConsumers.add(consumer); + } + + private synchronized void partialDecode(ByteBuffer buffer) { DecoderImpl decoder = TLSEncode.getDecoder(); decoder.setByteBuffer(buffer); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index fb540a8dbc..69d156bf32 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress; import org.apache.activemq.artemis.core.server.AddressQueryResult; +import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; @@ -78,7 +79,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private static final Symbol SHARED = Symbol.valueOf("shared"); private static final Symbol GLOBAL = Symbol.valueOf("global"); - private Object brokerConsumer; + private Consumer brokerConsumer; protected final AMQPSessionContext protonSession; protected final Sender sender; @@ -391,7 +392,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY); try { - brokerConsumer = sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly); + brokerConsumer = (Consumer)sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly); } catch (ActiveMQAMQPResourceLimitExceededException e1) { throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage()); } catch (Exception e) { @@ -553,6 +554,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } else if (remoteState instanceof Modified) { try { Modified modification = (Modified) remoteState; + + if (Boolean.TRUE.equals(modification.getUndeliverableHere())) { + message.rejectConsumer(((Consumer)brokerConsumer).sequentialID()); + } + if (Boolean.TRUE.equals(modification.getDeliveryFailed())) { sessionSPI.cancel(brokerConsumer, message, true); } else { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java index 58c7d81cec..50c0b01d5f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java @@ -68,4 +68,7 @@ public interface Consumer { * disconnect the consumer */ void disconnect(); + + /** an unique sequential ID for this consumer */ + long sequentialID(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index fe43532e29..c1a0ccc815 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -86,6 +86,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled private final UUID nodeUUID; + private final long sequentialID; + private final SimpleString name; private final Queue queue; @@ -170,6 +172,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled final String password, final StorageManager storageManager) { + this.sequentialID = storageManager.generateID(); + this.reconnectAttempts = reconnectAttempts; this.reconnectAttemptsInUse = initialConnectAttempts; @@ -244,6 +248,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled this.notificationService = notificationService; } + @Override + public long sequentialID() { + return sequentialID; + } + @Override public synchronized void start() throws Exception { if (started) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java index 26399dc048..eff8d67689 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java @@ -52,6 +52,8 @@ public class Redistributor implements Consumer { private int count; + private final long sequentialID; + // a Flush executor here is happening inside another executor. // what may cause issues under load. Say you are running out of executors for cases where you don't need to wait at all. // So, instead of using a future we will use a plain ReusableLatch here @@ -64,6 +66,8 @@ public class Redistributor implements Consumer { final int batchSize) { this.queue = queue; + this.sequentialID = storageManager.generateID(); + this.storageManager = storageManager; this.postOffice = postOffice; @@ -73,6 +77,11 @@ public class Redistributor implements Consumer { this.batchSize = batchSize; } + @Override + public long sequentialID() { + return sequentialID; + } + @Override public Filter getFilter() { return null; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 3552b93096..9e33602078 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -77,6 +77,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private final long id; + private final long sequentialID; + protected final Queue messageQueue; private final Filter filter; @@ -180,6 +182,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { final ActiveMQServer server) throws Exception { this.id = id; + this.sequentialID = server.getStorageManager().generateID(); + this.filter = filter; this.session = session; @@ -232,6 +236,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { // ServerConsumer implementation // ---------------------------------------------------------------------- + + @Override + public long sequentialID() { + return sequentialID; + } + @Override public Object getProtocolData() { return protocolData; @@ -343,6 +353,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } final Message message = ref.getMessage(); + if (!message.acceptsConsumer(sequentialID())) { + return HandleStatus.NO_MATCH; + } + if (filter != null && !filter.match(message)) { if (logger.isTraceEnabled()) { logger.trace("Reference " + ref + " is a noMatch on consumer " + this); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java index 78b3e091f9..968c31b465 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java @@ -58,6 +58,11 @@ public class DummyServerConsumer implements ServerConsumer { } + @Override + public long sequentialID() { + return 0; + } + @Override public Object getProtocolContext() { return null; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java index 665686c402..1db8347920 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java @@ -82,6 +82,11 @@ public class FakeConsumer implements Consumer { delayCountdown = numReferences; } + @Override + public long sequentialID() { + return 0; + } + public synchronized List getReferences() { return references; }