ARTEMIS-2007 - allow redistribution if there are unmatched messages pending on a queue and there is new remote demand

This commit is contained in:
gtully 2021-06-24 16:24:48 +01:00 committed by Gary Tully
parent 13df6a8fb9
commit 224b89810d
10 changed files with 594 additions and 122 deletions

View File

@ -211,8 +211,10 @@ public class MQTTSubscriptionManager {
Set<Consumer> queueConsumers;
if (queue != null && (queueConsumers = (Set<Consumer>) queue.getConsumers()) != null) {
for (Consumer consumer : queueConsumers) {
((ServerConsumer) consumer).close(false);
consumerQoSLevels.remove(((ServerConsumer) consumer).getID());
if (consumer instanceof ServerConsumer) {
((ServerConsumer) consumer).close(false);
consumerQoSLevels.remove(((ServerConsumer) consumer).getID());
}
}
}
}

View File

@ -1481,7 +1481,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
void unableToFlushDeliveries(@Cause Exception e);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222237, value = "Unable to flush deliveries", format = Message.Format.MESSAGE_FORMAT)
@Message(id = 222237, value = "Unable to stop redistributor", format = Message.Format.MESSAGE_FORMAT)
void unableToCancelRedistributor(@Cause Exception e);
@LogMessage(level = Logger.Level.WARN)

View File

