ARTEMIS-780 Added ANYCAST routing to local queues

This commit is contained in:
Martyn Taylor 2016-10-24 14:27:00 +01:00
parent 5e7475115d
commit 2d02a26527
18 changed files with 591 additions and 24 deletions

View File

@ -54,6 +54,8 @@ public interface AddressManager {
AddressInfo addAddressInfo(AddressInfo addressInfo);
AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo);
AddressInfo removeAddressInfo(SimpleString address);
AddressInfo getAddressInfo(SimpleString address);

View File

@ -45,6 +45,8 @@ public interface PostOffice extends ActiveMQComponent {
AddressInfo addAddressInfo(AddressInfo addressInfo);
AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo);
AddressInfo removeAddressInfo(SimpleString address);
AddressInfo getAddressInfo(SimpleString address);

View File

@ -262,6 +262,7 @@ public final class BindingsImpl implements Bindings {
boolean routed = false;
for (Binding binding : exclusiveBindings) {
if (binding.getFilter() == null || binding.getFilter().match(message)) {
binding.getBindable().route(message, context);

View File

@ -24,10 +24,11 @@ import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
public class LocalQueueBinding implements QueueBinding {
private final SimpleString address;
private final AddressInfo address;
private final Queue queue;
@ -37,7 +38,7 @@ public class LocalQueueBinding implements QueueBinding {
private final SimpleString clusterName;
public LocalQueueBinding(final SimpleString address, final Queue queue, final SimpleString nodeID) {
public LocalQueueBinding(final AddressInfo address, final Queue queue, final SimpleString nodeID) {
this.address = address;
this.queue = queue;
@ -61,7 +62,7 @@ public class LocalQueueBinding implements QueueBinding {
@Override
public SimpleString getAddress() {
return address;
return address.getName();
}
@Override
@ -76,7 +77,7 @@ public class LocalQueueBinding implements QueueBinding {
@Override
public SimpleString getRoutingName() {
return name;
return (address.getRoutingType() == AddressInfo.RoutingType.MULTICAST) ? name : address.getName();
}
@Override

View File

@ -424,6 +424,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return addressManager.addAddressInfo(addressInfo);
}
@Override
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
return addressManager.addOrUpdateAddressInfo(addressInfo);
}
@Override
public AddressInfo removeAddressInfo(SimpleString address) {
return addressManager.removeAddressInfo(address);

View File

@ -187,6 +187,21 @@ public class SimpleAddressManager implements AddressManager {
return addressInfoMap.putIfAbsent(addressInfo.getName(), addressInfo);
}
@Override
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
AddressInfo from = addAddressInfo(addressInfo);
return (from == null) ? addressInfo : updateAddressInfo(from, addressInfo);
}
private AddressInfo updateAddressInfo(AddressInfo from, AddressInfo to) {
synchronized (from) {
from.setRoutingType(to.getRoutingType());
from.setDefaultMaxConsumers(to.getDefaultMaxConsumers());
from.setDefaultDeleteOnNoConsumers(to.isDefaultDeleteOnNoConsumers());
return from;
}
}
@Override
public AddressInfo removeAddressInfo(SimpleString address) {
return addressInfoMap.remove(address);

View File

@ -425,7 +425,7 @@ public interface ActiveMQServer extends ActiveMQComponent {
void removeClientConnection(String clientId);
AddressInfo addAddressInfo(AddressInfo addressInfo);
AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo);
AddressInfo removeAddressInfo(SimpleString address);
}

View File

@ -2163,7 +2163,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
info.setDefaultDeleteOnNoConsumers(config.getDefaultDeleteOnNoConsumers());
info.setDefaultMaxConsumers(config.getDefaultMaxConsumers());
addAddressInfo(info);
createOrUpdateAddressInfo(info);
deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations());
}
}
@ -2267,8 +2267,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
public AddressInfo addAddressInfo(AddressInfo addressInfo) {
return postOffice.addAddressInfo(addressInfo);
public AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) {
return postOffice.addOrUpdateAddressInfo(addressInfo);
}
@Override
@ -2278,7 +2278,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public AddressInfo getAddressInfo(SimpleString address) {
return postOffice.removeAddressInfo(address);
return postOffice.getAddressInfo(address);
}
private Queue createQueue(final SimpleString addressName,
@ -2314,15 +2314,13 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).build();
final Queue queue = queueFactory.createQueueWith(queueConfig);
addAddressInfo(new AddressInfo(queue.getAddress()));
if (transientQueue) {
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
} else if (queue.isAutoCreated()) {
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this.getJMSQueueDeleter(), queue.getName()));
}
final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
final QueueBinding localQueueBinding = new LocalQueueBinding(getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId());
if (queue.isDurable()) {
storageManager.addQueueBinding(txID, localQueueBinding);

View File

@ -22,7 +22,7 @@ public class AddressInfo {
private final SimpleString name;
private RoutingType routingType = RoutingType.Multicast;
private RoutingType routingType = RoutingType.MULTICAST;
private boolean defaultDeleteOnNoConsumers;
@ -61,13 +61,13 @@ public class AddressInfo {
}
public enum RoutingType {
Multicast, Anycast;
MULTICAST, ANYCAST;
public byte getType() {
switch (this) {
case Multicast:
case MULTICAST:
return 0;
case Anycast:
case ANYCAST:
return 1;
default:
return -1;
@ -77,9 +77,9 @@ public class AddressInfo {
public static RoutingType getType(byte type) {
switch (type) {
case 0:
return Multicast;
return MULTICAST;
case 1:
return Anycast;
return ANYCAST;
default:
return null;
}

View File

@ -155,7 +155,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
}
}
final Binding binding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
final Binding binding = new LocalQueueBinding(postOffice.getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId());
queues.put(queue.getID(), queue);
postOffice.addBinding(binding);
managementService.registerAddress(queue.getAddress());

View File

@ -68,6 +68,10 @@ public class QueueFactoryImpl implements QueueFactory {
@Override
public Queue createQueueWith(final QueueConfig config) {
// Add default address info if one doesn't exist
postOffice.addAddressInfo(new AddressInfo(config.address()));
final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString());
final Queue queue;
if (addressSettings.isLastValueQueue()) {
@ -89,6 +93,10 @@ public class QueueFactoryImpl implements QueueFactory {
final boolean durable,
final boolean temporary,
final boolean autoCreated) {
// Add default address info if one doesn't exist
postOffice.addAddressInfo(new AddressInfo(address));
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
Queue queue;

View File

@ -376,7 +376,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
// Addr 1
CoreAddressConfiguration addressConfiguration = conf.getAddressConfigurations().get(0);
assertEquals("addr1", addressConfiguration.getName());
assertEquals(AddressInfo.RoutingType.Anycast, addressConfiguration.getRoutingType());
assertEquals(AddressInfo.RoutingType.ANYCAST, addressConfiguration.getRoutingType());
assertEquals(2, addressConfiguration.getQueueConfigurations().size());
// Addr 1 Queue 1
@ -402,7 +402,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
// Addr 2
addressConfiguration = conf.getAddressConfigurations().get(1);
assertEquals("addr2", addressConfiguration.getName());
assertEquals(AddressInfo.RoutingType.Multicast, addressConfiguration.getRoutingType());
assertEquals(AddressInfo.RoutingType.MULTICAST, addressConfiguration.getRoutingType());
assertEquals(2, addressConfiguration.getQueueConfigurations().size());
// Addr 2 Queue 1

View File

@ -16,6 +16,245 @@
*/
package org.apache.activemq.artemis.tests.integration.addressing;
public class AddressingTest {
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
public class AddressingTest extends ActiveMQTestBase {
private ActiveMQServer server;
private ClientSessionFactory sessionFactory;
@Before
public void setup() throws Exception {
server = createServer(true);
server.start();
server.waitForActivation(10, TimeUnit.SECONDS);
ServerLocator sl = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
sessionFactory = sl.createSessionFactory();
addSessionFactory(sessionFactory);
}
@Test
public void testMulticastRouting() throws Exception {
SimpleString sendAddress = new SimpleString("test.address");
List<String> testAddresses = Arrays.asList("test.address", "test.#", "test.*");
for (String consumeAddress : testAddresses) {
// For each address, create 2 Queues with the same address, assert both queues receive message
AddressInfo addressInfo = new AddressInfo(new SimpleString(consumeAddress));
addressInfo.setRoutingType(AddressInfo.RoutingType.MULTICAST);
server.createOrUpdateAddressInfo(addressInfo);
Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false);
Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false);
ClientSession session = sessionFactory.createSession();
session.start();
ClientConsumer consumer1 = session.createConsumer(q1.getName());
ClientConsumer consumer2 = session.createConsumer(q2.getName());
ClientProducer producer = session.createProducer(sendAddress);
ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true);
m.getBodyBuffer().writeString("TestMessage");
producer.send(m);
assertNotNull(consumer1.receive(2000));
assertNotNull(consumer2.receive(2000));
q1.deleteQueue();
q2.deleteQueue();
}
}
@Test
public void testAnycastRouting() throws Exception {
SimpleString sendAddress = new SimpleString("test.address");
List<String> testAddresses = Arrays.asList("test.address", "test.#", "test.*");
for (String consumeAddress : testAddresses) {
// For each address, create 2 Queues with the same address, assert one queue receive message
AddressInfo addressInfo = new AddressInfo(new SimpleString(consumeAddress));
addressInfo.setRoutingType(AddressInfo.RoutingType.ANYCAST);
server.createOrUpdateAddressInfo(addressInfo);
Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false);
Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false);
ClientSession session = sessionFactory.createSession();
session.start();
ClientConsumer consumer1 = session.createConsumer(q1.getName());
ClientConsumer consumer2 = session.createConsumer(q2.getName());
ClientProducer producer = session.createProducer(sendAddress);
ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true);
m.getBodyBuffer().writeString("TestMessage");
producer.send(m);
int count = 0;
count = (consumer1.receive(1000) == null) ? count : count + 1;
count = (consumer2.receive(1000) == null) ? count : count + 1;
assertEquals(1, count);
q1.deleteQueue();
q2.deleteQueue();
}
}
@Test
public void testAnycastRoutingRoundRobin() throws Exception {
SimpleString address = new SimpleString("test.address");
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.setRoutingType(AddressInfo.RoutingType.ANYCAST);
server.createOrUpdateAddressInfo(addressInfo);
Queue q1 = server.createQueue(address, address.concat(".1"), null, true, false);
Queue q2 = server.createQueue(address, address.concat(".2"), null, true, false);
Queue q3 = server.createQueue(address, address.concat(".3"), null, true, false);
ClientSession session = sessionFactory.createSession();
session.start();
ClientProducer producer = session.createProducer(address);
ClientConsumer consumer1 = session.createConsumer(q1.getName());
ClientConsumer consumer2 = session.createConsumer(q2.getName());
ClientConsumer consumer3 = session.createConsumer(q3.getName());
List<ClientConsumer> consumers = new ArrayList<>(Arrays.asList(new ClientConsumer[] {consumer1, consumer2, consumer3}));
List<String> messages = new ArrayList<>();
messages.add("Message1");
messages.add("Message2");
messages.add("Message3");
ClientMessage clientMessage;
for (String message : messages) {
clientMessage = session.createMessage(true);
clientMessage.getBodyBuffer().writeString(message);
producer.send(clientMessage);
}
String m;
for (ClientConsumer consumer : consumers) {
clientMessage = consumer.receive(1000);
m = clientMessage.getBodyBuffer().readString();
messages.remove(m);
}
assertTrue(messages.isEmpty());
// Check we don't receive more messages
int count = 0;
for (ClientConsumer consumer : consumers) {
count = (consumer.receive(1000) == null) ? count : count + 1;
}
assertEquals(0, count);
}
@Test
public void testMulticastRoutingBackwardsCompat() throws Exception {
SimpleString sendAddress = new SimpleString("test.address");
List<String> testAddresses = Arrays.asList("test.address", "test.#", "test.*");
for (String consumeAddress : testAddresses) {
// For each address, create 2 Queues with the same address, assert both queues receive message
Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false);
Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false);
ClientSession session = sessionFactory.createSession();
session.start();
ClientConsumer consumer1 = session.createConsumer(q1.getName());
ClientConsumer consumer2 = session.createConsumer(q2.getName());
ClientProducer producer = session.createProducer(sendAddress);
ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true);
m.getBodyBuffer().writeString("TestMessage");
producer.send(m);
assertNotNull(consumer1.receive(2000));
assertNotNull(consumer2.receive(2000));
q1.deleteQueue();
q2.deleteQueue();
}
}
@Ignore
@Test
public void testDeleteQueueOnNoConsumersTrue() {
fail("Not Implemented");
}
@Ignore
@Test
public void testDeleteQueueOnNoConsumersFalse() {
fail("Not Implemented");
}
@Ignore
@Test
public void testLimitOnMaxConsumers() {
fail("Not Implemented");
}
@Ignore
@Test
public void testUnlimitedMaxConsumers() {
fail("Not Implemented");
}
@Ignore
@Test
public void testDefaultMaxConsumersFromAddress() {
fail("Not Implemented");
}
@Ignore
@Test
public void testDefaultDeleteOnNoConsumersFromAddress() {
fail("Not Implemented");
}
}

View File

@ -353,7 +353,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
long txID = server.getStorageManager().generateID();
// Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally
LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID());
LocalQueueBinding newBinding = new LocalQueueBinding(server.getAddressInfo(QUEUE), new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID());
server.getStorageManager().addQueueBinding(txID, newBinding);
server.getStorageManager().commitBindings(txID);

View File

@ -0,0 +1,276 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
import java.util.List;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Test;
public class AnycastRoutingWithClusterTest extends ClusterTestBase {
/**
* Test anycast address with single distributed queue in a 3 node cluster environment. Messages should be
* "round robin"'d across the each queue
* @throws Exception
*/
@Test
public void testAnycastAddressOneQueueRoutingMultiNode() throws Exception {
String address = "test.address";
String queueName = "test.queue";
String clusterAddress = "test";
for (int i = 0; i < 3; i++) {
setupServer(i, isFileStorage(), isNetty());
}
setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 0, 1, 2);
setupClusterConnection("cluster1", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 1, 0, 2);
setupClusterConnection("cluster2", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 2, 0, 1);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
startServers(0, 1, 2);
List<Queue> queues;
for (int i = 0; i < 3; i++) {
createAddressInfo(i, address, AddressInfo.RoutingType.ANYCAST, -1, false);
setupSessionFactory(i, isNetty());
createQueue(i, address, queueName, null, false);
addConsumer(i, i, queueName, null);
}
for (int i = 0; i < 3; i++) {
waitForBindings(i, address, 1, 1, true);
waitForBindings(i, address, 2, 2, false);
}
final int noMessages = 30;
send(0, address, noMessages, true, null, null);
for (int s = 0; s < 3; s++) {
final Queue queue = servers[s].locateQueue(new SimpleString(queueName));
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return queue.getMessageCount() == noMessages / 3;
}
});
}
// Each consumer should receive noMessages / noServers
for (int i = 0; i < noMessages / 3; i++) {
for (int c = 0; c < 3; c++) {
assertNotNull(consumers[c].consumer.receive(1000));
}
}
}
/**
* Test anycast address with N queues in a 3 node cluster environment. Messages should be "round robin"'d across the
* each queue.
* @throws Exception
*/
@Test
public void testAnycastAddressMultiQueuesRoutingMultiNode() throws Exception {
String address = "test.address";
String queueNamePrefix = "test.queue";
String clusterAddress = "test";
for (int i = 0; i < 3; i++) {
setupServer(i, isFileStorage(), isNetty());
}
setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 0, 1, 2);
setupClusterConnection("cluster1", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 1, 0, 2);
setupClusterConnection("cluster2", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 2, 0, 1);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
startServers(0, 1, 2);
List<Queue> queues;
for (int i = 0; i < 3; i++) {
createAddressInfo(i, address, AddressInfo.RoutingType.ANYCAST, -1, false);
setupSessionFactory(i, isNetty());
createQueue(i, address, queueNamePrefix + i, null, false);
addConsumer(i, i, queueNamePrefix + i, null);
}
for (int i = 0; i < 3; i++) {
waitForBindings(i, address, 1, 1, true);
waitForBindings(i, address, 2, 2, false);
}
final int noMessages = 30;
send(0, address, noMessages, true, null, null);
for (int s = 0; s < 3; s++) {
final Queue queue = servers[s].locateQueue(new SimpleString(queueNamePrefix + s));
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return queue.getMessageCount() == noMessages / 3;
}
});
}
// Each consumer should receive noMessages / noServers
for (int i = 0; i < noMessages / 3; i++) {
for (int c = 0; c < 3; c++) {
assertNotNull(consumers[c].consumer.receive(1000));
}
}
}
/**
* Test anycast address with N queues in a 3 node cluster environment. Messages should be "round robin"'d across the
* each queue.
* @throws Exception
*/
@Test
public void testAnycastAddressMultiQueuesWithFilterRoutingMultiNode() throws Exception {
String address = "test.address";
String queueNamePrefix = "test.queue";
String clusterAddress = "test";
for (int i = 0; i < 3; i++) {
setupServer(i, isFileStorage(), isNetty());
}
setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 0, 1, 2);
setupClusterConnection("cluster1", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 1, 0, 2);
setupClusterConnection("cluster2", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 2, 0, 1);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
startServers(0, 1, 2);
List<Queue> queues;
for (int i = 0; i < 3; i++) {
createAddressInfo(i, address, AddressInfo.RoutingType.ANYCAST, -1, false);
setupSessionFactory(i, isNetty());
}
String filter1 = "giraffe";
String filter2 = "platypus";
createQueue(0, address, queueNamePrefix + 0, filter1, false);
createQueue(1, address, queueNamePrefix + 1, filter1, false);
createQueue(2, address, queueNamePrefix + 2, filter2, false);
for (int i = 0; i < 3; i++) {
addConsumer(i, i, queueNamePrefix + i, null);
}
for (int i = 0; i < 3; i++) {
waitForBindings(i, address, 1, 1, true);
waitForBindings(i, address, 2, 2, false);
}
final int noMessages = 30;
send(0, address, noMessages, true, filter1, null);
// Each consumer should receive noMessages / noServers
for (int i = 0; i < noMessages / 2; i++) {
for (int c = 0; c < 2; c++) {
assertNotNull(consumers[c].consumer.receive(1000));
}
}
assertNull(consumers[2].consumer.receive(1000));
}
/**
* Test multicast address that with N queues in a 3 node cluster environment. Each queue should receive all messages
* sent from the client.
* @throws Exception
*/
@Test
public void testMulitcastAddressMultiQueuesRoutingMultiNode() throws Exception {
String address = "test.address";
String queueNamePrefix = "test.queue";
String clusterAddress = "test";
for (int i = 0; i < 3; i++) {
setupServer(i, isFileStorage(), isNetty());
}
setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 0, 1, 2);
setupClusterConnection("cluster1", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 1, 0, 2);
setupClusterConnection("cluster2", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 2, 0, 1);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
startServers(0, 1, 2);
List<Queue> queues;
for (int i = 0; i < 3; i++) {
createAddressInfo(i, address, AddressInfo.RoutingType.MULTICAST, -1, false);
setupSessionFactory(i, isNetty());
createQueue(i, address, queueNamePrefix + i, null, false);
addConsumer(i, i, queueNamePrefix + i, null);
}
for (int i = 0; i < 3; i++) {
waitForBindings(i, address, 1, 1, true);
waitForBindings(i, address, 2, 2, false);
}
final int noMessages = 30;
send(0, address, noMessages, true, null, null);
for (int s = 0; s < 3; s++) {
final Queue queue = servers[s].locateQueue(new SimpleString(queueNamePrefix + s));
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return queue.getMessageCount() == noMessages;
}
});
}
// Each consumer should receive noMessages
for (int i = 0; i < noMessages; i++) {
for (int c = 0; c < 3; c++) {
assertNotNull(consumers[c].consumer.receive(1000));
}
}
}
private boolean isNetty() {
return true;
}
}

