On MQTT Websocket close, a LWT message will be properly sent if
configured and a disconnect packet was not received

(cherry picked from commit bd442a3388)
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-07-05 17:47:49 +00:00
parent 1dfd0eeb60
commit fda982dccb
5 changed files with 269 additions and 7 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.ws.jetty9;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.transport.ws.AbstractMQTTSocket; import org.apache.activemq.transport.ws.AbstractMQTTSocket;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
@ -36,6 +37,7 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
private final int ORDERLY_CLOSE_TIMEOUT = 10; private final int ORDERLY_CLOSE_TIMEOUT = 10;
private Session session; private Session session;
final AtomicBoolean receivedDisconnect = new AtomicBoolean();
public MQTTSocket(String remoteAddress) { public MQTTSocket(String remoteAddress) {
super(remoteAddress); super(remoteAddress);
@ -71,6 +73,9 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
try { try {
receiveCounter += length; receiveCounter += length;
MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length)); MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
if (frame.messageType() == DISCONNECT.TYPE) {
receivedDisconnect.set(true);
}
getProtocolConverter().onMQTTCommand(frame); getProtocolConverter().onMQTTCommand(frame);
} catch (Exception e) { } catch (Exception e) {
onException(IOExceptionSupport.create(e)); onException(IOExceptionSupport.create(e));
@ -84,6 +89,10 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
try { try {
if (protocolLock.tryLock() || protocolLock.tryLock(ORDERLY_CLOSE_TIMEOUT, TimeUnit.SECONDS)) { if (protocolLock.tryLock() || protocolLock.tryLock(ORDERLY_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
LOG.debug("MQTT WebSocket closed: code[{}] message[{}]", arg0, arg1); LOG.debug("MQTT WebSocket closed: code[{}] message[{}]", arg0, arg1);
//Check if we received a disconnect packet before closing
if (!receivedDisconnect.get()) {
getProtocolConverter().onTransportError();
}
getProtocolConverter().onMQTTCommand(new DISCONNECT().encode()); getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
} }
} catch (Exception e) { } catch (Exception e) {

View File

@ -85,14 +85,16 @@ public class MQTTWSConnection extends WebSocketAdapter implements WebSocketListe
} }
public void connect(String clientId) throws Exception { public void connect(String clientId) throws Exception {
checkConnected();
CONNECT command = new CONNECT(); CONNECT command = new CONNECT();
command.clientId(new UTF8Buffer(clientId)); command.clientId(new UTF8Buffer(clientId));
command.cleanSession(false); command.cleanSession(false);
command.version(3); command.version(3);
command.keepAlive((short) 0); command.keepAlive((short) 0);
connect(command);
}
public void connect(CONNECT command) throws Exception {
checkConnected();
ByteSequence payload = wireFormat.marshal(command.encode()); ByteSequence payload = wireFormat.marshal(command.encode());
connection.getRemote().sendBytes(ByteBuffer.wrap(payload.data)); connection.getRemote().sendBytes(ByteBuffer.wrap(payload.data));

View File

@ -0,0 +1,251 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.ws;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.codec.CONNECT;
import org.fusesource.mqtt.codec.PUBACK;
import org.fusesource.mqtt.codec.PUBLISH;
import org.fusesource.mqtt.codec.SUBSCRIBE;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**
* This shows that last will and testament messages work with MQTT over WS.
* This test is modeled after org.apache.activemq.transport.mqtt.MQTTWillTest
*/
@RunWith(Parameterized.class)
public class MQTTWSTransportWillTest extends WSTransportTestSupport {
protected WebSocketClient wsClient;
protected MQTTWSConnection wsMQTTConnection1;
protected MQTTWSConnection wsMQTTConnection2;
protected ClientUpgradeRequest request;
private String willTopic = "willTopic";
private String payload = "last will";
private boolean closeWithDisconnect;
//Test both with a proper disconnect and without
@Parameters(name="closeWithDisconnect={0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{true},
{false}
});
}
public MQTTWSTransportWillTest(boolean closeWithDisconnect) {
this.closeWithDisconnect = closeWithDisconnect;
}
@Override
@Before
public void setUp() throws Exception {
//turn off advisory support
broker = createBroker(true, false);
wsClient = new WebSocketClient(new SslContextFactory(true));
wsClient.start();
request = new ClientUpgradeRequest();
request.setSubProtocols("mqttv3.1");
wsMQTTConnection1 = new MQTTWSConnection();
wsMQTTConnection2 = new MQTTWSConnection();
wsClient.connect(wsMQTTConnection1, wsConnectUri, request);
if (!wsMQTTConnection1.awaitConnection(30, TimeUnit.SECONDS)) {
throw new IOException("Could not connect to MQTT WS endpoint");
}
wsClient.connect(wsMQTTConnection2, wsConnectUri, request);
if (!wsMQTTConnection2.awaitConnection(30, TimeUnit.SECONDS)) {
throw new IOException("Could not connect to MQTT WS endpoint");
}
}
@Override
@After
public void tearDown() throws Exception {
if (wsMQTTConnection1 != null) {
wsMQTTConnection1.close();
wsMQTTConnection1 = null;
}
if (wsMQTTConnection2 != null) {
wsMQTTConnection2.close();
wsMQTTConnection2 = null;
}
wsClient.stop();
wsClient = null;
super.tearDown();
}
@Test(timeout = 60000)
public void testWill() throws Exception {
//connect with will retain false
CONNECT command = getWillConnectCommand(false);
//connect both connections
wsMQTTConnection1.connect(command);
wsMQTTConnection2.connect();
//Subscribe to topics
SUBSCRIBE subscribe = new SUBSCRIBE();
subscribe.topics(new Topic[] {new Topic("#", QoS.EXACTLY_ONCE) });
wsMQTTConnection2.sendFrame(subscribe.encode());
wsMQTTConnection2.receive(5, TimeUnit.SECONDS);
//Test message send/receive
wsMQTTConnection1.sendFrame(getTestMessage((short) 125).encode());
assertMessageReceived(wsMQTTConnection2);
//close the first connection without sending a proper disconnect frame first
//if closeWithDisconnect is false
if (closeWithDisconnect) {
wsMQTTConnection1.disconnect();
}
wsMQTTConnection1.close();
//Make sure LWT message is not received
if (closeWithDisconnect) {
assertNull(wsMQTTConnection2.receive(5, TimeUnit.SECONDS));
//make sure LWT is received
} else {
assertWillTopicReceived(wsMQTTConnection2);
}
}
@Test(timeout = 60 * 1000)
public void testRetainWillMessage() throws Exception {
//create connection with will retain true
CONNECT command = getWillConnectCommand(true);
wsMQTTConnection1.connect(command);
wsMQTTConnection2.connect();
//set to at most once to test will retain
SUBSCRIBE subscribe = new SUBSCRIBE();
subscribe.topics(new Topic[] {new Topic("#", QoS.AT_MOST_ONCE) });
wsMQTTConnection2.sendFrame(subscribe.encode());
wsMQTTConnection2.receive(5, TimeUnit.SECONDS);
//Test message send/receive
PUBLISH pub = getTestMessage((short) 127);
wsMQTTConnection1.sendFrame(pub.encode());
assertMessageReceived(wsMQTTConnection2);
PUBACK ack = new PUBACK();
ack.messageId(pub.messageId());
wsMQTTConnection2.sendFrame(ack.encode());
//Properly close connection 2 and improperly close connection 1 for LWT test
wsMQTTConnection2.disconnect();
wsMQTTConnection2.close();
Thread.sleep(1000);
//close the first connection without sending a proper disconnect frame first
//if closeWithoutDisconnect is false
if (closeWithDisconnect) {
wsMQTTConnection1.disconnect();
}
wsMQTTConnection1.close();
Thread.sleep(1000);
//Do the reconnect of the websocket after close
wsMQTTConnection2 = new MQTTWSConnection();
wsClient.connect(wsMQTTConnection2, wsConnectUri, request);
if (!wsMQTTConnection2.awaitConnection(30, TimeUnit.SECONDS)) {
throw new IOException("Could not connect to MQTT WS endpoint");
}
//Make sure the will message is received on reconnect
wsMQTTConnection2.connect();
wsMQTTConnection2.sendFrame(subscribe.encode());
wsMQTTConnection2.receive(5, TimeUnit.SECONDS);
//Make sure LWT message not received
if (closeWithDisconnect) {
assertNull(wsMQTTConnection2.receive(5, TimeUnit.SECONDS));
//make sure LWT is received
} else {
assertWillTopicReceived(wsMQTTConnection2);
}
}
private PUBLISH getTestMessage(short id) {
PUBLISH publish = new PUBLISH();
publish.dup(false);
publish.messageId(id);
publish.qos(QoS.AT_LEAST_ONCE);
publish.payload(new Buffer("hello world".getBytes()));
publish.topicName(new UTF8Buffer("test"));
return publish;
}
private CONNECT getWillConnectCommand(boolean willRetain) {
CONNECT command = new CONNECT();
command.clientId(new UTF8Buffer("clientId"));
command.cleanSession(false);
command.version(3);
command.keepAlive((short) 0);
command.willMessage(new UTF8Buffer(payload));
command.willQos(QoS.AT_LEAST_ONCE);
command.willTopic(new UTF8Buffer(willTopic));
command.willRetain(willRetain);
return command;
}
private void assertMessageReceived(MQTTWSConnection wsMQTTConnection2) throws Exception {
PUBLISH msg = new PUBLISH();
msg.decode(wsMQTTConnection2.receive(5, TimeUnit.SECONDS));
assertNotNull(msg);
assertEquals("hello world", msg.payload().ascii().toString());
assertEquals("test", msg.topicName().toString());
}
private void assertWillTopicReceived(MQTTWSConnection wsMQTTConnection2) throws Exception {
PUBLISH willMsg = new PUBLISH();
willMsg.decode(wsMQTTConnection2.receive(5, TimeUnit.SECONDS));
assertNotNull(willMsg);
assertEquals(payload, willMsg.payload().ascii().toString());
assertEquals(willTopic, willMsg.topicName().toString());
}
}

View File

@ -55,7 +55,7 @@ public class WSTransportTestSupport {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
LOG.info("========== Starting test: {} ==========", name.getMethodName()); LOG.info("========== Starting test: {} ==========", name.getMethodName());
broker = createBroker(true); broker = createBroker(true, true);
} }
@After @After
@ -91,7 +91,7 @@ public class WSTransportTestSupport {
} }
protected BrokerService createBroker(boolean deleteMessages) throws Exception { protected BrokerService createBroker(boolean deleteMessages, boolean advisorySupport) throws Exception {
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
@ -105,6 +105,7 @@ public class WSTransportTestSupport {
wsConnectUri = broker.addConnector(getWSConnectorURI()).getPublishableConnectURI(); wsConnectUri = broker.addConnector(getWSConnectorURI()).getPublishableConnectURI();
broker.setAdvisorySupport(advisorySupport);
broker.setUseJmx(true); broker.setUseJmx(true);
broker.getManagementContext().setCreateConnector(false); broker.getManagementContext().setCreateConnector(false);
broker.setPersistent(isPersistent()); broker.setPersistent(isPersistent());

View File

@ -339,8 +339,7 @@ public class MQTTProtocolConverter {
} }
void onMQTTDisconnect() throws MQTTProtocolException { void onMQTTDisconnect() throws MQTTProtocolException {
if (connected.get()) { if (connected.compareAndSet(true, false)) {
connected.set(false);
sendToActiveMQ(connectionInfo.createRemoveCommand(), null); sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
sendToActiveMQ(new ShutdownInfo(), null); sendToActiveMQ(new ShutdownInfo(), null);
} }