From f4396da9fd9f7ecc7d9c0b02118d1eb1b76af523 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Mon, 5 Nov 2018 10:42:32 -0600 Subject: [PATCH] ARTEMIS-2108 fix another potential StackOverflow --- .../cluster/impl/ClusterConnectionImpl.java | 2 +- .../cluster/impl/RemoteQueueBindingImpl.java | 9 ++++- ...RemoteBindingWithoutLoadBalancingTest.java | 40 +++++++++++++++++++ .../cluster/impl/RemoteQueueBindImplTest.java | 3 +- 4 files changed, 50 insertions(+), 4 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index 849575848e..ef991efa42 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -1239,7 +1239,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn return; } - RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateID(), queueAddress, clusterName, routingName, queueID, filterString, queue, bridge.getName(), distance + 1); + RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateID(), queueAddress, clusterName, routingName, queueID, filterString, queue, bridge.getName(), distance + 1, messageLoadBalancingType); if (logger.isTraceEnabled()) { logger.trace("Adding binding " + clusterName + " into " + ClusterConnectionImpl.this); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java index 2161f7251e..f7ea2cc4be 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java @@ -62,6 +62,8 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { private final int distance; + private final MessageLoadBalancingType messageLoadBalancingType; + private boolean connected = true; public RemoteQueueBindingImpl(final long id, @@ -72,7 +74,8 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { final SimpleString filterString, final Queue storeAndForwardQueue, final SimpleString bridgeName, - final int distance) throws Exception { + final int distance, + final MessageLoadBalancingType messageLoadBalancingType) throws Exception { this.id = id; this.address = address; @@ -90,6 +93,8 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { idsHeaderName = Message.HDR_ROUTE_TO_IDS.concat(bridgeName); this.distance = distance; + + this.messageLoadBalancingType = messageLoadBalancingType; } @Override @@ -149,7 +154,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { @Override public synchronized boolean isHighAcceptPriority(final Message message) { - if (consumerCount == 0) { + if (consumerCount == 0 || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) { return false; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/RemoteBindingWithoutLoadBalancingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/RemoteBindingWithoutLoadBalancingTest.java index da06175cb7..0322f38359 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/RemoteBindingWithoutLoadBalancingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/RemoteBindingWithoutLoadBalancingTest.java @@ -16,7 +16,16 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.distribution; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.junit.Wait; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.junit.Before; import org.junit.Test; @@ -60,6 +69,37 @@ public class RemoteBindingWithoutLoadBalancingTest extends ClusterTestBase { send(1, "queues.testaddress", 1, false, null); } + @Test + public void testStackOverflowJMS() throws Exception { + final String QUEUE_NAME = "queues.queue0"; + + setupCluster(); + + startServers(); + + ConnectionFactory cf1 = new ActiveMQConnectionFactory("vm://0"); + Connection c1 = cf1.createConnection(); + c1.start(); + Session s1 = c1.createSession(); + MessageConsumer mc1 = s1.createConsumer(s1.createQueue(QUEUE_NAME)); + + waitForBindings(0, QUEUE_NAME, 1, 1, true); + waitForBindings(1, QUEUE_NAME, 1, 1, false); + + ConnectionFactory cf2 = new ActiveMQConnectionFactory("vm://1"); + Connection c2 = cf2.createConnection(); + Session s2 = c2.createSession(); + MessageProducer mp2 = s2.createProducer(s2.createQueue(QUEUE_NAME)); + mp2.send(s2.createMessage()); + + waitForBindings(1, QUEUE_NAME, 1, 0, true); + + assertTrue(Wait.waitFor(() -> servers[1].locateQueue(SimpleString.toSimpleString(QUEUE_NAME)).getMessageCount() == 1, 2000, 100)); + + c1.close(); + c2.close(); + } + protected void setupCluster() throws Exception { setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.OFF, 1, isNetty(), 0, 1); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java index 66528da343..77a8ee523a 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.unit.core.server.cluster.impl; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl; import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -48,7 +49,7 @@ public class RemoteQueueBindImplTest extends ActiveMQTestBase { final Queue storeAndForwardQueue = new FakeQueue(null); final SimpleString bridgeName = RandomUtil.randomSimpleString(); final int distance = 0; - RemoteQueueBindingImpl binding = new RemoteQueueBindingImpl(id, address, uniqueName, routingName, remoteQueueID, filterString, storeAndForwardQueue, bridgeName, distance); + RemoteQueueBindingImpl binding = new RemoteQueueBindingImpl(id, address, uniqueName, routingName, remoteQueueID, filterString, storeAndForwardQueue, bridgeName, distance, MessageLoadBalancingType.ON_DEMAND); for (int i = 0; i < 100; i++) { binding.addConsumer(new SimpleString("B" + i + "