ARTEMIS-3777 fix MQTT request/response + nolocal

Removing the connection ID property from the actual *message* breaks the
nolocal functionality. Removing the property isn't necessary in the
first place so this commit reomves that code.
This commit is contained in:
Justin Bertram 2022-04-13 11:45:14 -05:00 committed by clebertsuconic
parent 1e1397935a
commit 7d11cf81ba
2 changed files with 71 additions and 4 deletions

View File

@ -45,7 +45,6 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.jboss.logging.Logger;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE;
@ -395,9 +394,6 @@ public class MQTTPublishManager {
isRetain = false;
}
// [MQTT-3.8.3-3] remove property used for no-local implementation
message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
if (session.getState().getClientTopicAliasMaximum() != null) {
Integer alias = session.getState().getServerTopicAlias(address);
if (alias == null) {

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -34,6 +35,7 @@ import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.MqttSubAck;
import org.jboss.logging.Logger;
import org.junit.Test;
@ -82,6 +84,75 @@ public class SubscribeTests extends MQTT5TestSupport {
client.close();
}
/*
* [MQTT-3.8.3-3]
*
* This test was adapted from Test.test_request_response in client_test5.py at https://github.com/eclipse/paho.mqtt.testing/tree/master/interoperability
*
* It involves 2 clients subscribing to and performing a request/response on the same topic so it's imperative they
* don't receive the messages that they send themselves.
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testRequestResponseNoLocal() throws Exception {
final String TOPIC = RandomUtil.randomString();
final String REQUEST = "request";
final String RESPONSE = "response";
final CountDownLatch aclientLatch = new CountDownLatch(2);
final CountDownLatch bclientLatch = new CountDownLatch(1);
MqttClient aclient = createPahoClient("aclientid");
aclient.connect();
aclient.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
assertEquals(RESPONSE, new String(message.getPayload()));
aclientLatch.countDown();
}
});
MqttClient bclient = createPahoClient("bclientid");
bclient.connect();
bclient.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
assertEquals(REQUEST, new String(message.getPayload()));
bclientLatch.countDown();
MqttMessage m = new MqttMessage();
m.setPayload(RESPONSE.getBytes(StandardCharsets.UTF_8));
m.setQos(1);
MqttProperties properties = new MqttProperties();
properties.setResponseTopic(TOPIC);
properties.setCorrelationData("334".getBytes(StandardCharsets.UTF_8));
m.setProperties(properties);
bclient.publish(TOPIC, m);
}
});
MqttSubscription sub = new MqttSubscription(TOPIC, 2);
sub.setNoLocal(true);
aclient.subscribe(new MqttSubscription[]{sub});
bclient.subscribe(new MqttSubscription[]{sub});
MqttMessage m = new MqttMessage();
m.setPayload(REQUEST.getBytes(StandardCharsets.UTF_8));
m.setQos(1);
MqttProperties properties = new MqttProperties();
properties.setResponseTopic(TOPIC);
properties.setCorrelationData("334".getBytes(StandardCharsets.UTF_8));
m.setProperties(properties);
aclient.publish(TOPIC, m);
assertTrue(bclientLatch.await(2, TimeUnit.SECONDS));
Wait.assertEquals(1L, () -> aclientLatch.getCount(), 2000, 100);
assertFalse(aclientLatch.await(2, TimeUnit.SECONDS));
aclient.disconnect();
aclient.close();
bclient.disconnect();
bclient.close();
}
/*
* [MQTT-3.8.4-1] When the Server receives a SUBSCRIBE packet from a Client, the Server MUST respond with a SUBACK packet.
*/