This commit is contained in:
Clebert Suconic 2019-09-06 14:26:04 -04:00
commit 85170ace50
2 changed files with 39 additions and 21 deletions

View File

@ -162,6 +162,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
return true;
} else {
for (Filter filter : filters) {
assert filter != null : "filters contains a null filter";
if (filter.match(message)) {
return true;
}
@ -203,7 +204,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
@Override
public synchronized void addConsumer(final SimpleString filterString) throws Exception {
if (filterString != null) {
if (filterString != null && !filterString.isEmpty()) {
// There can actually be many consumers on the same queue with the same filter, so we need to maintain a ref
// count
@ -223,7 +224,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
@Override
public synchronized void removeConsumer(final SimpleString filterString) throws Exception {
if (filterString != null) {
if (filterString != null && !filterString.isEmpty()) {
Integer i = filterCounts.get(filterString);
if (i != null) {

View File

@ -16,6 +16,10 @@
*/
package org.apache.activemq.artemis.tests.unit.core.server.cluster.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
@ -27,19 +31,9 @@ import org.junit.Test;
public class RemoteQueueBindImplTest extends ActiveMQTestBase {
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
@Test
public void testAddRemoveConsumer() throws Exception {
private void testAddRemoveConsumerWithFilter(IntFunction<SimpleString> filterFactory,
int size,
int expectedSize) throws Exception {
final long id = RandomUtil.randomLong();
final SimpleString address = RandomUtil.randomSimpleString();
final SimpleString uniqueName = RandomUtil.randomSimpleString();
@ -51,17 +45,40 @@ public class RemoteQueueBindImplTest extends ActiveMQTestBase {
final int distance = 0;
RemoteQueueBindingImpl binding = new RemoteQueueBindingImpl(id, address, uniqueName, routingName, remoteQueueID, filterString, storeAndForwardQueue, bridgeName, distance, MessageLoadBalancingType.ON_DEMAND);
for (int i = 0; i < 100; i++) {
binding.addConsumer(new SimpleString("B" + i + "<A"));
final List<SimpleString> filters = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
final SimpleString filter = filterFactory.apply(i);
filters.add(filter);
binding.addConsumer(filter);
}
assertEquals(100, binding.getFilters().size());
assertEquals(expectedSize, binding.getFilters().size());
for (int i = 0; i < 100; i++) {
binding.removeConsumer(new SimpleString("B" + i + "<A"));
for (int i = 0; i < size; i++) {
binding.removeConsumer(filters.get(i));
}
assertEquals(0, binding.getFilters().size());
}
@Test
public void testAddRemoveConsumer() throws Exception {
testAddRemoveConsumerWithFilter(i -> new SimpleString("B" + i + "<A"), 100, 100);
}
@Test
public void testAddRemoveConsumerUsingSameFilter() throws Exception {
testAddRemoveConsumerWithFilter(i -> new SimpleString("B" + 0 + "<A"), 100, 1);
}
@Test
public void testAddRemoveConsumerUsingEmptyFilters() throws Exception {
testAddRemoveConsumerWithFilter(i -> new SimpleString(""), 1, 0);
}
@Test
public void testAddRemoveConsumerUsingNullFilters() throws Exception {
testAddRemoveConsumerWithFilter(i -> null, 1, 0);
}
}