diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index e6440106c6..8500ecf1fb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -1671,7 +1671,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private void parseGroupingHandlerConfiguration(final Element node, final Configuration mainConfiguration) { String name = node.getAttribute("name"); String type = getString(node, "type", null, Validators.NOT_NULL_OR_EMPTY); - String address = getString(node, "address", null, Validators.NOT_NULL_OR_EMPTY); + String address = getString(node, "address", "", Validators.NO_CHECK); Integer timeout = getInteger(node, "timeout", ActiveMQDefaultConfiguration.getDefaultGroupingHandlerTimeout(), Validators.GT_ZERO); Long groupTimeout = getLong(node, "group-timeout", ActiveMQDefaultConfiguration.getDefaultGroupingHandlerGroupTimeout(), Validators.MINUS_ONE_OR_GT_ZERO); Long reaperPeriod = getLong(node, "reaper-period", ActiveMQDefaultConfiguration.getDefaultGroupingHandlerReaperPeriod(), Validators.GT_ZERO); diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index f67cd48d9a..802afca51c 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2525,7 +2525,7 @@ - + A reference to a cluster connection address diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java index 1c59990a3d..f393faf936 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; @@ -286,6 +287,53 @@ public class ClusteredGroupingTest extends ClusterTestBase { } + /** + * This is the same test as testGroupingSimple() just with the "address" removed from the cluster-connection and grouping-handler + */ + @Test + public void testGroupingSimpleWithNoAddress() throws Exception { + setupServer(0, isFileStorage(), isNetty()); + setupServer(1, isFileStorage(), isNetty()); + setupServer(2, isFileStorage(), isNetty()); + + setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); + + setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); + + setupClusterConnection("cluster2", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); + + servers[0].getConfiguration().setGroupingHandlerConfiguration(new GroupingHandlerConfiguration().setName(new SimpleString("grouparbitrator")).setType(GroupingHandlerConfiguration.TYPE.LOCAL).setTimeout(5000).setGroupTimeout(-1).setReaperPeriod(ActiveMQDefaultConfiguration.getDefaultGroupingHandlerReaperPeriod())); + servers[1].getConfiguration().setGroupingHandlerConfiguration(new GroupingHandlerConfiguration().setName(new SimpleString("grouparbitrator")).setType(GroupingHandlerConfiguration.TYPE.REMOTE).setTimeout(5000).setGroupTimeout(-1).setReaperPeriod(ActiveMQDefaultConfiguration.getDefaultGroupingHandlerReaperPeriod())); + servers[2].getConfiguration().setGroupingHandlerConfiguration(new GroupingHandlerConfiguration().setName(new SimpleString("grouparbitrator")).setType(GroupingHandlerConfiguration.TYPE.REMOTE).setTimeout(5000).setGroupTimeout(-1).setReaperPeriod(ActiveMQDefaultConfiguration.getDefaultGroupingHandlerReaperPeriod())); + + startServers(0, 1, 2); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + setupSessionFactory(2, isNetty()); + + createQueue(0, "queues.testaddress", "queue0", null, false); + createQueue(1, "queues.testaddress", "queue0", null, false); + createQueue(2, "queues.testaddress", "queue0", null, false); + + addConsumer(0, 0, "queue0", null); + addConsumer(1, 1, "queue0", null); + addConsumer(2, 2, "queue0", null); + + waitForBindings(0, "queues.testaddress", 1, 1, true); + waitForBindings(1, "queues.testaddress", 1, 1, true); + waitForBindings(2, "queues.testaddress", 1, 1, true); + + waitForBindings(0, "queues.testaddress", 2, 2, false); + waitForBindings(1, "queues.testaddress", 2, 2, false); + waitForBindings(2, "queues.testaddress", 2, 2, false); + + sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id1")); + + verifyReceiveAll(10, 0); + + } + // Fail a node where there's a consumer only.. with messages being sent by a node that is not the local @Test public void testGroupingSimpleFail2nd() throws Exception {