@ -380,7 +380,7 @@ public interface Queue extends Bindable,CriticalComponent {
void addRedistributor(long delay);
void cancelRedistributor() throws Exception;
void cancelRedistributor();
boolean hasMatchingConsumer(Message message);

View File

@ -113,7 +113,6 @@ import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
import org.apache.activemq.artemis.utils.collections.SingletonIterator;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
@ -188,6 +187,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private boolean mirrorController;
private volatile boolean hasUnMatchedPending = false;
// Messages will first enter intermediateMessageReferences
// Before they are added to messageReferences
// This is to avoid locking the queue on the producer
@ -260,7 +261,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final SimpleString address;
private ConsumerHolder<Redistributor> redistributor;
// redistributor goes in the consumers list, this signals its presence and allows for easy comparison/check
private volatile ConsumerHolder<Redistributor> redistributor;
private ScheduledFuture<?> redistributorFuture;
@ -1326,12 +1328,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
getExecutor().execute(new Runnable() {
@Override
public void run() {
try {
cancelRedistributor();
} catch (Exception e) {
// nothing that could be done anyway.. just logging
ActiveMQServerLogger.LOGGER.unableToCancelRedistributor(e);
}
cancelRedistributor();
}
});
@ -1391,7 +1388,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return false;
}
if (consumers.size() >= consumersBeforeDispatch) {
if (getConsumerCount() >= consumersBeforeDispatch) {
if (dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(false), BooleanUtil.toInt(true))) {
dispatchStartTimeUpdater.set(this, System.currentTimeMillis());
}
@ -1419,7 +1416,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
try (ArtemisCloseable metric = measureCritical(CRITICAL_CONSUMER)) {
synchronized (this) {
if (maxConsumers != MAX_CONSUMERS_UNLIMITED && consumers.size() >= maxConsumers) {
if (maxConsumers != MAX_CONSUMERS_UNLIMITED && getConsumerCount() >= maxConsumers) {
throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
}
@ -1488,7 +1485,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (consumerRemoved) {
consumerRemovedTimestampUpdater.set(this, System.currentTimeMillis());
if (consumers.size() == 0) {
if (getConsumerCount() == 0) {
stopDispatch();
}
}
@ -1525,11 +1522,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
supports = false;
}
}
if (redistributor != null) {
if (!redistributor.consumer.supportsDirectDelivery()) {
supports = false;
}
}
return supports;
}
@ -1540,10 +1532,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (redistributor != null) {
// Just prompt delivery
deliverAsync();
return;
}
if (delay > 0) {
if (consumers.isEmpty()) {
if (consumers.isEmpty() || hasUnMatchedPending) {
DelayedAddRedistributor dar = new DelayedAddRedistributor(executor);
redistributorFuture = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
@ -1562,17 +1555,34 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
public synchronized void cancelRedistributor() throws Exception {
if (redistributor != null) {
redistributor.consumer.stop();
redistributor = null;
}
public synchronized void cancelRedistributor() {
clearRedistributorFuture();
if (redistributor != null) {
try {
redistributor.consumer.stop();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.unableToCancelRedistributor(e);
} finally {
consumers.remove(redistributor);
redistributor = null;
}
}
}
@Override
public int getConsumerCount() {
// we don't want to count the redistributor, it is an internal transient entry in the consumer list
if (redistributor != null) {
synchronized (this) {
final int size = consumers.size();
if (size > 0 && redistributor != null) {
return size - 1;
} else {
return size;
}
}
}
return consumers.size();
}
@ -1775,7 +1785,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public Map<String, List<MessageReference>> getDeliveringMessages() {
final Iterator<ConsumerHolder<? extends Consumer>> consumerHolderIterator;
synchronized (this) {
consumerHolderIterator = redistributor == null ? consumers.iterator() : SingletonIterator.newInstance(redistributor);
consumerHolderIterator = consumers.iterator();
}
Map<String, List<MessageReference>> mapReturn = new HashMap<>();
@ -2767,9 +2777,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
for (ConsumerHolder holder : this.consumers) {
holder.resetIterator();
}
if (redistributor != null) {
redistributor.resetIterator();
}
}
@Override
@ -2972,6 +2979,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// Either the iterator is empty or the consumer is busy
int noDelivery = 0;
// track filters not matching, used to track when all consumers can't match, redistribution is then an option
int numNoMatch = 0;
int numAttempts = 0;
int handled = 0;
long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
@ -2998,9 +3009,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
MessageReference ref;
// filter evaluation or transformation may cause properties to be lazyDecoded, we need to reflect that
int existingMemoryEstimate = 0;
Consumer handledconsumer = null;
synchronized (this) {
@ -3015,14 +3023,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
ConsumerHolder<? extends Consumer> holder;
if (redistributor == null) {
if (consumers.hasNext()) {
holder = consumers.next();
} else {
break;
}
if (consumers.hasNext()) {
holder = consumers.next();
} else {
holder = redistributor;
break;
}
Consumer consumer = holder.consumer;
@ -3032,6 +3036,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
holder.iter = messageReferences.iterator();
}
// LVQ support
ref = nextDelivery();
boolean nextDelivery = false;
if (ref != null) {
@ -3059,7 +3064,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
logger.trace("Queue " + this.getName() + " is delivering reference " + ref);
}
existingMemoryEstimate = ref.getMessageMemoryEstimate();
final SimpleString groupID = extractGroupID(ref);
groupConsumer = getGroupConsumer(groupID);
@ -3067,15 +3071,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
consumer = groupConsumer;
}
numAttempts++;
HandleStatus status = handle(ref, consumer);
if (status == HandleStatus.HANDLED) {
// if a message was delivered, any previous negative attemps need to be cleared
// if a message was delivered, any previous negative attempts need to be cleared
// this is to avoid breaks on the loop when checking for any other factors.
noDelivery = 0;
numNoMatch = 0;
numAttempts = 0;
if (redistributor == null) {
if (consumer != redistributor) {
ref = handleMessageGroup(ref, consumer, groupConsumer, groupID);
}
@ -3104,13 +3111,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
noDelivery++;
numNoMatch = 0;
numAttempts = 0;
// no consumers.reset() b/c we skip this consumer
} else if (status == HandleStatus.NO_MATCH) {
// nothing to be done on this case, the iterators will just jump next
consumers.reset();
numNoMatch++;
// every attempt resulted in noMatch for number of consumers means we tried all consumers for a single message
if (numNoMatch == numAttempts && numAttempts == consumers.size()) {
hasUnMatchedPending = true;
// one hit of unmatched message is enough, no need to reset counters
}
}
}
if (redistributor != null || groupConsumer != null) {
if (groupConsumer != null) {
if (noDelivery > 0) {
break;
}
@ -3303,15 +3318,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
private void internalAddRedistributor(final ArtemisExecutor executor) {
// create the redistributor only once if there are no local consumers
if (consumers.isEmpty() && redistributor == null) {
if (redistributor == null && (consumers.isEmpty() || hasUnMatchedPending)) {
if (logger.isTraceEnabled()) {
logger.trace("QueueImpl::Adding redistributor on queue " + this.toString());
}
redistributor = (new ConsumerHolder(new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE)));
redistributor = new ConsumerHolder(new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE));
redistributor.consumer.start();
consumers.add(redistributor);
hasUnMatchedPending = false;
deliverAsync();
}
@ -3749,9 +3763,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
consumers.reset();
while (consumers.hasNext() || redistributor != null) {
while (consumers.hasNext()) {
ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor;
ConsumerHolder<? extends Consumer> holder = consumers.next();
Consumer consumer = holder.consumer;
final SimpleString groupID = extractGroupID(ref);
@ -3764,7 +3778,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
HandleStatus status = handle(ref, consumer);
if (status == HandleStatus.HANDLED) {
final MessageReference reference;
if (redistributor == null) {
if (consumer != redistributor) {
reference = handleMessageGroup(ref, consumer, groupConsumer, groupID);
} else {
reference = ref;
@ -4527,12 +4541,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
logger.debug(getAddress() + ":" + getName() + " has " + queueMessages + " message(s) and " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
}
if (consumers.size() == 0) {
final int consumerCount = getConsumerCount();
if (consumerCount == 0) {
logger.debug("There are no consumers, no need to check slow consumer's rate");
return;
} else {
float queueThreshold = thresholdInMsgPerSecond * consumers.size();
float queueThreshold = thresholdInMsgPerSecond * consumerCount;
if (queueRate < queueThreshold && queueMessages < queueThreshold) {
if (logger.isDebugEnabled()) {

View File

@ -1475,7 +1475,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void cancelRedistributor() throws Exception {
public void cancelRedistributor() {
}

View File

@ -634,9 +634,7 @@ specified. The following shows all the available configuration options
Keep in mind that this message forwarding/balancing is what we call
"initial distribution." It is different than *redistribution* which
is [discussed below](#message-redistribution). This distinction is
important because redistribution is configured differently and has
unique semantics (e.g. it *does not* support filters (selectors)).
is [discussed below](#message-redistribution).
Default is `ON_DEMAND`.
@ -823,14 +821,14 @@ consumers on the queue the message won't get consumed and we have a
This is where message redistribution comes in. With message
redistribution Apache ActiveMQ Artemis can be configured to automatically
*redistribute* messages from queues which have no consumers back to
*redistribute* messages from queues which have no consumers or consumers
with filters that don't match messages. The messages are re-routed to
other nodes in the cluster which do have matching consumers. To enable
this functionality `message-load-balancing` must be `ON_DEMAND`.
Message redistribution can be configured to kick in immediately after
the last consumer on a queue is closed, or to wait a configurable delay
after the last consumer on a queue is closed before redistributing. By
default message redistribution is disabled.
the need to redistribute is detected, or to wait a configurable delay before redistributing.
By default, message redistribution is disabled.
Message redistribution can be configured on a per address basis, by
specifying the redistribution delay in the address settings. For more
@ -855,68 +853,15 @@ The attribute `match` can be an exact match or it can be a string that
conforms to the Apache ActiveMQ Artemis wildcard syntax (described in [Wildcard Syntax](wildcard-syntax.md)).
The element `redistribution-delay` defines the delay in milliseconds
after the last consumer is closed on a queue before redistributing
messages from that queue to other nodes of the cluster which do have
matching consumers. A delay of zero means the messages will be
immediately redistributed. A value of `-1` signifies that messages will
never be redistributed. The default value is `-1`.
between detecting the need for redistribution and actually attempting redistribution.
A delay of zero means the messages will be immediately redistributed.
A value of `-1` signifies that messages will never be redistributed. The default value is `-1`.
It often makes sense to introduce a delay before redistributing as it's
a common case that a consumer closes but another one quickly is created
on the same queue, in such a case you probably don't want to
redistribute immediately since the new consumer will arrive shortly.
#### Redistribution and filters (selectors)
Although "initial distribution" (described above) does support filters
(selectors), redistribution does *not* support filters. Consider this
scenario:
1. A cluster of 2 nodes - `A` and `B` - using a `redistribution-delay` of
`0` and a `message-load-balancing` of `ON_DEMAND`.
1. `A` and `B` each has the queue `foo`.
1. A producer sends a message which is routed to queue `foo` on node `A`.
The message has property named `myProperty` with a value of `10`.
1. A consumer connects to queue `foo` on node `A` with the filter
`myProperty=5`. This filter doesn't match the message.
1. A consumer connects to queue `foo` on node `B` with the filter
`myProperty=10`. This filter *does* match the message .
Despite the fact that the filter of the consumer on queue `foo` on node `B`
matches the message, the message will *not* be redistributed from node `A` to
node `B` because a consumer for the queue exists on node `A`.
Not supporting redistribution based on filters was an explicit design decision
in order to avoid two main problems - queue scanning and unnecessary
redistribution.
From a performance perspective a consumer with a filter on a queue is already
costly due to the scanning that the broker must do on the queue to find
matching messages. In general, this is a bit of an anti-pattern as it turns
the broker into something akin to a database where you can "select" the data
you want using a filter. If brokers are configured in a cluster and a consumer
with a filter connects and no matches are found after scanning the local queue
then potentially every instance of that queue in the cluster would need to be
scanned. This turns into a bit of a scalability nightmare with lots of consumers
(especially short-lived consumers) with filters connecting & disconnecting
frequently. The time & computing resources used for queue scanning would go
through the roof.
It is also possible to get into a pathological situation where short-lived
consumers with filters connect to nodes around the cluster and messages get
redistributed back and forth between nodes without ever actually being consumed.
One common use-case for consumers with filters (selectors) on queues is
request/reply using a correlation ID. Following the standard pattern can be
problematic in a cluster due to the lack of redistribution based on filters
already described. However, there is a simple way to ensure an application
using this request/reply pattern gets its reply even when using a correlation
ID filter in a cluster - create the consumer before the request is sent. This
will ensure that when the reply is sent it will be routed the proper cluster
node since "*initial* distribution" (described above) does support filters.
For example, in the scenario outlined above if steps 3 and 5 were switched
(i.e. if the consumers were created before the message was sent) then the
consumer on node `B` would in fact receive the message.
## Cluster topologies

View File

@ -0,0 +1,406 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jms.cluster;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase;
import org.junit.Before;
import org.junit.Test;
public class SelectorRedistributionClusterTest extends JMSClusteredTestBase {
private final String myQueue = "myQueue";
@Override
@Before
public void setUp() throws Exception {
super.setUp();
jmsServer1.getActiveMQServer().setIdentity("Server 1");
jmsServer2.getActiveMQServer().setIdentity("Server 2");
}
@Override
protected boolean enablePersistence() {
return true;
}
@Test
public void testSelectorRoutingReDistributionOnNoConsumer() throws Exception {
server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
Connection conn1 = cf1.createConnection();
Connection conn2 = cf2.createConnection();
conn1.start();
conn2.start();
try {
Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = session1.createQueue(myQueue);
MessageProducer prod1 = session1.createProducer(jmsQueue);
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage textMessage = session1.createTextMessage("m1");
textMessage.setIntProperty("N", 10);
// remote demand with a filter in advance of send, so routing sees remote filter match and can ignore the local consumer
MessageConsumer cons2 = session2.createConsumer(jmsQueue, "N = 10");
// local consumer that does not match message
MessageConsumer cons1 = session1.createConsumer(jmsQueue, "N = 0");
// verify cluster notifications have completed before send
waitForBindings(server1, myQueue, false, 1, 1, 4000);
prod1.send(textMessage);
TextMessage received = (TextMessage) cons2.receive(4000);
assertNotNull(received);
assertEquals("m1", received.getText());
// lets check some redistribution back by close with no acknowledge
session2.close();
// consumer on server 1 does not match, redistribution not done yet, message still available to local consumer
session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
cons2 = session2.createConsumer(jmsQueue, "N = 10");
received = (TextMessage) cons2.receive(4000);
assertNotNull(received);
// have to create consumer matching filter on server1 in advance such that redistribution happens fast
MessageConsumer cons11 = session1.createConsumer(jmsQueue, "N = 10");
// now expect redistribution
session2.close();
// get it from server1
received = (TextMessage) cons11.receive(4000);
assertNotNull(received);
assertEquals("m1", received.getText());
// done
received.acknowledge();
} finally {
conn1.close();
conn2.close();
}
}
@Test
public void testSelectorRoutingNoReDistributionNewMessageSkipsTillLocalClose() throws Exception {
server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
Connection conn1 = cf1.createConnection();
Connection conn2 = cf2.createConnection();
conn1.start();
conn2.start();
try {
Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session session11 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = session1.createQueue(myQueue);
MessageProducer prod1 = session1.createProducer(jmsQueue);
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage textMessage = session1.createTextMessage("m1");
textMessage.setIntProperty("N", 10);
// remote demand with a filter in advance of send
MessageConsumer cons2 = session2.createConsumer(jmsQueue, "N = 10");
// local consumer that does not match message
MessageConsumer cons1 = session1.createConsumer(jmsQueue, "N = 0");
// local consumer that matches message
MessageConsumer cons11 = session11.createConsumer(jmsQueue, "N = 10");
// verify cluster notifications have completed before send
waitForBindings(server1, myQueue, false, 1, 1, 4000);
prod1.send(textMessage);
TextMessage received = (TextMessage) cons11.receive(4000);
assertNotNull(received);
assertEquals("m1", received.getText());
// lets check some redistribution by close with no acknowledge so session rolls back delivery
session11.close();
// nothing for the existing remote binding
received = (TextMessage) cons2.receiveNoWait();
assertNull(received);
// send a second message, it will get routed to the remote binding
textMessage = session1.createTextMessage("m2");
textMessage.setIntProperty("N", 10);
prod1.send(textMessage);
received = (TextMessage) cons2.receive(4000);
assertNotNull(received);
assertEquals("m2", received.getText());
received.acknowledge();
// release the local consumer such that redistribution kicks in
session1.close();
received = (TextMessage) cons2.receive(4000);
assertNotNull(received);
assertEquals("m1", received.getText());
// done
received.acknowledge();
} finally {
conn1.close();
conn2.close();
}
}
@Test
public void testSelectorRoutingReDistributionDoesNotBlockLocalConsumer() throws Exception {
server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
Connection conn1 = cf1.createConnection();
Connection conn2 = cf2.createConnection();
conn1.start();
conn2.start();
try {
Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session session11 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = session1.createQueue(myQueue);
MessageProducer prod1 = session1.createProducer(jmsQueue);
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage textMessage = session1.createTextMessage("m1");
textMessage.setIntProperty("N", 10);
// local consumers that does not match message
MessageConsumer cons1_0 = session1.createConsumer(jmsQueue, "N = 0");
MessageConsumer cons1_1 = session1.createConsumer(jmsQueue, "N = 1");
// local consumer that matches message
MessageConsumer cons1_10 = session11.createConsumer(jmsQueue, "N = 10");
prod1.send(textMessage);
TextMessage received = (TextMessage) cons1_10.receive(4000);
assertNotNull(received);
assertEquals("m1", received.getText());
// lets check some redistribution by close with no acknowledge so session rolls back delivery
session11.close();
// remote demand with a filter, consumer moved triggers redistribution event b/c all local consumers don't match
MessageConsumer cons2_10 = session2.createConsumer(jmsQueue, "N = 10");
received = (TextMessage) cons2_10.receive(4000);
assertNotNull(received);
assertEquals("m1", received.getText());
received.acknowledge();
// check local consumers can still get dispatched
textMessage = session1.createTextMessage("m2");
textMessage.setIntProperty("N", 0);
prod1.send(textMessage);
textMessage = session1.createTextMessage("m3");
textMessage.setIntProperty("N", 1);
prod1.send(textMessage);
received = (TextMessage) cons1_0.receive(4000);
assertNotNull(received);
assertEquals("m2", received.getText());
received.acknowledge();
received = (TextMessage) cons1_1.receive(4000);
assertNotNull(received);
assertEquals("m3", received.getText());
received.acknowledge();
// verify redistributor still kicks in too
textMessage = session1.createTextMessage("m4");
textMessage.setIntProperty("N", 10);
prod1.send(textMessage);
received = (TextMessage) cons2_10.receive(4000);
assertNotNull(received);
assertEquals("m4", received.getText());
received.acknowledge();
} finally {
conn1.close();
conn2.close();
}
}
@Test
public void testSelectorRoutingReDistributionOnConsumerMove() throws Exception {
server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
Connection conn1 = cf1.createConnection();
Connection conn2 = cf2.createConnection();
conn1.start();
conn2.start();
try {
Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session session11 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = session1.createQueue(myQueue);
MessageProducer prod1 = session1.createProducer(jmsQueue);
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage textMessage = session1.createTextMessage("m1");
textMessage.setIntProperty("N", 10);
// local consumers that does not match message
MessageConsumer cons1 = session1.createConsumer(jmsQueue, "N = 0");
MessageConsumer cons12 = session1.createConsumer(jmsQueue, "N = 1");
// local consumer that matches message
MessageConsumer cons111 = session11.createConsumer(jmsQueue, "N = 10");
prod1.send(textMessage);
TextMessage received = (TextMessage) cons111.receive(4000);
assertNotNull(received);
assertEquals("m1", received.getText());
// lets check some redistribution by close with no acknowledge so session rolls back delivery
session11.close();
// remote demand with a filter, consumer moved triggers redistribution event b/c all local consumers don't match
MessageConsumer cons2 = session2.createConsumer(jmsQueue, "N = 10");
received = (TextMessage) cons2.receive(4000);
assertNotNull(received);
assertEquals("m1", received.getText());
received.acknowledge();
} finally {
conn1.close();
conn2.close();
}
}
@Test
public void testSelectorRoutingReDistributionOnLocalNoMatchConsumerCloseNeedsNewRemoteDemand() throws Exception {
server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
Connection conn1 = cf1.createConnection();
Connection conn2 = cf2.createConnection();
conn1.start();
conn2.start();
try {
Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session session1_n_10 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session session1_n_1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = session1.createQueue(myQueue);
MessageProducer prod1 = session1.createProducer(jmsQueue);
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage textMessage = session1.createTextMessage("m1");
textMessage.setIntProperty("N", 10);
// remote demand with a filter
MessageConsumer consumer2_n_10 = session2.createConsumer(jmsQueue, "N = 10");
// local consumers that does not match message
MessageConsumer consumer1_n_0 = session1.createConsumer(jmsQueue, "N = 0");
MessageConsumer consumer1_n_1 = session1_n_1.createConsumer(jmsQueue, "N = 1");
// local consumer that matches message
MessageConsumer consumer1_n_10 = session1_n_10.createConsumer(jmsQueue, "N = 10");
// verify cluster notifications have completed before send
waitForBindings(server1, myQueue, false, 1, 1, 4000);
waitForBindings(server1, myQueue, true, 1, 3, 4000);
prod1.send(textMessage);
TextMessage received = (TextMessage) consumer1_n_10.receive(4000);
assertNotNull(received);
assertEquals("m1", received.getText());
// lets prepare some non matching message for redistribution by close with no acknowledge so session rolls back delivery
session1_n_10.close();
// verify no redistribution event yet
assertNull(consumer2_n_10.receiveNoWait());
// local remove consumer event will not trigger redistribution
session1_n_1.close();
// verify no redistribution event yet
assertNull(consumer2_n_10.receiveNoWait());
// force a redistribution event on new remote consumer creation (that won't match in this case), trigger redistribution
MessageConsumer consumer2_n_0 = session2.createConsumer(jmsQueue, "N = 0");
// verify redistribution to remote
received = (TextMessage) consumer2_n_10.receive(4000);
assertNotNull(received);
assertEquals("m1", received.getText());
received.acknowledge();
} finally {
conn1.close();
conn2.close();
}
}
}

View File

@ -439,7 +439,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public void cancelRedistributor() throws Exception {
public void cancelRedistributor() {
// no-op
}

View File

@ -967,6 +967,104 @@ public class QueueImplTest extends ActiveMQTestBase {
Assert.assertEquals(20, queue.getDeliveringCount());
}
@Test
public void testNoMatchConsumersAllowsRedistribution() throws Exception {
QueueImpl queue = getTemporaryQueue();
final int numMessages = 2;
List<MessageReference> refs = new ArrayList<>();
for (int i = 0; i < numMessages; i++) {
MessageReference ref = generateReference(queue, i);
ref.getMessage().putStringProperty("color", "red");
refs.add(ref);
queue.addTail(ref);
}
Assert.assertEquals(numMessages, getMessageCount(queue));
queue.deliverNow();
Assert.assertEquals(numMessages, getMessageCount(queue));
FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
queue.addConsumer(consumer);
FakeConsumer consumer2 = new FakeConsumer(FilterImpl.createFilter("color = 'orange'"));
queue.addConsumer(consumer2);
queue.deliverNow();
Assert.assertEquals(0, consumer.getReferences().size());
Assert.assertEquals(0, consumer2.getReferences().size());
// verify redistributor is doing some work....
try {
// should attempt to add due to unmatched
queue.addRedistributor(0);
fail("expect error on attempt to add addRedistributor - npe b/c no storage etc");
} catch (NullPointerException expected) {
}
// verify with odd number as check depends on order/reset/wrap of consumers
FakeConsumer consumer3 = new FakeConsumer(FilterImpl.createFilter("color = 'blue'"));
queue.addConsumer(consumer3);
queue.deliverNow();
Assert.assertEquals(0, consumer.getReferences().size());
Assert.assertEquals(0, consumer2.getReferences().size());
Assert.assertEquals(0, consumer3.getReferences().size());
// verify redistributor is doing some work....
try {
// should attempt to add due to unmatched
queue.addRedistributor(0);
fail("expect error on attempt to add addRedistributor - npe b/c no storage etc");
} catch (NullPointerException expected) {
}
Assert.assertEquals(numMessages, getMessageCount(queue));
}
@Test
public void testNoMatchOn3AllowsRedistribution() throws Exception {
QueueImpl queue = getTemporaryQueue();
int i = 0;
MessageReference ref = generateReference(queue, i++);
ref.getMessage().putStringProperty("color", "red");
queue.addTail(ref);
ref = generateReference(queue, i++);
ref.getMessage().putStringProperty("color", "red");
queue.addTail(ref);
ref = generateReference(queue, i++);
ref.getMessage().putStringProperty("color", "blue");
queue.addTail(ref);
Assert.assertEquals(3, getMessageCount(queue));
FakeConsumer consumerRed = new FakeConsumer(FilterImpl.createFilter("color = 'red'"));
queue.addConsumer(consumerRed);
FakeConsumer consumerOrange = new FakeConsumer(FilterImpl.createFilter("color = 'orange'"));
queue.addConsumer(consumerOrange);
queue.deliverNow();
Assert.assertEquals(2, consumerRed.getReferences().size());
Assert.assertEquals(0, consumerOrange.getReferences().size());
// verify redistributor is doing some work....
try {
// should attempt to add due to unmatched
queue.addRedistributor(0);
fail("expect error on attempt to add addRedistributor - npe b/c no storage etc");
} catch (NullPointerException expected) {
}
}
// Private ------------------------------------------------------------------------------
private void testConsumerWithFilters(final boolean direct) throws Exception {

View File

@ -157,4 +157,11 @@ public class FakeConsumer implements Consumer {
return Collections.emptyList();
}
@Override
public String toString() {
if (filter != null) {
return filter + ", " + super.toString();
}
return super.toString();
}
}