This commit is contained in:
Clebert Suconic 2021-03-18 18:08:42 -04:00
commit 02ec0bfe42
2 changed files with 32 additions and 21 deletions

View File

@ -437,9 +437,7 @@ public final class BindingsImpl implements Bindings {
} }
} }
Filter filter = binding.getFilter(); if (matchBinding(message, binding)) {
if (filter == null || filter.match(message)) {
// bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
// unnecessary overhead) // unnecessary overhead)
if (length == 1 || (binding.isConnected() && (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || binding.isHighAcceptPriority(message)))) { if (length == 1 || (binding.isConnected() && (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || binding.isHighAcceptPriority(message)))) {
@ -486,29 +484,21 @@ public final class BindingsImpl implements Bindings {
if (pos != startPos) { if (pos != startPos) {
routingNamePositions.put(routingName, pos); routingNamePositions.put(routingName, pos);
} }
if (messageLoadBalancingType.equals(MessageLoadBalancingType.OFF) && theBinding instanceof RemoteQueueBinding) {
if (exclusivelyRemote(bindings)) {
theBinding = null;
} else {
theBinding = getNextBinding(message, routingName, bindings);
}
}
return theBinding; return theBinding;
} }
private boolean exclusivelyRemote(List<Binding> bindings) { private boolean matchBinding(Message message, Binding binding) {
boolean result = true; if (messageLoadBalancingType.equals(MessageLoadBalancingType.OFF) && binding instanceof RemoteQueueBinding) {
return false;
for (Binding binding : bindings) {
if (!(binding instanceof RemoteQueueBinding)) {
result = false;
break;
}
} }
return result; Filter filter = binding.getFilter();
if (filter == null || filter.match(message)) {
return true;
} else {
return false;
}
} }
private void routeUsingStrictOrdering(final Message message, private void routeUsingStrictOrdering(final Message message,

View File

@ -66,6 +66,27 @@ public class RemoteBindingWithoutLoadBalancingTest extends ClusterTestBase {
send(1, "queues.testaddress", 1, false, null); send(1, "queues.testaddress", 1, false, null);
} }
@Test
public void testStackOverflowWithLocalConsumerAndFilter() throws Exception {
setupCluster();
startServers();
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, "queues.testaddress", "queue0", "0", true);
createQueue(1, "queues.testaddress", "queue0", "1", true);
waitForBindings(0, "queues.testaddress", 1, 0, true);
waitForBindings(1, "queues.testaddress", 1, 0, false);
for (int i = 0; i < 10; i++) {
send(1, "queues.testaddress", 10, false, "" + i % 2);
}
}
@Test @Test
public void testStackOverflowJMS() throws Exception { public void testStackOverflowJMS() throws Exception {
final String QUEUE_NAME = "queues.queue0"; final String QUEUE_NAME = "queues.queue0";