This commit is contained in:
Clebert Suconic 2018-02-19 11:33:09 -05:00
commit 26c284bf58
8 changed files with 145 additions and 5 deletions
artemis-server/src/main/java/org/apache/activemq/artemis/core
tests
integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported
unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes

View File

@ -22,6 +22,7 @@ import java.util.Set;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -76,4 +77,6 @@ public interface AddressManager {
AddressInfo getAddressInfo(SimpleString address);
void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception;
}

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -150,4 +151,6 @@ public interface PostOffice extends ActiveMQComponent {
boolean isAddressBound(SimpleString address) throws Exception;
Set<SimpleString> getAddresses();
void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception;
}

View File

@ -69,6 +69,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@ -1005,6 +1006,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return addressManager.getAddresses();
}
@Override
public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception {
addressManager.updateMessageLoadBalancingTypeForAddress(address, messageLoadBalancingType);
}
@Override
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
return addressManager.getMatchingQueue(address, routingType);

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.CompositeAddress;
@ -346,4 +347,9 @@ public class SimpleAddressManager implements AddressManager {
public AddressInfo getAddressInfo(SimpleString addressName) {
return addressInfoMap.get(addressName);
}
@Override
public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception {
getBindingsForRoutingAddress(address).setMessageLoadBalancingType(messageLoadBalancingType);
}
}

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.postoffice.Address;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -107,6 +108,24 @@ public class WildcardAddressManager extends SimpleAddressManager {
return exists;
}
@Override
public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception {
Address add = addAndUpdateAddressMap(address);
Bindings bindingsForRoutingAddress = super.getBindingsForRoutingAddress(address);
if (bindingsForRoutingAddress != null) {
bindingsForRoutingAddress.setMessageLoadBalancingType(messageLoadBalancingType);
}
if (add.containsWildCard()) {
for (Address destAdd : add.getLinkedAddresses()) {
getBindingsForRoutingAddress(destAdd.getAddress()).setMessageLoadBalancingType(messageLoadBalancingType);
}
} else {
for (Address destAdd : add.getLinkedAddresses()) {
super.getBindingsForRoutingAddress(destAdd.getAddress()).setMessageLoadBalancingType(messageLoadBalancingType);
}
}
}
/**
* If the address is a wild card then the binding will be removed from the actual mappings for any linked address.
* otherwise it will be removed as normal.

View File

@ -48,7 +48,6 @@ import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
@ -1233,9 +1232,13 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
} catch (Exception ignore) {
}
Bindings theBindings = postOffice.getBindingsForAddress(queueAddress);
theBindings.setMessageLoadBalancingType(messageLoadBalancingType);
try {
postOffice.updateMessageLoadBalancingTypeForAddress(queueAddress, messageLoadBalancingType);
} catch (Exception e) {
if (logger.isTraceEnabled()) {
logger.trace(e.getLocalizedMessage(), e);
}
}
}
@ -1256,7 +1259,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
RemoteQueueBinding binding = bindings.remove(clusterName);
if (binding == null) {
throw new IllegalStateException("Cannot find binding for queue " + clusterName);
logger.warn("Cannot remove binding, because cannot find binding for queue " + clusterName);
return;
}
postOffice.removeBinding(binding.getUniqueName(), null, false);

View File

@ -112,6 +112,99 @@ public class MqttClusterWildcardTest extends ClusterTestBase {
}
}
@Test
public void wildcardsWithBroker1Disconnected() throws Exception {
BlockingConnection connection1 = null;
BlockingConnection connection2 = null;
BlockingConnection connection3 = null;
final String TOPIC = "test/+/some/#";
try {
WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
wildcardConfiguration.setAnyWords('#');
wildcardConfiguration.setDelimiter('/');
wildcardConfiguration.setRoutingEnabled(true);
wildcardConfiguration.setSingleWord('+');
setupServer(0, false, isNetty());
servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
startServers(0);
connection1 = retrieveMQTTConnection("tcp://localhost:61616");
// Subscribe to topics
Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)};
connection1.subscribe(topics);
waitForBindings(0, TOPIC, 1, 1, true);
waitForBindings(0, TOPIC, 0, 0, false);
// Publish Messages
String payload1 = "This is message 1";
String payload2 = "This is message 2";
String payload3 = "This is message 3";
connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
Message message1 = connection1.receive(5, TimeUnit.SECONDS);
setupServer(1, false, isNetty());
servers[1].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
startServers(1);
connection2 = retrieveMQTTConnection("tcp://localhost:61617");
connection3 = retrieveMQTTConnection("tcp://localhost:61617");
connection2.subscribe(topics);
connection3.subscribe(new Topic[]{new Topic("teste/1/some/1", QoS.AT_MOST_ONCE)});
waitForBindings(1, TOPIC, 1, 1, false);
waitForBindings(1, TOPIC, 1, 1, true);
connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
Message message2 = connection1.receive(5, TimeUnit.SECONDS);
Message message3 = connection1.receive(5, TimeUnit.SECONDS);
Message message4 = connection2.receive(5, TimeUnit.SECONDS);
Message message5 = connection2.receive(5, TimeUnit.SECONDS);
Message message6 = connection2.receive(5, TimeUnit.SECONDS);
assertEquals(payload1, new String(message1.getPayload()));
assertEquals(payload2, new String(message2.getPayload()));
assertEquals(payload3, new String(message3.getPayload()));
assertEquals(payload1, new String(message4.getPayload()));
assertEquals(payload2, new String(message5.getPayload()));
assertEquals(payload3, new String(message6.getPayload()));
} finally {
String[] topics = new String[]{TOPIC};
if (connection1 != null) {
connection1.unsubscribe(topics);
connection1.disconnect();
}
if (connection2 != null) {
connection2.unsubscribe(topics);
connection2.disconnect();
}
if (connection3 != null) {
connection3.unsubscribe(new String[]{"teste/1/some/1"});
connection3.disconnect();
}
}
}
private static BlockingConnection retrieveMQTTConnection(String host) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost(host);

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -68,6 +69,11 @@ public class FakePostOffice implements PostOffice {
return null;
}
@Override
public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception {
}
@Override
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) {