This commit is contained in:
Michael Andre Pearce 2018-02-26 19:13:23 +00:00
commit fb62fe9499
3 changed files with 50 additions and 2 deletions

View File

@ -1671,7 +1671,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private void parseGroupingHandlerConfiguration(final Element node, final Configuration mainConfiguration) {
String name = node.getAttribute("name");
String type = getString(node, "type", null, Validators.NOT_NULL_OR_EMPTY);
String address = getString(node, "address", null, Validators.NOT_NULL_OR_EMPTY);
String address = getString(node, "address", "", Validators.NO_CHECK);
Integer timeout = getInteger(node, "timeout", ActiveMQDefaultConfiguration.getDefaultGroupingHandlerTimeout(), Validators.GT_ZERO);
Long groupTimeout = getLong(node, "group-timeout", ActiveMQDefaultConfiguration.getDefaultGroupingHandlerGroupTimeout(), Validators.MINUS_ONE_OR_GT_ZERO);
Long reaperPeriod = getLong(node, "reaper-period", ActiveMQDefaultConfiguration.getDefaultGroupingHandlerReaperPeriod(), Validators.GT_ZERO);

View File

@ -2525,7 +2525,7 @@
</xsd:restriction>
</xsd:simpleType>
</xsd:element>
<xsd:element name="address" type="xsd:string" maxOccurs="1" minOccurs="1">
<xsd:element name="address" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
A reference to a cluster connection address

View File

@ -25,6 +25,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -286,6 +287,53 @@ public class ClusteredGroupingTest extends ClusterTestBase {
}
/**
* This is the same test as testGroupingSimple() just with the "address" removed from the cluster-connection and grouping-handler
*/
@Test
public void testGroupingSimpleWithNoAddress() throws Exception {
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());
setupServer(2, isFileStorage(), isNetty());
setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
setupClusterConnection("cluster2", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
servers[0].getConfiguration().setGroupingHandlerConfiguration(new GroupingHandlerConfiguration().setName(new SimpleString("grouparbitrator")).setType(GroupingHandlerConfiguration.TYPE.LOCAL).setTimeout(5000).setGroupTimeout(-1).setReaperPeriod(ActiveMQDefaultConfiguration.getDefaultGroupingHandlerReaperPeriod()));
servers[1].getConfiguration().setGroupingHandlerConfiguration(new GroupingHandlerConfiguration().setName(new SimpleString("grouparbitrator")).setType(GroupingHandlerConfiguration.TYPE.REMOTE).setTimeout(5000).setGroupTimeout(-1).setReaperPeriod(ActiveMQDefaultConfiguration.getDefaultGroupingHandlerReaperPeriod()));
servers[2].getConfiguration().setGroupingHandlerConfiguration(new GroupingHandlerConfiguration().setName(new SimpleString("grouparbitrator")).setType(GroupingHandlerConfiguration.TYPE.REMOTE).setTimeout(5000).setGroupTimeout(-1).setReaperPeriod(ActiveMQDefaultConfiguration.getDefaultGroupingHandlerReaperPeriod()));
startServers(0, 1, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue0", null, false);
createQueue(2, "queues.testaddress", "queue0", null, false);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 1, "queue0", null);
addConsumer(2, 2, "queue0", null);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
waitForBindings(2, "queues.testaddress", 1, 1, true);
waitForBindings(0, "queues.testaddress", 2, 2, false);
waitForBindings(1, "queues.testaddress", 2, 2, false);
waitForBindings(2, "queues.testaddress", 2, 2, false);
sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id1"));
verifyReceiveAll(10, 0);
}
// Fail a node where there's a consumer only.. with messages being sent by a node that is not the local
@Test
public void testGroupingSimpleFail2nd() throws Exception {