mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-21 01:15:50 +00:00
This closes #1147
This commit is contained in:
commit
056a88dfca
@ -251,6 +251,13 @@ public interface Message {
|
||||
/** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */
|
||||
Message copy(long newID);
|
||||
|
||||
default boolean acceptsConsumer(long uniqueConsumerID) {
|
||||
return true;
|
||||
}
|
||||
|
||||
default void rejectConsumer(long uniqueConsumerID) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the messageID.
|
||||
* <br>
|
||||
|
@ -81,6 +81,8 @@ public class AMQPMessage extends RefCountMessage {
|
||||
private long scheduledTime = -1;
|
||||
private String connectionID;
|
||||
|
||||
Set<Object> rejectedConsumers;
|
||||
|
||||
public AMQPMessage(long messageFormat, byte[] data) {
|
||||
this.data = Unpooled.wrappedBuffer(data);
|
||||
this.messageFormat = messageFormat;
|
||||
@ -323,6 +325,26 @@ public class AMQPMessage extends RefCountMessage {
|
||||
return AMQPMessagePersister.getInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean acceptsConsumer(long consumer) {
|
||||
|
||||
if (rejectedConsumers == null) {
|
||||
return true;
|
||||
} else {
|
||||
return !rejectedConsumers.contains(consumer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void rejectConsumer(long consumer) {
|
||||
if (rejectedConsumers == null) {
|
||||
rejectedConsumers = new HashSet<>();
|
||||
}
|
||||
|
||||
rejectedConsumers.add(consumer);
|
||||
}
|
||||
|
||||
|
||||
private synchronized void partialDecode(ByteBuffer buffer) {
|
||||
DecoderImpl decoder = TLSEncode.getDecoder();
|
||||
decoder.setByteBuffer(buffer);
|
||||
|
@ -336,6 +336,15 @@ public class AMQPSessionCallback implements SessionCallback {
|
||||
}
|
||||
}
|
||||
|
||||
public void reject(Object brokerConsumer, Message message) throws Exception {
|
||||
recoverContext();
|
||||
try {
|
||||
((ServerConsumer) brokerConsumer).reject(message.getMessageID());
|
||||
} finally {
|
||||
resetContext();
|
||||
}
|
||||
}
|
||||
|
||||
public void resumeDelivery(Object consumer) {
|
||||
((ServerConsumer) consumer).receiveCredits(-1);
|
||||
}
|
||||
@ -451,7 +460,7 @@ public class AMQPSessionCallback implements SessionCallback {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (connection.getLock()) {
|
||||
if (receiver.getRemoteCredit() < threshold) {
|
||||
if (receiver.getRemoteCredit() <= threshold) {
|
||||
receiver.flow(credits);
|
||||
connection.flush();
|
||||
}
|
||||
|
@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
|
||||
import org.apache.activemq.artemis.core.server.AddressQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.Consumer;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
|
||||
@ -78,7 +79,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||
private static final Symbol SHARED = Symbol.valueOf("shared");
|
||||
private static final Symbol GLOBAL = Symbol.valueOf("global");
|
||||
|
||||
private Object brokerConsumer;
|
||||
private Consumer brokerConsumer;
|
||||
|
||||
protected final AMQPSessionContext protonSession;
|
||||
protected final Sender sender;
|
||||
@ -391,7 +392,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||
|
||||
boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
|
||||
try {
|
||||
brokerConsumer = sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
|
||||
brokerConsumer = (Consumer)sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
|
||||
} catch (ActiveMQAMQPResourceLimitExceededException e1) {
|
||||
throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
|
||||
} catch (Exception e) {
|
||||
@ -546,13 +547,18 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||
}
|
||||
} else if (remoteState instanceof Rejected) {
|
||||
try {
|
||||
sessionSPI.cancel(brokerConsumer, message, true);
|
||||
sessionSPI.reject(brokerConsumer, message);
|
||||
} catch (Exception e) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
|
||||
}
|
||||
} else if (remoteState instanceof Modified) {
|
||||
try {
|
||||
Modified modification = (Modified) remoteState;
|
||||
|
||||
if (Boolean.TRUE.equals(modification.getUndeliverableHere())) {
|
||||
message.rejectConsumer(((Consumer)brokerConsumer).sequentialID());
|
||||
}
|
||||
|
||||
if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
|
||||
sessionSPI.cancel(brokerConsumer, message, true);
|
||||
} else {
|
||||
|
@ -68,4 +68,7 @@ public interface Consumer {
|
||||
* disconnect the consumer
|
||||
*/
|
||||
void disconnect();
|
||||
|
||||
/** an unique sequential ID for this consumer */
|
||||
long sequentialID();
|
||||
}
|
||||
|
@ -92,6 +92,8 @@ public interface ServerConsumer extends Consumer {
|
||||
|
||||
void individualAcknowledge(Transaction tx, long messageID) throws Exception;
|
||||
|
||||
void reject(final long messageID) throws Exception;
|
||||
|
||||
void individualCancel(final long messageID, boolean failed) throws Exception;
|
||||
|
||||
void forceDelivery(long sequence);
|
||||
|
@ -86,6 +86,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
||||
|
||||
private final UUID nodeUUID;
|
||||
|
||||
private final long sequentialID;
|
||||
|
||||
private final SimpleString name;
|
||||
|
||||
private final Queue queue;
|
||||
@ -170,6 +172,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
||||
final String password,
|
||||
final StorageManager storageManager) {
|
||||
|
||||
this.sequentialID = storageManager.generateID();
|
||||
|
||||
this.reconnectAttempts = reconnectAttempts;
|
||||
|
||||
this.reconnectAttemptsInUse = initialConnectAttempts;
|
||||
@ -244,6 +248,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
||||
this.notificationService = notificationService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long sequentialID() {
|
||||
return sequentialID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void start() throws Exception {
|
||||
if (started) {
|
||||
|
@ -52,6 +52,8 @@ public class Redistributor implements Consumer {
|
||||
|
||||
private int count;
|
||||
|
||||
private final long sequentialID;
|
||||
|
||||
// a Flush executor here is happening inside another executor.
|
||||
// what may cause issues under load. Say you are running out of executors for cases where you don't need to wait at all.
|
||||
// So, instead of using a future we will use a plain ReusableLatch here
|
||||
@ -64,6 +66,8 @@ public class Redistributor implements Consumer {
|
||||
final int batchSize) {
|
||||
this.queue = queue;
|
||||
|
||||
this.sequentialID = storageManager.generateID();
|
||||
|
||||
this.storageManager = storageManager;
|
||||
|
||||
this.postOffice = postOffice;
|
||||
@ -73,6 +77,11 @@ public class Redistributor implements Consumer {
|
||||
this.batchSize = batchSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long sequentialID() {
|
||||
return sequentialID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Filter getFilter() {
|
||||
return null;
|
||||
|
@ -77,6 +77,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||
|
||||
private final long id;
|
||||
|
||||
private final long sequentialID;
|
||||
|
||||
protected final Queue messageQueue;
|
||||
|
||||
private final Filter filter;
|
||||
@ -180,6 +182,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||
final ActiveMQServer server) throws Exception {
|
||||
this.id = id;
|
||||
|
||||
this.sequentialID = server.getStorageManager().generateID();
|
||||
|
||||
this.filter = filter;
|
||||
|
||||
this.session = session;
|
||||
@ -232,6 +236,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||
// ServerConsumer implementation
|
||||
// ----------------------------------------------------------------------
|
||||
|
||||
|
||||
@Override
|
||||
public long sequentialID() {
|
||||
return sequentialID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getProtocolData() {
|
||||
return protocolData;
|
||||
@ -343,6 +353,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||
}
|
||||
final Message message = ref.getMessage();
|
||||
|
||||
if (!message.acceptsConsumer(sequentialID())) {
|
||||
return HandleStatus.NO_MATCH;
|
||||
}
|
||||
|
||||
if (filter != null && !filter.match(message)) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Reference " + ref + " is a noMatch on consumer " + this);
|
||||
@ -910,6 +924,22 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||
ref.getQueue().cancel(ref, System.currentTimeMillis());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized void reject(final long messageID) throws Exception {
|
||||
if (browseOnly) {
|
||||
return;
|
||||
}
|
||||
|
||||
MessageReference ref = removeReferenceByID(messageID);
|
||||
|
||||
if (ref == null) {
|
||||
return; // nothing to be done
|
||||
}
|
||||
|
||||
ref.getQueue().sendToDeadLetterAddress(null, ref);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void backToDelivering(MessageReference reference) {
|
||||
deliveringRefs.addFirst(reference);
|
||||
|
@ -97,13 +97,13 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
|
||||
// Reject is a terminal outcome and should not be redelivered to the rejecting receiver
|
||||
// or any other as it should move to the archived state.
|
||||
receiver1.flow(1);
|
||||
message = receiver1.receive(1, TimeUnit.SECONDS);
|
||||
message = receiver1.receiveNoWait();
|
||||
assertNull("Should not receive message again", message);
|
||||
|
||||
// Attempt to Read the message again with another receiver to validate it is archived.
|
||||
AmqpReceiver receiver2 = session.createReceiver(getTestName());
|
||||
receiver2.flow(1);
|
||||
assertNull(receiver2.receive(3, TimeUnit.SECONDS));
|
||||
assertNull(receiver2.receiveNoWait());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
@ -41,7 +41,6 @@ import javax.jms.TopicSession;
|
||||
import javax.jms.TopicSubscriber;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.lang.reflect.Field;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
@ -74,7 +73,6 @@ import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFact
|
||||
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager;
|
||||
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.utils.ByteUtil;
|
||||
@ -128,6 +126,7 @@ public class ProtonTest extends ProtonTestBase {
|
||||
return Arrays.asList(new Object[][]{{"AMQP", 0}, {"AMQP_ANONYMOUS", 3}});
|
||||
}
|
||||
|
||||
|
||||
ConnectionFactory factory;
|
||||
|
||||
private final int protocol;
|
||||
@ -146,6 +145,14 @@ public class ProtonTest extends ProtonTestBase {
|
||||
private final String address;
|
||||
private Connection connection;
|
||||
|
||||
|
||||
@Override
|
||||
protected ActiveMQServer createAMQPServer(int port) throws Exception {
|
||||
ActiveMQServer server = super.createAMQPServer(port);
|
||||
server.getConfiguration().addAcceptorConfiguration("flow", "tcp://localhost:" + (8 + port) + "?protocols=AMQP;useEpoll=false;amqpCredits=1;amqpMinCredits=1");
|
||||
return server;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
@ -418,14 +425,9 @@ public class ProtonTest extends ProtonTestBase {
|
||||
|
||||
@Test
|
||||
public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception {
|
||||
// Only allow 1 credit to be submitted at a time.
|
||||
Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation");
|
||||
maxCreditAllocation.setAccessible(true);
|
||||
int originalMaxCreditAllocation = maxCreditAllocation.getInt(null);
|
||||
maxCreditAllocation.setInt(null, 1);
|
||||
|
||||
String destinationAddress = address + 1;
|
||||
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
|
||||
AmqpClient client = new AmqpClient(new URI("tcp://localhost:5680"), userName, password);
|
||||
AmqpConnection amqpConnection = client.connect();
|
||||
try {
|
||||
AmqpSession session = amqpConnection.createSession();
|
||||
@ -433,7 +435,6 @@ public class ProtonTest extends ProtonTestBase {
|
||||
assertTrue(sender.getSender().getCredit() == 1);
|
||||
} finally {
|
||||
amqpConnection.close();
|
||||
maxCreditAllocation.setInt(null, originalMaxCreditAllocation);
|
||||
}
|
||||
}
|
||||
|
||||
@ -609,18 +610,13 @@ public class ProtonTest extends ProtonTestBase {
|
||||
assertTrue(addressSize >= maxSizeBytesRejectThreshold);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 10000)
|
||||
public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception {
|
||||
setAddressFullBlockPolicy();
|
||||
|
||||
// Only allow 1 credit to be submitted at a time.
|
||||
Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation");
|
||||
maxCreditAllocation.setAccessible(true);
|
||||
int originalMaxCreditAllocation = maxCreditAllocation.getInt(null);
|
||||
maxCreditAllocation.setInt(null, 1);
|
||||
|
||||
String destinationAddress = address + 1;
|
||||
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
|
||||
AmqpClient client = new AmqpClient(new URI("tcp://localhost:5680"), userName, password);
|
||||
AmqpConnection amqpConnection = client.connect();
|
||||
try {
|
||||
AmqpSession session = amqpConnection.createSession();
|
||||
@ -637,7 +633,6 @@ public class ProtonTest extends ProtonTestBase {
|
||||
assertTrue(addressSize >= maxSizeBytes && addressSize <= maxSizeBytesRejectThreshold);
|
||||
} finally {
|
||||
amqpConnection.close();
|
||||
maxCreditAllocation.setInt(null, originalMaxCreditAllocation);
|
||||
}
|
||||
}
|
||||
|
||||
@ -771,6 +766,7 @@ public class ProtonTest extends ProtonTestBase {
|
||||
try {
|
||||
for (int i = 0; i < maxMessages; i++) {
|
||||
sender.send(message);
|
||||
System.out.println("Sent " + i);
|
||||
sentMessages.getAndIncrement();
|
||||
}
|
||||
timeout.countDown();
|
||||
@ -781,13 +777,20 @@ public class ProtonTest extends ProtonTestBase {
|
||||
};
|
||||
|
||||
Thread t = new Thread(sendMessages);
|
||||
t.start();
|
||||
|
||||
timeout.await(5, TimeUnit.SECONDS);
|
||||
try {
|
||||
t.start();
|
||||
|
||||
messagesSent = sentMessages.get();
|
||||
if (errors[0] != null) {
|
||||
throw errors[0];
|
||||
timeout.await(1, TimeUnit.SECONDS);
|
||||
|
||||
messagesSent = sentMessages.get();
|
||||
if (errors[0] != null) {
|
||||
throw errors[0];
|
||||
}
|
||||
} finally {
|
||||
t.interrupt();
|
||||
t.join(1000);
|
||||
Assert.assertFalse(t.isAlive());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -16,7 +16,6 @@
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@ -54,9 +53,12 @@ public class ProtonTestBase extends ActiveMQTestBase {
|
||||
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
|
||||
HashMap<String, Object> amqpParams = new HashMap<>();
|
||||
configureAmqp(amqpParams);
|
||||
|
||||
amqpServer.getConfiguration().getAcceptorConfigurations().clear();
|
||||
|
||||
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "amqp-acceptor", amqpParams);
|
||||
|
||||
amqpServer.getConfiguration().setAcceptorConfigurations(Collections.singleton(transportConfiguration));
|
||||
amqpServer.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
|
||||
amqpServer.getConfiguration().setName(brokerName);
|
||||
amqpServer.getConfiguration().setJournalDirectory(amqpServer.getConfiguration().getJournalDirectory() + port);
|
||||
amqpServer.getConfiguration().setBindingsDirectory(amqpServer.getConfiguration().getBindingsDirectory() + port);
|
||||
|
@ -58,6 +58,11 @@ public class DummyServerConsumer implements ServerConsumer {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public long sequentialID() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getProtocolContext() {
|
||||
return null;
|
||||
@ -122,6 +127,11 @@ public class DummyServerConsumer implements ServerConsumer {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reject(long messageID) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void acknowledge(Transaction tx, long messageID) throws Exception {
|
||||
|
||||
|
@ -82,6 +82,11 @@ public class FakeConsumer implements Consumer {
|
||||
delayCountdown = numReferences;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long sequentialID() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
public synchronized List<MessageReference> getReferences() {
|
||||
return references;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user