ARTEMIS-4196 - set message routing type in mqtt publish

This commit is contained in:
Gary Tully 2023-03-06 11:30:58 +00:00 committed by Justin Bertram
parent ed5322c54f
commit 00cae02ca4
5 changed files with 172 additions and 55 deletions

View File

@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.DisconnectExcep
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.UUIDGenerator;
@ -223,8 +224,13 @@ public class MQTTPublishManager {
Transaction tx = session.getServerSession().newTransaction();
try {
if (session.getServer().getAddressInfo(address) == null && session.getServer().getAddressSettingsRepository().getMatch(coreAddress).isAutoCreateAddresses()) {
AddressInfo addressInfo = session.getServer().getAddressInfo(address);
if (addressInfo == null && session.getServer().getAddressSettingsRepository().getMatch(coreAddress).isAutoCreateAddresses()) {
session.getServerSession().createAddress(address, RoutingType.MULTICAST, true);
serverMessage.setRoutingType(RoutingType.MULTICAST);
}
if (addressInfo != null) {
serverMessage.setRoutingType(addressInfo.getRoutingType());
}
session.getServerSession().send(tx, serverMessage, true, senderName, false);

View File

@ -416,8 +416,10 @@ public final class BindingsImpl implements Bindings {
private static boolean matchBinding(final Message message,
final Binding binding,
final MessageLoadBalancingType loadBalancingType) {
if ((loadBalancingType.equals(MessageLoadBalancingType.OFF) || loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION) && !Objects.equals(message.getRoutingType(), RoutingType.MULTICAST)) && binding instanceof RemoteQueueBinding) {
return false;
if (loadBalancingType.equals(MessageLoadBalancingType.OFF) || loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) {
if (!Objects.equals(message.getRoutingType(), RoutingType.MULTICAST) && binding instanceof RemoteQueueBinding) {
return false;
}
}
final Filter filter = binding.getFilter();

View File

@ -1350,8 +1350,8 @@ public class MQTTTest extends MQTTTestSupport {
connection.start();
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = s.createQueue(jmsTopicAddress);
MessageConsumer consumer = s.createConsumer(jmsQueue);
javax.jms.Topic jmsTopic = s.createTopic(jmsTopicAddress);
MessageConsumer consumer = s.createConsumer(jmsTopic);
provider.publish(address, RETAINED.getBytes(), AT_LEAST_ONCE, true);

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.junit.Test;
public class MqttClusterRemoteSubscribeLoadBalanceOffTest extends ClusterTestBase {
@Override
protected boolean isResolveProtocols() {
return true;
}
public boolean isNetty() {
return true;
}
@Test
public void testPub0Sub1() throws Exception {
final String TOPIC = "test/1";
final String clientId1 = "clientId1";
final String clientId2 = "clientId2";
Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)};
setupServers(TOPIC);
startServers(0, 1);
final BlockingConnection connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
final BlockingConnection connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
assertTrue("Should be connected", Wait.waitFor(() -> connection1.isConnected(), 5000, 100));
assertTrue("Should be connected", Wait.waitFor(() -> connection2.isConnected(), 5000, 100));
waitForTopology(servers[0], "cluster0", 2, 5000);
waitForTopology(servers[1], "cluster1", 2, 5000);
// Subscribe to topics
connection1.subscribe(topics);
connection2.subscribe(topics);
waitForBindings(0, TOPIC, 1, 1, false);
waitForBindings(1, TOPIC, 1, 1, false);
// Publish Messages
String payload1 = "This is message 1";
String payload2 = "This is message 2";
connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
connection2.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false);
Message message1 = connection1.receive(5, TimeUnit.SECONDS);
message1.ack();
Message message2 = connection1.receive(5, TimeUnit.SECONDS);
message2.ack();
message1 = connection2.receive(5, TimeUnit.SECONDS);
message1.ack();
message2 = connection2.receive(5, TimeUnit.SECONDS);
message2.ack();
String[] topicsStrings = new String[]{TOPIC};
if (connection1 != null && connection1.isConnected()) {
connection1.unsubscribe(topicsStrings);
connection1.disconnect();
}
if (connection2 != null && connection2.isConnected()) {
connection2.unsubscribe(topicsStrings);
connection2.disconnect();
}
}
private static BlockingConnection retrieveMQTTConnection(String host, String clientId) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost(host);
mqtt.setClientId(clientId);
mqtt.setConnectAttemptsMax(0);
mqtt.setReconnectAttemptsMax(0);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
return connection;
}
private void setupServers(String address) throws Exception {
WildcardConfiguration wildcardConfiguration = createWildCardConfiguration();
CoreAddressConfiguration coreAddressConfiguration = createAddressConfiguration(address);
AddressSettings addressSettings = createAddressSettings();
setupServer(0, false, isNetty());
servers[0].getConfiguration().addAddressConfiguration(coreAddressConfiguration);
servers[0].getConfiguration().addAddressSetting("#", addressSettings);
servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
setupServer(1, false, isNetty());
servers[1].getConfiguration().addAddressConfiguration(coreAddressConfiguration);
servers[1].getConfiguration().addAddressSetting("#", addressSettings);
servers[1].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
setupClusterConnection("cluster0", "", MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", "", MessageLoadBalancingType.OFF, 1, isNetty(), 1, 0);
}
private AddressSettings createAddressSettings() {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setRedistributionDelay(0);
addressSettings.setDefaultAddressRoutingType(RoutingType.MULTICAST);
return addressSettings;
}
private CoreAddressConfiguration createAddressConfiguration(String TOPIC) {
CoreAddressConfiguration coreAddressConfiguration = new CoreAddressConfiguration();
coreAddressConfiguration.addRoutingType(RoutingType.MULTICAST);
coreAddressConfiguration.setName(TOPIC);
return coreAddressConfiguration;
}
private WildcardConfiguration createWildCardConfiguration() {
WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
wildcardConfiguration.setAnyWords('#');
wildcardConfiguration.setDelimiter('/');
wildcardConfiguration.setRoutingEnabled(true);
wildcardConfiguration.setSingleWord('+');
return wildcardConfiguration;
}
}

View File

@ -483,17 +483,8 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
Message message1 = subConnection2.receive(5, TimeUnit.SECONDS);
message1.ack();
Message message2 = subConnection2.receive(5, TimeUnit.SECONDS);
message2.ack();
Message message3 = subConnection2.receive(5, TimeUnit.SECONDS);
message3.ack();
assertEquals(payload1, new String(message1.getPayload()));
assertEquals(payload2, new String(message2.getPayload()));
assertEquals(payload3, new String(message3.getPayload()));
// pub queue gets auto created, the routing type set on the message to reflect that and the
// message does not get routed to the sub queue that has anycast
subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC});
waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
@ -506,13 +497,6 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message11);
Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message21);
Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message31);
} finally {
String[] topics = new String[]{ANYCAST_TOPIC};
if (subConnection1 != null && subConnection1.isConnected()) {
@ -576,20 +560,8 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
connection1.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
Message message1 = connection1.receive(5, TimeUnit.SECONDS);
assertNotNull(message1);
message1.ack();
Message message2 = connection2.receive(5, TimeUnit.SECONDS);
assertNotNull(message2);
message2.ack();
Message message3 = connection1.receive(5, TimeUnit.SECONDS);
assertNotNull(message3);
message3.ack();
assertEquals(payload1, new String(message1.getPayload()));
assertEquals(payload2, new String(message2.getPayload()));
assertEquals(payload3, new String(message3.getPayload()));
// the pub queue is auto created and the message multicast routing type won't match the anycast sub queue
// so nothing gets routed to this queue
connection2.unsubscribe(new String[]{ANYCAST_TOPIC});
@ -603,24 +575,6 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
connection1.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
connection1.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
Message message11 = connection1.receive(5, TimeUnit.SECONDS);
assertNotNull(message11);
message11.ack();
Message message21 = connection1.receive(5, TimeUnit.SECONDS);
assertNotNull(message21);
message21.ack();
Message message31 = connection1.receive(5, TimeUnit.SECONDS);
assertNotNull(message31);
message31.ack();
String message11String = new String(message11.getPayload());
String message21String = new String(message21.getPayload());
String message31String = new String(message31.getPayload());
assertTrue(payload1.equals(message11String) || payload1.equals(message21String) || payload1.equals(message31String) );
assertTrue(payload2.equals(message11String) || payload2.equals(message21String) || payload2.equals(message31String) );
assertTrue(payload3.equals(message11String) || payload3.equals(message21String) || payload3.equals(message31String) );
} finally {
String[] topics = new String[]{ANYCAST_TOPIC};
if (connection1 != null && connection1.isConnected()) {