View File

@ -78,6 +78,7 @@ import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -518,6 +519,19 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
session.close();
}
protected void createAddressInfo(final int node,
final String address,
final AddressInfo.RoutingType routingType,
final int defaulMaxConsumers,
boolean defaultDeleteOnNoConsumers) {
AddressInfo addressInfo = new AddressInfo(new SimpleString(address));
addressInfo.setRoutingType(routingType);
addressInfo.setDefaultMaxConsumers(defaulMaxConsumers);
addressInfo.setDefaultDeleteOnNoConsumers(defaultDeleteOnNoConsumers);
servers[node].createOrUpdateAddressInfo(addressInfo);
}
protected void deleteQueue(final int node, final String queueName) throws Exception {
ClientSessionFactory sf = sfs[node];

View File

@ -83,7 +83,7 @@ public class TopicCleanupTest extends JMSTestBase {
final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("jms.topic.topic"), SimpleString.toSimpleString("jms.topic.topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null, true, false, false, server.getScheduledPool(), server.getPostOffice(), storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor());
LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID());
LocalQueueBinding binding = new LocalQueueBinding(server.getAddressInfo(queue.getAddress()), queue, server.getNodeID());
storage.addQueueBinding(txid, binding);

View File

@ -65,6 +65,11 @@ public class FakePostOffice implements PostOffice {
return null;
}
@Override
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
return null;
}
@Override
public AddressInfo removeAddressInfo(SimpleString address) {