This closes #2417
This commit is contained in:
commit
ff2073a7ed
|
@ -1239,7 +1239,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
return;
|
||||
}
|
||||
|
||||
RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateID(), queueAddress, clusterName, routingName, queueID, filterString, queue, bridge.getName(), distance + 1);
|
||||
RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateID(), queueAddress, clusterName, routingName, queueID, filterString, queue, bridge.getName(), distance + 1, messageLoadBalancingType);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Adding binding " + clusterName + " into " + ClusterConnectionImpl.this);
|
||||
|
|
|
@ -62,6 +62,8 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
|
|||
|
||||
private final int distance;
|
||||
|
||||
private final MessageLoadBalancingType messageLoadBalancingType;
|
||||
|
||||
private boolean connected = true;
|
||||
|
||||
public RemoteQueueBindingImpl(final long id,
|
||||
|
@ -72,7 +74,8 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
|
|||
final SimpleString filterString,
|
||||
final Queue storeAndForwardQueue,
|
||||
final SimpleString bridgeName,
|
||||
final int distance) throws Exception {
|
||||
final int distance,
|
||||
final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
|
||||
this.id = id;
|
||||
|
||||
this.address = address;
|
||||
|
@ -90,6 +93,8 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
|
|||
idsHeaderName = Message.HDR_ROUTE_TO_IDS.concat(bridgeName);
|
||||
|
||||
this.distance = distance;
|
||||
|
||||
this.messageLoadBalancingType = messageLoadBalancingType;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -149,7 +154,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
|
|||
|
||||
@Override
|
||||
public synchronized boolean isHighAcceptPriority(final Message message) {
|
||||
if (consumerCount == 0) {
|
||||
if (consumerCount == 0 || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -16,7 +16,16 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.junit.Wait;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -60,6 +69,37 @@ public class RemoteBindingWithoutLoadBalancingTest extends ClusterTestBase {
|
|||
send(1, "queues.testaddress", 1, false, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStackOverflowJMS() throws Exception {
|
||||
final String QUEUE_NAME = "queues.queue0";
|
||||
|
||||
setupCluster();
|
||||
|
||||
startServers();
|
||||
|
||||
ConnectionFactory cf1 = new ActiveMQConnectionFactory("vm://0");
|
||||
Connection c1 = cf1.createConnection();
|
||||
c1.start();
|
||||
Session s1 = c1.createSession();
|
||||
MessageConsumer mc1 = s1.createConsumer(s1.createQueue(QUEUE_NAME));
|
||||
|
||||
waitForBindings(0, QUEUE_NAME, 1, 1, true);
|
||||
waitForBindings(1, QUEUE_NAME, 1, 1, false);
|
||||
|
||||
ConnectionFactory cf2 = new ActiveMQConnectionFactory("vm://1");
|
||||
Connection c2 = cf2.createConnection();
|
||||
Session s2 = c2.createSession();
|
||||
MessageProducer mp2 = s2.createProducer(s2.createQueue(QUEUE_NAME));
|
||||
mp2.send(s2.createMessage());
|
||||
|
||||
waitForBindings(1, QUEUE_NAME, 1, 0, true);
|
||||
|
||||
assertTrue(Wait.waitFor(() -> servers[1].locateQueue(SimpleString.toSimpleString(QUEUE_NAME)).getMessageCount() == 1, 2000, 100));
|
||||
|
||||
c1.close();
|
||||
c2.close();
|
||||
}
|
||||
|
||||
protected void setupCluster() throws Exception {
|
||||
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.OFF, 1, isNetty(), 0, 1);
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.unit.core.server.cluster.impl;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
|
||||
import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
|
@ -48,7 +49,7 @@ public class RemoteQueueBindImplTest extends ActiveMQTestBase {
|
|||
final Queue storeAndForwardQueue = new FakeQueue(null);
|
||||
final SimpleString bridgeName = RandomUtil.randomSimpleString();
|
||||
final int distance = 0;
|
||||
RemoteQueueBindingImpl binding = new RemoteQueueBindingImpl(id, address, uniqueName, routingName, remoteQueueID, filterString, storeAndForwardQueue, bridgeName, distance);
|
||||
RemoteQueueBindingImpl binding = new RemoteQueueBindingImpl(id, address, uniqueName, routingName, remoteQueueID, filterString, storeAndForwardQueue, bridgeName, distance, MessageLoadBalancingType.ON_DEMAND);
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
binding.addConsumer(new SimpleString("B" + i + "<A"));
|
||||
|
|
Loading…
Reference in New Issue