ARTEMIS-1081 Implementing AMQP UndeliverableHere

This commit is contained in:
Clebert Suconic 2017-03-28 17:59:41 -04:00 committed by Justin Bertram
parent 746220e11e
commit 1f4473e8d7
9 changed files with 82 additions and 2 deletions

View File

@ -251,6 +251,13 @@ public interface Message {
/** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */ /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */
Message copy(long newID); Message copy(long newID);
default boolean acceptsConsumer(long uniqueConsumerID) {
return true;
}
default void rejectConsumer(long uniqueConsumerID) {
}
/** /**
* Returns the messageID. * Returns the messageID.
* <br> * <br>

View File

@ -81,6 +81,8 @@ public class AMQPMessage extends RefCountMessage {
private long scheduledTime = -1; private long scheduledTime = -1;
private String connectionID; private String connectionID;
Set<Object> rejectedConsumers;
public AMQPMessage(long messageFormat, byte[] data) { public AMQPMessage(long messageFormat, byte[] data) {
this.data = Unpooled.wrappedBuffer(data); this.data = Unpooled.wrappedBuffer(data);
this.messageFormat = messageFormat; this.messageFormat = messageFormat;
@ -323,6 +325,26 @@ public class AMQPMessage extends RefCountMessage {
return AMQPMessagePersister.getInstance(); return AMQPMessagePersister.getInstance();
} }
@Override
public synchronized boolean acceptsConsumer(long consumer) {
if (rejectedConsumers == null) {
return true;
} else {
return !rejectedConsumers.contains(consumer);
}
}
@Override
public synchronized void rejectConsumer(long consumer) {
if (rejectedConsumers == null) {
rejectedConsumers = new HashSet<>();
}
rejectedConsumers.add(consumer);
}
private synchronized void partialDecode(ByteBuffer buffer) { private synchronized void partialDecode(ByteBuffer buffer) {
DecoderImpl decoder = TLSEncode.getDecoder(); DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setByteBuffer(buffer); decoder.setByteBuffer(buffer);

View File

@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress; import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
@ -78,7 +79,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private static final Symbol SHARED = Symbol.valueOf("shared"); private static final Symbol SHARED = Symbol.valueOf("shared");
private static final Symbol GLOBAL = Symbol.valueOf("global"); private static final Symbol GLOBAL = Symbol.valueOf("global");
private Object brokerConsumer; private Consumer brokerConsumer;
protected final AMQPSessionContext protonSession; protected final AMQPSessionContext protonSession;
protected final Sender sender; protected final Sender sender;
@ -391,7 +392,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY); boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
try { try {
brokerConsumer = sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly); brokerConsumer = (Consumer)sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
} catch (ActiveMQAMQPResourceLimitExceededException e1) { } catch (ActiveMQAMQPResourceLimitExceededException e1) {
throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage()); throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
} catch (Exception e) { } catch (Exception e) {
@ -553,6 +554,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} else if (remoteState instanceof Modified) { } else if (remoteState instanceof Modified) {
try { try {
Modified modification = (Modified) remoteState; Modified modification = (Modified) remoteState;
if (Boolean.TRUE.equals(modification.getUndeliverableHere())) {
message.rejectConsumer(((Consumer)brokerConsumer).sequentialID());
}
if (Boolean.TRUE.equals(modification.getDeliveryFailed())) { if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
sessionSPI.cancel(brokerConsumer, message, true); sessionSPI.cancel(brokerConsumer, message, true);
} else { } else {

View File

@ -68,4 +68,7 @@ public interface Consumer {
* disconnect the consumer * disconnect the consumer
*/ */
void disconnect(); void disconnect();
/** an unique sequential ID for this consumer */
long sequentialID();
} }

View File

@ -86,6 +86,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
private final UUID nodeUUID; private final UUID nodeUUID;
private final long sequentialID;
private final SimpleString name; private final SimpleString name;
private final Queue queue; private final Queue queue;
@ -170,6 +172,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
final String password, final String password,
final StorageManager storageManager) { final StorageManager storageManager) {
this.sequentialID = storageManager.generateID();
this.reconnectAttempts = reconnectAttempts; this.reconnectAttempts = reconnectAttempts;
this.reconnectAttemptsInUse = initialConnectAttempts; this.reconnectAttemptsInUse = initialConnectAttempts;
@ -244,6 +248,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
this.notificationService = notificationService; this.notificationService = notificationService;
} }
@Override
public long sequentialID() {
return sequentialID;
}
@Override @Override
public synchronized void start() throws Exception { public synchronized void start() throws Exception {
if (started) { if (started) {

View File

@ -52,6 +52,8 @@ public class Redistributor implements Consumer {
private int count; private int count;
private final long sequentialID;
// a Flush executor here is happening inside another executor. // a Flush executor here is happening inside another executor.
// what may cause issues under load. Say you are running out of executors for cases where you don't need to wait at all. // what may cause issues under load. Say you are running out of executors for cases where you don't need to wait at all.
// So, instead of using a future we will use a plain ReusableLatch here // So, instead of using a future we will use a plain ReusableLatch here
@ -64,6 +66,8 @@ public class Redistributor implements Consumer {
final int batchSize) { final int batchSize) {
this.queue = queue; this.queue = queue;
this.sequentialID = storageManager.generateID();
this.storageManager = storageManager; this.storageManager = storageManager;
this.postOffice = postOffice; this.postOffice = postOffice;
@ -73,6 +77,11 @@ public class Redistributor implements Consumer {
this.batchSize = batchSize; this.batchSize = batchSize;
} }
@Override
public long sequentialID() {
return sequentialID;
}
@Override @Override
public Filter getFilter() { public Filter getFilter() {
return null; return null;

View File

@ -77,6 +77,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private final long id; private final long id;
private final long sequentialID;
protected final Queue messageQueue; protected final Queue messageQueue;
private final Filter filter; private final Filter filter;
@ -180,6 +182,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
final ActiveMQServer server) throws Exception { final ActiveMQServer server) throws Exception {
this.id = id; this.id = id;
this.sequentialID = server.getStorageManager().generateID();
this.filter = filter; this.filter = filter;
this.session = session; this.session = session;
@ -232,6 +236,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// ServerConsumer implementation // ServerConsumer implementation
// ---------------------------------------------------------------------- // ----------------------------------------------------------------------
@Override
public long sequentialID() {
return sequentialID;
}
@Override @Override
public Object getProtocolData() { public Object getProtocolData() {
return protocolData; return protocolData;
@ -343,6 +353,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
final Message message = ref.getMessage(); final Message message = ref.getMessage();
if (!message.acceptsConsumer(sequentialID())) {
return HandleStatus.NO_MATCH;
}
if (filter != null && !filter.match(message)) { if (filter != null && !filter.match(message)) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Reference " + ref + " is a noMatch on consumer " + this); logger.trace("Reference " + ref + " is a noMatch on consumer " + this);

View File

@ -58,6 +58,11 @@ public class DummyServerConsumer implements ServerConsumer {
} }
@Override
public long sequentialID() {
return 0;
}
@Override @Override
public Object getProtocolContext() { public Object getProtocolContext() {
return null; return null;

View File

@ -82,6 +82,11 @@ public class FakeConsumer implements Consumer {
delayCountdown = numReferences; delayCountdown = numReferences;
} }
@Override
public long sequentialID() {
return 0;
}
public synchronized List<MessageReference> getReferences() { public synchronized List<MessageReference> getReferences() {
return references; return references;
} }