diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index c23f9945ec..b6db14881e 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -211,8 +211,10 @@ public class MQTTSubscriptionManager { Set queueConsumers; if (queue != null && (queueConsumers = (Set) queue.getConsumers()) != null) { for (Consumer consumer : queueConsumers) { - ((ServerConsumer) consumer).close(false); - consumerQoSLevels.remove(((ServerConsumer) consumer).getID()); + if (consumer instanceof ServerConsumer) { + ((ServerConsumer) consumer).close(false); + consumerQoSLevels.remove(((ServerConsumer) consumer).getID()); + } } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index a056c84550..6bae2956b6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1481,7 +1481,7 @@ public interface ActiveMQServerLogger extends BasicLogger { void unableToFlushDeliveries(@Cause Exception e); @LogMessage(level = Logger.Level.WARN) - @Message(id = 222237, value = "Unable to flush deliveries", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 222237, value = "Unable to stop redistributor", format = Message.Format.MESSAGE_FORMAT) void unableToCancelRedistributor(@Cause Exception e); @LogMessage(level = Logger.Level.WARN) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index dedf1904a2..aa43170371 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -380,7 +380,7 @@ public interface Queue extends Bindable,CriticalComponent { void addRedistributor(long delay); - void cancelRedistributor() throws Exception; + void cancelRedistributor(); boolean hasMatchingConsumer(Message message); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index e0d81266b6..6909cf6b50 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -113,7 +113,6 @@ import org.apache.activemq.artemis.utils.collections.NodeStore; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.artemis.utils.collections.PriorityLinkedList; import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl; -import org.apache.activemq.artemis.utils.collections.SingletonIterator; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl; import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer; @@ -188,6 +187,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private boolean mirrorController; + private volatile boolean hasUnMatchedPending = false; + // Messages will first enter intermediateMessageReferences // Before they are added to messageReferences // This is to avoid locking the queue on the producer @@ -260,7 +261,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final SimpleString address; - private ConsumerHolder redistributor; + // redistributor goes in the consumers list, this signals its presence and allows for easy comparison/check + private volatile ConsumerHolder redistributor; private ScheduledFuture redistributorFuture; @@ -1326,12 +1328,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { getExecutor().execute(new Runnable() { @Override public void run() { - try { - cancelRedistributor(); - } catch (Exception e) { - // nothing that could be done anyway.. just logging - ActiveMQServerLogger.LOGGER.unableToCancelRedistributor(e); - } + cancelRedistributor(); } }); @@ -1391,7 +1388,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return false; } - if (consumers.size() >= consumersBeforeDispatch) { + if (getConsumerCount() >= consumersBeforeDispatch) { if (dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(false), BooleanUtil.toInt(true))) { dispatchStartTimeUpdater.set(this, System.currentTimeMillis()); } @@ -1419,7 +1416,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { try (ArtemisCloseable metric = measureCritical(CRITICAL_CONSUMER)) { synchronized (this) { - if (maxConsumers != MAX_CONSUMERS_UNLIMITED && consumers.size() >= maxConsumers) { + if (maxConsumers != MAX_CONSUMERS_UNLIMITED && getConsumerCount() >= maxConsumers) { throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name); } @@ -1488,7 +1485,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (consumerRemoved) { consumerRemovedTimestampUpdater.set(this, System.currentTimeMillis()); - if (consumers.size() == 0) { + if (getConsumerCount() == 0) { stopDispatch(); } } @@ -1525,11 +1522,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { supports = false; } } - if (redistributor != null) { - if (!redistributor.consumer.supportsDirectDelivery()) { - supports = false; - } - } return supports; } @@ -1540,10 +1532,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (redistributor != null) { // Just prompt delivery deliverAsync(); + return; } if (delay > 0) { - if (consumers.isEmpty()) { + if (consumers.isEmpty() || hasUnMatchedPending) { DelayedAddRedistributor dar = new DelayedAddRedistributor(executor); redistributorFuture = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS); @@ -1562,17 +1555,34 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } @Override - public synchronized void cancelRedistributor() throws Exception { - if (redistributor != null) { - redistributor.consumer.stop(); - redistributor = null; - } - + public synchronized void cancelRedistributor() { clearRedistributorFuture(); + + if (redistributor != null) { + try { + redistributor.consumer.stop(); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.unableToCancelRedistributor(e); + } finally { + consumers.remove(redistributor); + redistributor = null; + } + } } @Override public int getConsumerCount() { + // we don't want to count the redistributor, it is an internal transient entry in the consumer list + if (redistributor != null) { + synchronized (this) { + final int size = consumers.size(); + if (size > 0 && redistributor != null) { + return size - 1; + } else { + return size; + } + } + } return consumers.size(); } @@ -1775,7 +1785,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { public Map> getDeliveringMessages() { final Iterator> consumerHolderIterator; synchronized (this) { - consumerHolderIterator = redistributor == null ? consumers.iterator() : SingletonIterator.newInstance(redistributor); + consumerHolderIterator = consumers.iterator(); } Map> mapReturn = new HashMap<>(); @@ -2767,9 +2777,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { for (ConsumerHolder holder : this.consumers) { holder.resetIterator(); } - if (redistributor != null) { - redistributor.resetIterator(); - } } @Override @@ -2972,6 +2979,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // Either the iterator is empty or the consumer is busy int noDelivery = 0; + // track filters not matching, used to track when all consumers can't match, redistribution is then an option + int numNoMatch = 0; + int numAttempts = 0; + int handled = 0; long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT); @@ -2998,9 +3009,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { MessageReference ref; - // filter evaluation or transformation may cause properties to be lazyDecoded, we need to reflect that - int existingMemoryEstimate = 0; - Consumer handledconsumer = null; synchronized (this) { @@ -3015,14 +3023,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } ConsumerHolder holder; - if (redistributor == null) { - if (consumers.hasNext()) { - holder = consumers.next(); - } else { - break; - } + if (consumers.hasNext()) { + holder = consumers.next(); } else { - holder = redistributor; + break; } Consumer consumer = holder.consumer; @@ -3032,6 +3036,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { holder.iter = messageReferences.iterator(); } + // LVQ support ref = nextDelivery(); boolean nextDelivery = false; if (ref != null) { @@ -3059,7 +3064,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { logger.trace("Queue " + this.getName() + " is delivering reference " + ref); } - existingMemoryEstimate = ref.getMessageMemoryEstimate(); final SimpleString groupID = extractGroupID(ref); groupConsumer = getGroupConsumer(groupID); @@ -3067,15 +3071,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { consumer = groupConsumer; } + numAttempts++; HandleStatus status = handle(ref, consumer); if (status == HandleStatus.HANDLED) { - // if a message was delivered, any previous negative attemps need to be cleared + // if a message was delivered, any previous negative attempts need to be cleared // this is to avoid breaks on the loop when checking for any other factors. noDelivery = 0; + numNoMatch = 0; + numAttempts = 0; - if (redistributor == null) { + if (consumer != redistributor) { ref = handleMessageGroup(ref, consumer, groupConsumer, groupID); } @@ -3104,13 +3111,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } noDelivery++; + numNoMatch = 0; + numAttempts = 0; + // no consumers.reset() b/c we skip this consumer } else if (status == HandleStatus.NO_MATCH) { - // nothing to be done on this case, the iterators will just jump next consumers.reset(); + numNoMatch++; + // every attempt resulted in noMatch for number of consumers means we tried all consumers for a single message + if (numNoMatch == numAttempts && numAttempts == consumers.size()) { + hasUnMatchedPending = true; + // one hit of unmatched message is enough, no need to reset counters + } } } - if (redistributor != null || groupConsumer != null) { + if (groupConsumer != null) { if (noDelivery > 0) { break; } @@ -3303,15 +3318,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } private void internalAddRedistributor(final ArtemisExecutor executor) { - // create the redistributor only once if there are no local consumers - if (consumers.isEmpty() && redistributor == null) { + if (redistributor == null && (consumers.isEmpty() || hasUnMatchedPending)) { if (logger.isTraceEnabled()) { logger.trace("QueueImpl::Adding redistributor on queue " + this.toString()); } - - redistributor = (new ConsumerHolder(new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE))); - + redistributor = new ConsumerHolder(new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE)); redistributor.consumer.start(); + consumers.add(redistributor); + hasUnMatchedPending = false; deliverAsync(); } @@ -3749,9 +3763,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { consumers.reset(); - while (consumers.hasNext() || redistributor != null) { + while (consumers.hasNext()) { - ConsumerHolder holder = redistributor == null ? consumers.next() : redistributor; + ConsumerHolder holder = consumers.next(); Consumer consumer = holder.consumer; final SimpleString groupID = extractGroupID(ref); @@ -3764,7 +3778,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { HandleStatus status = handle(ref, consumer); if (status == HandleStatus.HANDLED) { final MessageReference reference; - if (redistributor == null) { + if (consumer != redistributor) { reference = handleMessageGroup(ref, consumer, groupConsumer, groupID); } else { reference = ref; @@ -4527,12 +4541,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { logger.debug(getAddress() + ":" + getName() + " has " + queueMessages + " message(s) and " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second."); } - - if (consumers.size() == 0) { + final int consumerCount = getConsumerCount(); + if (consumerCount == 0) { logger.debug("There are no consumers, no need to check slow consumer's rate"); return; } else { - float queueThreshold = thresholdInMsgPerSecond * consumers.size(); + float queueThreshold = thresholdInMsgPerSecond * consumerCount; if (queueRate < queueThreshold && queueMessages < queueThreshold) { if (logger.isDebugEnabled()) { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 068a442bcc..f3b215f349 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1475,7 +1475,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public void cancelRedistributor() throws Exception { + public void cancelRedistributor() { } diff --git a/docs/user-manual/en/clusters.md b/docs/user-manual/en/clusters.md index 3c039c27df..d88a7c7c37 100644 --- a/docs/user-manual/en/clusters.md +++ b/docs/user-manual/en/clusters.md @@ -634,9 +634,7 @@ specified. The following shows all the available configuration options Keep in mind that this message forwarding/balancing is what we call "initial distribution." It is different than *redistribution* which - is [discussed below](#message-redistribution). This distinction is - important because redistribution is configured differently and has - unique semantics (e.g. it *does not* support filters (selectors)). + is [discussed below](#message-redistribution). Default is `ON_DEMAND`. @@ -823,14 +821,14 @@ consumers on the queue the message won't get consumed and we have a This is where message redistribution comes in. With message redistribution Apache ActiveMQ Artemis can be configured to automatically -*redistribute* messages from queues which have no consumers back to +*redistribute* messages from queues which have no consumers or consumers +with filters that don't match messages. The messages are re-routed to other nodes in the cluster which do have matching consumers. To enable this functionality `message-load-balancing` must be `ON_DEMAND`. Message redistribution can be configured to kick in immediately after -the last consumer on a queue is closed, or to wait a configurable delay -after the last consumer on a queue is closed before redistributing. By -default message redistribution is disabled. +the need to redistribute is detected, or to wait a configurable delay before redistributing. +By default, message redistribution is disabled. Message redistribution can be configured on a per address basis, by specifying the redistribution delay in the address settings. For more @@ -855,68 +853,15 @@ The attribute `match` can be an exact match or it can be a string that conforms to the Apache ActiveMQ Artemis wildcard syntax (described in [Wildcard Syntax](wildcard-syntax.md)). The element `redistribution-delay` defines the delay in milliseconds -after the last consumer is closed on a queue before redistributing -messages from that queue to other nodes of the cluster which do have -matching consumers. A delay of zero means the messages will be -immediately redistributed. A value of `-1` signifies that messages will -never be redistributed. The default value is `-1`. +between detecting the need for redistribution and actually attempting redistribution. +A delay of zero means the messages will be immediately redistributed. +A value of `-1` signifies that messages will never be redistributed. The default value is `-1`. It often makes sense to introduce a delay before redistributing as it's a common case that a consumer closes but another one quickly is created on the same queue, in such a case you probably don't want to redistribute immediately since the new consumer will arrive shortly. -#### Redistribution and filters (selectors) - -Although "initial distribution" (described above) does support filters -(selectors), redistribution does *not* support filters. Consider this -scenario: - - 1. A cluster of 2 nodes - `A` and `B` - using a `redistribution-delay` of - `0` and a `message-load-balancing` of `ON_DEMAND`. - 1. `A` and `B` each has the queue `foo`. - 1. A producer sends a message which is routed to queue `foo` on node `A`. - The message has property named `myProperty` with a value of `10`. - 1. A consumer connects to queue `foo` on node `A` with the filter - `myProperty=5`. This filter doesn't match the message. - 1. A consumer connects to queue `foo` on node `B` with the filter - `myProperty=10`. This filter *does* match the message . - -Despite the fact that the filter of the consumer on queue `foo` on node `B` -matches the message, the message will *not* be redistributed from node `A` to -node `B` because a consumer for the queue exists on node `A`. - -Not supporting redistribution based on filters was an explicit design decision -in order to avoid two main problems - queue scanning and unnecessary -redistribution. - -From a performance perspective a consumer with a filter on a queue is already -costly due to the scanning that the broker must do on the queue to find -matching messages. In general, this is a bit of an anti-pattern as it turns -the broker into something akin to a database where you can "select" the data -you want using a filter. If brokers are configured in a cluster and a consumer -with a filter connects and no matches are found after scanning the local queue -then potentially every instance of that queue in the cluster would need to be -scanned. This turns into a bit of a scalability nightmare with lots of consumers -(especially short-lived consumers) with filters connecting & disconnecting -frequently. The time & computing resources used for queue scanning would go -through the roof. - -It is also possible to get into a pathological situation where short-lived -consumers with filters connect to nodes around the cluster and messages get -redistributed back and forth between nodes without ever actually being consumed. - -One common use-case for consumers with filters (selectors) on queues is -request/reply using a correlation ID. Following the standard pattern can be -problematic in a cluster due to the lack of redistribution based on filters -already described. However, there is a simple way to ensure an application -using this request/reply pattern gets its reply even when using a correlation -ID filter in a cluster - create the consumer before the request is sent. This -will ensure that when the reply is sent it will be routed the proper cluster -node since "*initial* distribution" (described above) does support filters. -For example, in the scenario outlined above if steps 3 and 5 were switched -(i.e. if the consumers were created before the message was sent) then the -consumer on node `B` would in fact receive the message. ## Cluster topologies diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/SelectorRedistributionClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/SelectorRedistributionClusterTest.java new file mode 100644 index 0000000000..38cd430258 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/SelectorRedistributionClusterTest.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jms.cluster; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase; +import org.junit.Before; +import org.junit.Test; + +public class SelectorRedistributionClusterTest extends JMSClusteredTestBase { + + private final String myQueue = "myQueue"; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + jmsServer1.getActiveMQServer().setIdentity("Server 1"); + jmsServer2.getActiveMQServer().setIdentity("Server 2"); + } + + @Override + protected boolean enablePersistence() { + return true; + } + + @Test + public void testSelectorRoutingReDistributionOnNoConsumer() throws Exception { + server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0); + server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0); + Connection conn1 = cf1.createConnection(); + Connection conn2 = cf2.createConnection(); + conn1.start(); + conn2.start(); + + try { + Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + javax.jms.Queue jmsQueue = session1.createQueue(myQueue); + + MessageProducer prod1 = session1.createProducer(jmsQueue); + + prod1.setDeliveryMode(DeliveryMode.PERSISTENT); + + TextMessage textMessage = session1.createTextMessage("m1"); + textMessage.setIntProperty("N", 10); + + + // remote demand with a filter in advance of send, so routing sees remote filter match and can ignore the local consumer + MessageConsumer cons2 = session2.createConsumer(jmsQueue, "N = 10"); + // local consumer that does not match message + MessageConsumer cons1 = session1.createConsumer(jmsQueue, "N = 0"); + + // verify cluster notifications have completed before send + waitForBindings(server1, myQueue, false, 1, 1, 4000); + + prod1.send(textMessage); + + TextMessage received = (TextMessage) cons2.receive(4000); + assertNotNull(received); + assertEquals("m1", received.getText()); + + // lets check some redistribution back by close with no acknowledge + session2.close(); + + // consumer on server 1 does not match, redistribution not done yet, message still available to local consumer + session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + cons2 = session2.createConsumer(jmsQueue, "N = 10"); + received = (TextMessage) cons2.receive(4000); + assertNotNull(received); + + // have to create consumer matching filter on server1 in advance such that redistribution happens fast + MessageConsumer cons11 = session1.createConsumer(jmsQueue, "N = 10"); + + // now expect redistribution + session2.close(); + + // get it from server1 + received = (TextMessage) cons11.receive(4000); + assertNotNull(received); + assertEquals("m1", received.getText()); + + // done + received.acknowledge(); + + } finally { + conn1.close(); + conn2.close(); + } + } + + @Test + public void testSelectorRoutingNoReDistributionNewMessageSkipsTillLocalClose() throws Exception { + server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0); + server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0); + Connection conn1 = cf1.createConnection(); + Connection conn2 = cf2.createConnection(); + conn1.start(); + conn2.start(); + + try { + Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Session session11 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + javax.jms.Queue jmsQueue = session1.createQueue(myQueue); + + MessageProducer prod1 = session1.createProducer(jmsQueue); + + prod1.setDeliveryMode(DeliveryMode.PERSISTENT); + + TextMessage textMessage = session1.createTextMessage("m1"); + textMessage.setIntProperty("N", 10); + + + // remote demand with a filter in advance of send + MessageConsumer cons2 = session2.createConsumer(jmsQueue, "N = 10"); + // local consumer that does not match message + MessageConsumer cons1 = session1.createConsumer(jmsQueue, "N = 0"); + // local consumer that matches message + MessageConsumer cons11 = session11.createConsumer(jmsQueue, "N = 10"); + + // verify cluster notifications have completed before send + waitForBindings(server1, myQueue, false, 1, 1, 4000); + + prod1.send(textMessage); + + TextMessage received = (TextMessage) cons11.receive(4000); + assertNotNull(received); + assertEquals("m1", received.getText()); + + // lets check some redistribution by close with no acknowledge so session rolls back delivery + session11.close(); + + // nothing for the existing remote binding + received = (TextMessage) cons2.receiveNoWait(); + assertNull(received); + + // send a second message, it will get routed to the remote binding + textMessage = session1.createTextMessage("m2"); + textMessage.setIntProperty("N", 10); + + prod1.send(textMessage); + + received = (TextMessage) cons2.receive(4000); + assertNotNull(received); + assertEquals("m2", received.getText()); + received.acknowledge(); + + // release the local consumer such that redistribution kicks in + session1.close(); + + received = (TextMessage) cons2.receive(4000); + assertNotNull(received); + assertEquals("m1", received.getText()); + + // done + received.acknowledge(); + + } finally { + conn1.close(); + conn2.close(); + } + } + + + @Test + public void testSelectorRoutingReDistributionDoesNotBlockLocalConsumer() throws Exception { + server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0); + server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0); + Connection conn1 = cf1.createConnection(); + Connection conn2 = cf2.createConnection(); + conn1.start(); + conn2.start(); + + try { + Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Session session11 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + javax.jms.Queue jmsQueue = session1.createQueue(myQueue); + + MessageProducer prod1 = session1.createProducer(jmsQueue); + + prod1.setDeliveryMode(DeliveryMode.PERSISTENT); + + TextMessage textMessage = session1.createTextMessage("m1"); + textMessage.setIntProperty("N", 10); + + + // local consumers that does not match message + MessageConsumer cons1_0 = session1.createConsumer(jmsQueue, "N = 0"); + MessageConsumer cons1_1 = session1.createConsumer(jmsQueue, "N = 1"); + + // local consumer that matches message + MessageConsumer cons1_10 = session11.createConsumer(jmsQueue, "N = 10"); + + prod1.send(textMessage); + + TextMessage received = (TextMessage) cons1_10.receive(4000); + assertNotNull(received); + assertEquals("m1", received.getText()); + + // lets check some redistribution by close with no acknowledge so session rolls back delivery + session11.close(); + + // remote demand with a filter, consumer moved triggers redistribution event b/c all local consumers don't match + MessageConsumer cons2_10 = session2.createConsumer(jmsQueue, "N = 10"); + + received = (TextMessage) cons2_10.receive(4000); + assertNotNull(received); + assertEquals("m1", received.getText()); + received.acknowledge(); + + // check local consumers can still get dispatched + textMessage = session1.createTextMessage("m2"); + textMessage.setIntProperty("N", 0); + prod1.send(textMessage); + + textMessage = session1.createTextMessage("m3"); + textMessage.setIntProperty("N", 1); + prod1.send(textMessage); + + + received = (TextMessage) cons1_0.receive(4000); + assertNotNull(received); + assertEquals("m2", received.getText()); + received.acknowledge(); + + received = (TextMessage) cons1_1.receive(4000); + assertNotNull(received); + assertEquals("m3", received.getText()); + received.acknowledge(); + + // verify redistributor still kicks in too + textMessage = session1.createTextMessage("m4"); + textMessage.setIntProperty("N", 10); + prod1.send(textMessage); + + received = (TextMessage) cons2_10.receive(4000); + assertNotNull(received); + assertEquals("m4", received.getText()); + received.acknowledge(); + + } finally { + conn1.close(); + conn2.close(); + } + } + + + @Test + public void testSelectorRoutingReDistributionOnConsumerMove() throws Exception { + server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0); + server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0); + Connection conn1 = cf1.createConnection(); + Connection conn2 = cf2.createConnection(); + conn1.start(); + conn2.start(); + + try { + Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Session session11 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + javax.jms.Queue jmsQueue = session1.createQueue(myQueue); + + MessageProducer prod1 = session1.createProducer(jmsQueue); + + prod1.setDeliveryMode(DeliveryMode.PERSISTENT); + + TextMessage textMessage = session1.createTextMessage("m1"); + textMessage.setIntProperty("N", 10); + + + // local consumers that does not match message + MessageConsumer cons1 = session1.createConsumer(jmsQueue, "N = 0"); + MessageConsumer cons12 = session1.createConsumer(jmsQueue, "N = 1"); + + // local consumer that matches message + MessageConsumer cons111 = session11.createConsumer(jmsQueue, "N = 10"); + + prod1.send(textMessage); + + TextMessage received = (TextMessage) cons111.receive(4000); + assertNotNull(received); + assertEquals("m1", received.getText()); + + // lets check some redistribution by close with no acknowledge so session rolls back delivery + session11.close(); + + // remote demand with a filter, consumer moved triggers redistribution event b/c all local consumers don't match + MessageConsumer cons2 = session2.createConsumer(jmsQueue, "N = 10"); + + received = (TextMessage) cons2.receive(4000); + assertNotNull(received); + assertEquals("m1", received.getText()); + + received.acknowledge(); + + } finally { + conn1.close(); + conn2.close(); + } + } + + @Test + public void testSelectorRoutingReDistributionOnLocalNoMatchConsumerCloseNeedsNewRemoteDemand() throws Exception { + server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0); + server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0); + Connection conn1 = cf1.createConnection(); + Connection conn2 = cf2.createConnection(); + conn1.start(); + conn2.start(); + + try { + Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Session session1_n_10 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Session session1_n_1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + javax.jms.Queue jmsQueue = session1.createQueue(myQueue); + + + MessageProducer prod1 = session1.createProducer(jmsQueue); + + prod1.setDeliveryMode(DeliveryMode.PERSISTENT); + + TextMessage textMessage = session1.createTextMessage("m1"); + textMessage.setIntProperty("N", 10); + + + // remote demand with a filter + MessageConsumer consumer2_n_10 = session2.createConsumer(jmsQueue, "N = 10"); + + // local consumers that does not match message + MessageConsumer consumer1_n_0 = session1.createConsumer(jmsQueue, "N = 0"); + MessageConsumer consumer1_n_1 = session1_n_1.createConsumer(jmsQueue, "N = 1"); + + // local consumer that matches message + MessageConsumer consumer1_n_10 = session1_n_10.createConsumer(jmsQueue, "N = 10"); + + // verify cluster notifications have completed before send + waitForBindings(server1, myQueue, false, 1, 1, 4000); + waitForBindings(server1, myQueue, true, 1, 3, 4000); + + prod1.send(textMessage); + + TextMessage received = (TextMessage) consumer1_n_10.receive(4000); + assertNotNull(received); + assertEquals("m1", received.getText()); + + // lets prepare some non matching message for redistribution by close with no acknowledge so session rolls back delivery + session1_n_10.close(); + + // verify no redistribution event yet + assertNull(consumer2_n_10.receiveNoWait()); + + // local remove consumer event will not trigger redistribution + session1_n_1.close(); + + // verify no redistribution event yet + assertNull(consumer2_n_10.receiveNoWait()); + + // force a redistribution event on new remote consumer creation (that won't match in this case), trigger redistribution + MessageConsumer consumer2_n_0 = session2.createConsumer(jmsQueue, "N = 0"); + + // verify redistribution to remote + received = (TextMessage) consumer2_n_10.receive(4000); + assertNotNull(received); + assertEquals("m1", received.getText()); + + received.acknowledge(); + + } finally { + conn1.close(); + conn2.close(); + } + } +} diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 06f78b2525..fb2fdc572a 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -439,7 +439,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override - public void cancelRedistributor() throws Exception { + public void cancelRedistributor() { // no-op } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java index 47555df6e4..0493c8bfef 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java @@ -967,6 +967,104 @@ public class QueueImplTest extends ActiveMQTestBase { Assert.assertEquals(20, queue.getDeliveringCount()); } + + @Test + public void testNoMatchConsumersAllowsRedistribution() throws Exception { + QueueImpl queue = getTemporaryQueue(); + + final int numMessages = 2; + List refs = new ArrayList<>(); + + for (int i = 0; i < numMessages; i++) { + MessageReference ref = generateReference(queue, i); + ref.getMessage().putStringProperty("color", "red"); + refs.add(ref); + + queue.addTail(ref); + } + + Assert.assertEquals(numMessages, getMessageCount(queue)); + queue.deliverNow(); + + Assert.assertEquals(numMessages, getMessageCount(queue)); + + FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'")); + queue.addConsumer(consumer); + + FakeConsumer consumer2 = new FakeConsumer(FilterImpl.createFilter("color = 'orange'")); + queue.addConsumer(consumer2); + + queue.deliverNow(); + Assert.assertEquals(0, consumer.getReferences().size()); + Assert.assertEquals(0, consumer2.getReferences().size()); + + // verify redistributor is doing some work.... + try { + // should attempt to add due to unmatched + queue.addRedistributor(0); + fail("expect error on attempt to add addRedistributor - npe b/c no storage etc"); + } catch (NullPointerException expected) { + } + + // verify with odd number as check depends on order/reset/wrap of consumers + FakeConsumer consumer3 = new FakeConsumer(FilterImpl.createFilter("color = 'blue'")); + queue.addConsumer(consumer3); + + queue.deliverNow(); + + Assert.assertEquals(0, consumer.getReferences().size()); + Assert.assertEquals(0, consumer2.getReferences().size()); + Assert.assertEquals(0, consumer3.getReferences().size()); + + // verify redistributor is doing some work.... + try { + // should attempt to add due to unmatched + queue.addRedistributor(0); + fail("expect error on attempt to add addRedistributor - npe b/c no storage etc"); + } catch (NullPointerException expected) { + } + + Assert.assertEquals(numMessages, getMessageCount(queue)); + } + + @Test + public void testNoMatchOn3AllowsRedistribution() throws Exception { + QueueImpl queue = getTemporaryQueue(); + + int i = 0; + MessageReference ref = generateReference(queue, i++); + ref.getMessage().putStringProperty("color", "red"); + queue.addTail(ref); + + ref = generateReference(queue, i++); + ref.getMessage().putStringProperty("color", "red"); + queue.addTail(ref); + + ref = generateReference(queue, i++); + ref.getMessage().putStringProperty("color", "blue"); + queue.addTail(ref); + + Assert.assertEquals(3, getMessageCount(queue)); + + FakeConsumer consumerRed = new FakeConsumer(FilterImpl.createFilter("color = 'red'")); + queue.addConsumer(consumerRed); + + FakeConsumer consumerOrange = new FakeConsumer(FilterImpl.createFilter("color = 'orange'")); + queue.addConsumer(consumerOrange); + + queue.deliverNow(); + Assert.assertEquals(2, consumerRed.getReferences().size()); + Assert.assertEquals(0, consumerOrange.getReferences().size()); + + // verify redistributor is doing some work.... + try { + // should attempt to add due to unmatched + queue.addRedistributor(0); + fail("expect error on attempt to add addRedistributor - npe b/c no storage etc"); + } catch (NullPointerException expected) { + } + } + // Private ------------------------------------------------------------------------------ private void testConsumerWithFilters(final boolean direct) throws Exception { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java index 2a5a33061f..fe31cfa091 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java @@ -157,4 +157,11 @@ public class FakeConsumer implements Consumer { return Collections.emptyList(); } + @Override + public String toString() { + if (filter != null) { + return filter + ", " + super.toString(); + } + return super.toString(); + } }