This commit is contained in:
Justin Bertram 2017-02-14 11:26:55 -06:00
commit ab48344ac3
3 changed files with 88 additions and 47 deletions

View File

@ -17,9 +17,12 @@
package org.apache.activemq.artemis.core.protocol.mqtt; package org.apache.activemq.artemis.core.protocol.mqtt;
import java.nio.charset.Charset;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -75,7 +78,10 @@ public class MQTTConnectionManager {
session.setIsClean(cleanSession); session.setIsClean(cleanSession);
if (will) { if (will) {
ServerMessage w = MQTTUtil.createServerMessageFromString(session, willMessage, willTopic, willQosLevel, willRetain); byte[] payload = willMessage.getBytes(Charset.forName("UTF-8"));
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(payload.length);
buf.writeBytes(payload);
ServerMessage w = MQTTUtil.createServerMessageFromByteBuf(session, willTopic, willRetain, willQosLevel, buf);
session.getSessionState().setWillMessage(w); session.getSessionState().setWillMessage(w);
} }
@ -113,38 +119,37 @@ public class MQTTConnectionManager {
return (ServerSessionImpl) serverSession; return (ServerSessionImpl) serverSession;
} }
void disconnect() { synchronized void disconnect() {
if (session == null) { if (session == null) {
return; return;
} }
try { try {
if (session.getSessionState() != null) { session.stop();
String clientId = session.getSessionState().getClientId(); session.getConnection().destroy();
if (clientId != null)
CONNECTED_CLIENTS.remove(clientId);
if (session.getState().isWill()) { if (session.getState().isWill()) {
session.getConnectionManager().sendWill(); session.getConnectionManager().sendWill();
} }
}
session.stop();
session.getConnection().disconnect(false);
session.getConnection().destroy();
} catch (Exception e) { } catch (Exception e) {
/* FIXME Failure during disconnect would leave the session state in an unrecoverable state. We should handle
errors more gracefully.
*/
log.error("Error disconnecting client: " + e.getMessage()); log.error("Error disconnecting client: " + e.getMessage());
} finally {
if (session.getSessionState() != null) {
session.getSessionState().setAttached(false);
String clientId = session.getSessionState().getClientId();
if (clientId != null) {
CONNECTED_CLIENTS.remove(clientId);
}
}
} }
} }
private void sendWill() throws Exception { private void sendWill() throws Exception {
session.getServerSession().send(session.getSessionState().getWillMessage(), true); session.getServer().getPostOffice().route(session.getSessionState().getWillMessage(), true);
session.getSessionState().deleteWillMessage(); session.getSessionState().deleteWillMessage();
} }
private MQTTSessionState getSessionState(String clientId) throws InterruptedException { private MQTTSessionState getSessionState(String clientId) throws InterruptedException {
synchronized (MQTTSession.SESSIONS) {
/* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and /* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
* start a new one This Session lasts as long as the Network Connection. State data associated with this Session * start a new one This Session lasts as long as the Network Connection. State data associated with this Session
* MUST NOT be reused in any subsequent Session */ * MUST NOT be reused in any subsequent Session */
@ -153,10 +158,6 @@ public class MQTTConnectionManager {
a new one. */ a new one. */
MQTTSessionState state = MQTTSession.SESSIONS.get(clientId); MQTTSessionState state = MQTTSession.SESSIONS.get(clientId);
if (state != null) { if (state != null) {
// TODO Add a count down latch for handling wait during attached session state.
while (state.getAttached()) {
Thread.sleep(1000);
}
return state; return state;
} else { } else {
state = new MQTTSessionState(clientId); state = new MQTTSessionState(clientId);
@ -164,7 +165,6 @@ public class MQTTConnectionManager {
return state; return state;
} }
} }
}
private String validateClientId(String clientId, boolean cleanSession) { private String validateClientId(String clientId, boolean cleanSession) {
if (clientId == null || clientId.isEmpty()) { if (clientId == null || clientId.isEmpty()) {

View File

@ -115,7 +115,6 @@ public class MQTTUtil {
message.setAddress(address); message.setAddress(address);
message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain); message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain);
message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos); message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
// For JMS Consumption
message.setType(Message.BYTES_TYPE); message.setType(Message.BYTES_TYPE);
return message; return message;
} }
@ -133,16 +132,6 @@ public class MQTTUtil {
return message; return message;
} }
public static ServerMessage createServerMessageFromString(MQTTSession session,
String payload,
String topic,
int qos,
boolean retain) {
ServerMessage message = createServerMessage(session, new SimpleString(topic), retain, qos);
message.getBodyBuffer().writeString(payload);
return message;
}
public static ServerMessage createPubRelMessage(MQTTSession session, SimpleString address, int messageId) { public static ServerMessage createPubRelMessage(MQTTSession session, SimpleString address, int messageId) {
ServerMessage message = createServerMessage(session, address, false, 1); ServerMessage message = createServerMessage(session, address, false, 1);
message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId); message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId);

View File

@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.MQTTException;
import org.fusesource.mqtt.client.Message; import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic; import org.fusesource.mqtt.client.Topic;
@ -997,6 +998,39 @@ public class MQTTTest extends MQTTTestSupport {
newConnection.disconnect(); newConnection.disconnect();
} }
@Test(timeout = 60 * 1000)
public void testClientConnectionFailureSendsWillMessage() throws Exception {
getServer().createQueue(SimpleString.toSimpleString("will"), RoutingType.MULTICAST, SimpleString.toSimpleString("will"), null, true, false);
MQTT mqtt = createMQTTConnection("1", false);
mqtt.setKeepAlive((short) 1);
mqtt.setWillMessage("test message");
mqtt.setWillTopic("will");
mqtt.setWillQos(QoS.AT_LEAST_ONCE);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return connection.isConnected();
}
});
MQTT mqtt2 = createMQTTConnection("2", false);
BlockingConnection connection2 = mqtt2.blockingConnection();
connection2.connect();
connection2.subscribe(new Topic[]{new Topic("will", QoS.AT_LEAST_ONCE)});
// kill transport
connection.kill();
// FIXME Wait for the previous connection to timeout. This is not required in ActiveMQ. Needs investigating.
Thread.sleep(10000);
Message m = connection2.receive(1000, TimeUnit.MILLISECONDS);
assertEquals("test message", new String(m.getPayload()));
}
@Test(timeout = 60 * 1000) @Test(timeout = 60 * 1000)
public void testCleanSession() throws Exception { public void testCleanSession() throws Exception {
final String CLIENTID = "cleansession"; final String CLIENTID = "cleansession";
@ -1779,4 +1813,22 @@ public class MQTTTest extends MQTTTestSupport {
Message message = connection2.receive(); Message message = connection2.receive();
assertEquals(payload, new String(message.getPayload())); assertEquals(payload, new String(message.getPayload()));
} }
@Test
public void testDuplicateIDReturnsError() throws Exception {
String clientId = "clientId";
MQTT mqtt = createMQTTConnection();
mqtt.setClientId(clientId);
mqtt.blockingConnection().connect();
MQTTException e = null;
try {
MQTT mqtt2 = createMQTTConnection();
mqtt2.setClientId(clientId);
mqtt2.blockingConnection().connect();
} catch (MQTTException mqttE) {
e = mqttE;
}
assertTrue(e.getMessage().contains("CONNECTION_REFUSED_IDENTIFIER_REJECTED"));
}
} }