From 59773c284983f7919c044c54f1a6e2039023701f Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Mon, 13 Feb 2017 16:21:57 +0000 Subject: [PATCH] ARTEMIS-960 Do not encode Will messages --- .../protocol/mqtt/MQTTConnectionManager.java | 8 ++- .../artemis/core/protocol/mqtt/MQTTUtil.java | 11 ---- .../integration/mqtt/imported/MQTTTest.java | 52 +++++++++++++++++++ 3 files changed, 59 insertions(+), 12 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index c623f3b6ab..062eb41dd5 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -17,9 +17,12 @@ package org.apache.activemq.artemis.core.protocol.mqtt; +import java.nio.charset.Charset; import java.util.Set; import java.util.UUID; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -75,7 +78,10 @@ public class MQTTConnectionManager { session.setIsClean(cleanSession); if (will) { - ServerMessage w = MQTTUtil.createServerMessageFromString(session, willMessage, willTopic, willQosLevel, willRetain); + byte[] payload = willMessage.getBytes(Charset.forName("UTF-8")); + ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(payload.length); + buf.writeBytes(payload); + ServerMessage w = MQTTUtil.createServerMessageFromByteBuf(session, willTopic, willRetain, willQosLevel, buf); session.getSessionState().setWillMessage(w); } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index bdc16b1f54..4819006274 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -115,7 +115,6 @@ public class MQTTUtil { message.setAddress(address); message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain); message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos); - // For JMS Consumption message.setType(Message.BYTES_TYPE); return message; } @@ -133,16 +132,6 @@ public class MQTTUtil { return message; } - public static ServerMessage createServerMessageFromString(MQTTSession session, - String payload, - String topic, - int qos, - boolean retain) { - ServerMessage message = createServerMessage(session, new SimpleString(topic), retain, qos); - message.getBodyBuffer().writeString(payload); - return message; - } - public static ServerMessage createPubRelMessage(MQTTSession session, SimpleString address, int messageId) { ServerMessage message = createServerMessage(session, address, false, 1); message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index d5050daec6..d359f2e6aa 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.util.Wait; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.MQTTException; import org.fusesource.mqtt.client.Message; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; @@ -997,6 +998,39 @@ public class MQTTTest extends MQTTTestSupport { newConnection.disconnect(); } + @Test(timeout = 60 * 1000) + public void testClientConnectionFailureSendsWillMessage() throws Exception { + getServer().createQueue(SimpleString.toSimpleString("will"), RoutingType.MULTICAST, SimpleString.toSimpleString("will"), null, true, false); + + MQTT mqtt = createMQTTConnection("1", false); + mqtt.setKeepAlive((short) 1); + mqtt.setWillMessage("test message"); + mqtt.setWillTopic("will"); + mqtt.setWillQos(QoS.AT_LEAST_ONCE); + + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return connection.isConnected(); + } + }); + + MQTT mqtt2 = createMQTTConnection("2", false); + BlockingConnection connection2 = mqtt2.blockingConnection(); + connection2.connect(); + connection2.subscribe(new Topic[]{new Topic("will", QoS.AT_LEAST_ONCE)}); + + // kill transport + connection.kill(); + + // FIXME Wait for the previous connection to timeout. This is not required in ActiveMQ. Needs investigating. + Thread.sleep(10000); + Message m = connection2.receive(1000, TimeUnit.MILLISECONDS); + assertEquals("test message", new String(m.getPayload())); + } + @Test(timeout = 60 * 1000) public void testCleanSession() throws Exception { final String CLIENTID = "cleansession"; @@ -1779,4 +1813,22 @@ public class MQTTTest extends MQTTTestSupport { Message message = connection2.receive(); assertEquals(payload, new String(message.getPayload())); } + + @Test + public void testDuplicateIDReturnsError() throws Exception { + String clientId = "clientId"; + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId(clientId); + mqtt.blockingConnection().connect(); + + MQTTException e = null; + try { + MQTT mqtt2 = createMQTTConnection(); + mqtt2.setClientId(clientId); + mqtt2.blockingConnection().connect(); + } catch (MQTTException mqttE) { + e = mqttE; + } + assertTrue(e.getMessage().contains("CONNECTION_REFUSED_IDENTIFIER_REJECTED")); + } }