Added Initial MQTT Protocol Support

This commit is contained in:
Martyn Taylor 2015-07-06 17:01:08 +01:00
parent 077e9e266b
commit 0f82ca754b
29 changed files with 5362 additions and 0 deletions

View File

@ -122,6 +122,11 @@
<artifactId>artemis-hornetq-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-mqtt-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-native</artifactId>
@ -176,6 +181,10 @@
<version>${project.version}</version>
<classifier>javadoc</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-mqtt</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -60,6 +60,7 @@
<include>org.apache.activemq:artemis-proton-plug</include>
<include>org.apache.activemq:artemis-hornetq-protocol</include>
<include>org.apache.activemq:artemis-stomp-protocol</include>
<include>org.apache.activemq:artemis-mqtt-protocol</include>
<include>org.apache.activemq:artemis-ra</include>
<include>org.apache.activemq:artemis-selector</include>
<include>org.apache.activemq:artemis-server</include>
@ -86,6 +87,7 @@
<include>commons-collections:commons-collections</include>
<include>org.fusesource.hawtbuf:hawtbuf</include>
<include>org.jgroups:jgroups</include>
<include>io.netty:netty-codec-mqtt</include>
</includes>
<!--excludes>
<exclude>org.apache.activemq:artemis-website</exclude>

View File

@ -0,0 +1,56 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>artemis-protocols</artifactId>
<groupId>org.apache.activemq</groupId>
<version>1.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>artemis-mqtt-protocol</artifactId>
<properties>
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging-processor</artifactId>
</dependency>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-mqtt</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,241 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
public class MQTTConnection implements RemotingConnection
{
private final Connection transportConnection;
private final long creationTime;
private AtomicBoolean dataReceived;
private boolean destroyed;
private boolean connected;
private final List<FailureListener> failureListeners = Collections.synchronizedList(new ArrayList<FailureListener>());
private final List<CloseListener> closeListeners = Collections.synchronizedList(new ArrayList<CloseListener>());
public MQTTConnection(Connection transportConnection) throws Exception
{
this.transportConnection = transportConnection;
this.creationTime = System.currentTimeMillis();
this.dataReceived = new AtomicBoolean();
this.destroyed = false;
}
public Object getID()
{
return transportConnection.getID();
}
@Override
public long getCreationTime()
{
return creationTime;
}
@Override
public String getRemoteAddress()
{
return transportConnection.getRemoteAddress();
}
@Override
public void addFailureListener(FailureListener listener)
{
failureListeners.add(listener);
}
@Override
public boolean removeFailureListener(FailureListener listener)
{
return failureListeners.remove(listener);
}
@Override
public void addCloseListener(CloseListener listener)
{
closeListeners.add(listener);
}
@Override
public boolean removeCloseListener(CloseListener listener)
{
return closeListeners.remove(listener);
}
@Override
public List<CloseListener> removeCloseListeners()
{
synchronized (closeListeners)
{
List<CloseListener> deletedCloseListeners = new ArrayList<CloseListener>(closeListeners);
closeListeners.clear();
return deletedCloseListeners;
}
}
@Override
public void setCloseListeners(List<CloseListener> listeners)
{
closeListeners.addAll(listeners);
}
@Override
public List<FailureListener> getFailureListeners()
{
return failureListeners;
}
@Override
public List<FailureListener> removeFailureListeners()
{
synchronized (failureListeners)
{
List<FailureListener> deletedFailureListeners = new ArrayList<FailureListener>(failureListeners);
failureListeners.clear();
return deletedFailureListeners;
}
}
@Override
public void setFailureListeners(List<FailureListener> listeners)
{
synchronized (failureListeners)
{
failureListeners.clear();
failureListeners.addAll(listeners);
}
}
@Override
public ActiveMQBuffer createTransportBuffer(int size)
{
return transportConnection.createTransportBuffer(size);
}
@Override
public void fail(ActiveMQException me)
{
synchronized (failureListeners)
{
for (FailureListener listener : failureListeners)
{
listener.connectionFailed(me, false);
}
}
}
@Override
public void fail(ActiveMQException me, String scaleDownTargetNodeID)
{
synchronized (failureListeners)
{
for (FailureListener listener : failureListeners)
{
//FIXME(mtaylor) How do we check if the node has failed over?
listener.connectionFailed(me, false);
}
}
}
@Override
public void destroy()
{
//TODO(mtaylor) ensure this properly destroys this connection.
destroyed = true;
disconnect(false);
}
@Override
public Connection getTransportConnection()
{
return transportConnection;
}
@Override
public boolean isClient()
{
return false;
}
@Override
public boolean isDestroyed()
{
return destroyed;
}
@Override
public void disconnect(boolean criticalError)
{
transportConnection.forceClose();
}
@Override
public void disconnect(String scaleDownNodeID, boolean criticalError)
{
transportConnection.forceClose();
}
protected void dataReceived()
{
dataReceived.set(true);
}
@Override
public boolean checkDataReceived()
{
return dataReceived.compareAndSet(true, false);
}
@Override
public void flush()
{
transportConnection.checkFlushBatchBuffer();
}
@Override
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer)
{
}
public void setConnected(boolean connected)
{
this.connected = connected;
}
public boolean getConnected()
{
return connected;
}
}

View File

@ -0,0 +1,205 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import java.util.Set;
import java.util.UUID;
/**
* MQTTConnectionMananager is responsible for handle Connect and Disconnect packets and any resulting behaviour of these
* events.
*/
public class MQTTConnectionManager
{
private MQTTSession session;
//TODO Read in a list of existing client IDs from stored Sessions.
public static Set<String> CONNECTED_CLIENTS = new ConcurrentHashSet<String>();
private MQTTLogger log = MQTTLogger.LOGGER;
public MQTTConnectionManager(MQTTSession session)
{
this.session = session;
MQTTFailureListener failureListener = new MQTTFailureListener(this);
session.getConnection().addFailureListener(failureListener);
}
/**
* Handles the connect packet. See spec for details on each of parameters.
*/
synchronized void connect(String cId, String username, String password, boolean will, String willMessage, String willTopic,
boolean willRetain, int willQosLevel, boolean cleanSession) throws Exception
{
String clientId = validateClientId(cId, cleanSession);
if (clientId == null)
{
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
session.getProtocolHandler().disconnect();
return;
}
session.setSessionState(getSessionState(clientId, cleanSession));
ServerSessionImpl serverSession = createServerSession(username, password);
serverSession.start();
session.setServerSession(serverSession);
if (will)
{
ServerMessage w = MQTTUtil.createServerMessageFromString(session, willMessage, willTopic, willQosLevel, willRetain);
session.getSessionState().setWillMessage(w);
}
session.getConnection().setConnected(true);
session.start();
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED);
}
/**
* Creates an internal Server Session.
* @param username
* @param password
* @return
* @throws Exception
*/
ServerSessionImpl createServerSession(String username, String password) throws Exception
{
String id = UUIDGenerator.getInstance().generateStringUUID();
ActiveMQServer server = session.getServer();
ServerSession serverSession = server.createSession(id,
username,
password,
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
session.getConnection(),
MQTTUtil.SESSION_AUTO_COMMIT_SENDS,
MQTTUtil.SESSION_AUTO_COMMIT_ACKS,
MQTTUtil.SESSION_PREACKNOWLEDGE,
MQTTUtil.SESSION_XA,
null,
session.getSessionCallback(),
null, // Session factory
MQTTUtil.SESSION_AUTO_CREATE_QUEUE);
return (ServerSessionImpl) serverSession;
}
void disconnect()
{
try
{
if (session != null && session.getSessionState() != null)
{
String clientId = session.getSessionState().getClientId();
if (clientId != null) CONNECTED_CLIENTS.remove(clientId);
if (session.getState().isWill())
{
session.getConnectionManager().sendWill();
}
}
session.stop();
session.getConnection().disconnect(false);
session.getConnection().destroy();
}
catch (Exception e)
{
/* FIXME Failure during disconnect would leave the session state in an unrecoverable state. We should handle
errors more gracefully.
*/
log.error("Error disconnecting client: " + e.getMessage());
}
}
private void sendWill() throws Exception
{
session.getServerSession().send(session.getSessionState().getWillMessage(), true);
session.getSessionState().deleteWillMessage();
}
private MQTTSessionState getSessionState(String clientId, boolean cleanSession) throws InterruptedException
{
synchronized (MQTTSession.SESSIONS)
{
/* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
* start a new one This Session lasts as long as the Network Connection. State data associated with this Session
* MUST NOT be reused in any subsequent Session */
if (cleanSession)
{
MQTTSession.SESSIONS.remove(clientId);
return new MQTTSessionState(clientId);
}
else
{
/* [MQTT-3.1.2-4] Attach an existing session if one exists (if cleanSession flag is false) otherwise create
a new one. */
MQTTSessionState state = MQTTSession.SESSIONS.get(clientId);
if (state != null)
{
// TODO Add a count down latch for handling wait during attached session state.
while (state.getAttached())
{
Thread.sleep(1000);
}
return state;
}
else
{
state = new MQTTSessionState(clientId);
MQTTSession.SESSIONS.put(clientId, state);
return state;
}
}
}
}
private String validateClientId(String clientId, boolean cleanSession)
{
if (clientId == null || clientId.isEmpty())
{
// [MQTT-3.1.3-7] [MQTT-3.1.3-6] If client does not specify a client ID and clean session is set to 1 create it.
if (cleanSession)
{
clientId = UUID.randomUUID().toString();
}
else
{
// [MQTT-3.1.3-8] Return ID rejected and disconnect if clean session = false and client id is null
return null;
}
}
// If the client ID is not unique (i.e. it has already registered) then do not accept it.
else if (!CONNECTED_CLIENTS.add(clientId))
{
// [MQTT-3.1.3-9] Return ID Rejected if server rejects the client ID
return null;
}
return clientId;
}
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.remoting.FailureListener;
/**
* Registered with the server and called during connection failure. This class informs the ConnectionManager when a
* connection failure has occurred, which subsequently cleans up any connection data.
*/
public class MQTTFailureListener implements FailureListener
{
private MQTTConnectionManager connectionManager;
public MQTTFailureListener(MQTTConnectionManager connectionManager)
{
this.connectionManager = connectionManager;
}
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver)
{
connectionManager.disconnect();
}
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID)
{
connectionManager.disconnect();
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;
import org.jboss.logging.annotations.MessageLogger;
/**
* Logger Code 83
*
* each message id must be 6 digits long starting with 10, the 3rd digit donates the level so
*
* INF0 1
* WARN 2
* DEBUG 3
* ERROR 4
* TRACE 5
* FATAL 6
*
* so an INFO message would be 101000 to 101999
*/
@MessageLogger(projectCode = "AMQ")
public interface MQTTLogger extends BasicLogger
{
MQTTLogger LOGGER = Logger.getMessageLogger(MQTTLogger.class, MQTTLogger.class.getPackage().getName());
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
/**
* MQTT Acks only hold message ID information. From this we must infer the internal message ID and consumer.
*/
class MQTTMessageInfo
{
private long serverMessageId;
private long consumerId;
private String address;
MQTTMessageInfo(long serverMessageId, long consumerId, String address)
{
this.serverMessageId = serverMessageId;
this.consumerId = consumerId;
this.address = address;
}
long getServerMessageId()
{
return serverMessageId;
}
long getConsumerId()
{
return consumerId;
}
String getAddress()
{
return address;
}
public String toString()
{
return ("ServerMessageId: " + serverMessageId + " ConsumerId: " + consumerId + " addr: " + address);
}
}

View File

@ -0,0 +1,362 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
/**
* This class is responsible for receiving and sending MQTT packets, delegating behaviour to one of the
* MQTTConnectionManager, MQTTPublishMananger, MQTTSubscriptionManager classes.
*/
public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter
{
private ConnectionEntry connectionEntry;
private MQTTConnection connection;
private MQTTSession session;
private ActiveMQServer server;
// This Channel Handler is not sharable, therefore it can only ever be associated with a single ctx.
private ChannelHandlerContext ctx;
private final MQTTLogger log = MQTTLogger.LOGGER;;
private boolean stopped = false;
public MQTTProtocolHandler(ActiveMQServer server)
{
this.server = server;
}
void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception
{
this.connectionEntry = entry;
this.connection = connection;
this.session = new MQTTSession(this, connection);
}
void stop(boolean error)
{
stopped = true;
}
public void channelRead(ChannelHandlerContext ctx, Object msg)
{
try
{
if (stopped)
{
disconnect();
return;
}
MqttMessage message = (MqttMessage) msg;
// Disconnect if Netty codec failed to decode the stream.
if (message.decoderResult().isFailure())
{
log.debug("Bad Message Disconnecting Client.");
disconnect();
return;
}
connection.dataReceived();
MQTTUtil.logMessage(log, message, true);
switch (message.fixedHeader().messageType())
{
case CONNECT:
handleConnect((MqttConnectMessage) message, ctx);
break;
case CONNACK:
handleConnack((MqttConnAckMessage) message);
break;
case PUBLISH:
handlePublish((MqttPublishMessage) message);
break;
case PUBACK:
handlePuback((MqttPubAckMessage) message);
break;
case PUBREC:
handlePubrec(message);
break;
case PUBREL:
handlePubrel(message);
break;
case PUBCOMP:
handlePubcomp(message);
break;
case SUBSCRIBE:
handleSubscribe((MqttSubscribeMessage) message, ctx);
break;
case SUBACK:
handleSuback((MqttSubAckMessage) message);
break;
case UNSUBSCRIBE:
handleUnsubscribe((MqttUnsubscribeMessage) message);
break;
case UNSUBACK:
handleUnsuback((MqttUnsubAckMessage) message);
break;
case PINGREQ:
handlePingreq(message, ctx);
break;
case PINGRESP:
handlePingresp(message);
break;
case DISCONNECT:
handleDisconnect(message);
break;
default:
disconnect();
}
}
catch (Exception e)
{
log.debug("Error processing Control Packet, Disconnecting Client" + e.getMessage());
disconnect();
}
}
/**
* Called during connection.
*
* @param connect
*/
void handleConnect(MqttConnectMessage connect, ChannelHandlerContext ctx) throws Exception
{
this.ctx = ctx;
connectionEntry.ttl = connect.variableHeader().keepAliveTimeSeconds() * 750;
String clientId = connect.payload().clientIdentifier();
session.getConnectionManager().connect(clientId,
connect.payload().userName(),
connect.payload().password(),
connect.variableHeader().isWillFlag(),
connect.payload().willMessage(),
connect.payload().willTopic(),
connect.variableHeader().isWillRetain(),
connect.variableHeader().willQos(),
connect.variableHeader().isCleanSession());
}
void disconnect()
{
session.getConnectionManager().disconnect();
}
void sendConnack(MqttConnectReturnCode returnCode)
{
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK,
false,
MqttQoS.AT_MOST_ONCE,
false,
0);
MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode);
MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader);
ctx.write(message);
ctx.flush();
}
/**
* The server does not instantiate connections therefore any CONNACK received over a connection is an invalid
* control message.
* @param message
*/
void handleConnack(MqttConnAckMessage message)
{
log.debug("Received invalid CONNACK from client: " + session.getSessionState().getClientId());
log.debug("Disconnecting client: " + session.getSessionState().getClientId());
disconnect();
}
void handlePublish(MqttPublishMessage message) throws Exception
{
session.getMqttPublishManager().handleMessage(message.variableHeader().messageId(),
message.variableHeader().topicName(),
message.fixedHeader().qosLevel().value(),
message.payload(),
message.fixedHeader().isRetain());
}
void sendPubAck(int messageId)
{
sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBACK);
}
void sendPubRel(int messageId)
{
sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBREL);
}
void sendPubRec(int messageId)
{
sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBREC);
}
void sendPubComp(int messageId)
{
sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBCOMP);
}
void sendPublishProtocolControlMessage(int messageId, MqttMessageType messageType)
{
MqttQoS qos = (messageType == MqttMessageType.PUBREL) ? MqttQoS.AT_LEAST_ONCE : MqttQoS.AT_MOST_ONCE;
MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType,
false,
qos, // Spec requires 01 in header for rel
false,
0);
MqttPubAckMessage rel = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId));
ctx.write(rel);
ctx.flush();
}
void handlePuback(MqttPubAckMessage message) throws Exception
{
session.getMqttPublishManager().handlePubAck(message.variableHeader().messageId());
}
void handlePubrec(MqttMessage message) throws Exception
{
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
session.getMqttPublishManager().handlePubRec(messageId);
}
void handlePubrel(MqttMessage message)
{
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
session.getMqttPublishManager().handlePubRel(messageId);
}
void handlePubcomp( MqttMessage message) throws Exception
{
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
session.getMqttPublishManager().handlePubComp(messageId);
}
void handleSubscribe(MqttSubscribeMessage message, ChannelHandlerContext ctx) throws Exception
{
MQTTSubscriptionManager subscriptionManager = session.getSubscriptionManager();
int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions());
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK,
false,
MqttQoS.AT_MOST_ONCE,
false,
0);
MqttSubAckMessage ack = new MqttSubAckMessage(header,
message.variableHeader(),
new MqttSubAckPayload(qos));
ctx.write(ack);
ctx.flush();
}
void handleSuback(MqttSubAckMessage message)
{
disconnect();
}
void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception
{
session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK,
false,
MqttQoS.AT_MOST_ONCE,
false,
0);
MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader());
ctx.write(m);
ctx.flush();
}
void handleUnsuback(MqttUnsubAckMessage message)
{
disconnect();
}
void handlePingreq(MqttMessage message, ChannelHandlerContext ctx)
{
ctx.write(new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP,
false,
MqttQoS.AT_MOST_ONCE,
false,
0)));
ctx.flush();
}
void handlePingresp(MqttMessage message)
{
disconnect();
}
void handleDisconnect(MqttMessage message)
{
if (session.getSessionState() != null) session.getState().deleteWillMessage();
disconnect();
}
protected int send(int messageId, String topicName, int qosLevel, ByteBuf payload, int deliveryCount)
{
boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0);
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH,
redelivery,
MqttQoS.valueOf(qosLevel),
false,
0);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
ctx.write(publish);
ctx.flush();
return 1;
}
ActiveMQServer getServer()
{
return server;
}
}

View File

@ -0,0 +1,140 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import java.util.List;
/**
* MQTTProtocolManager
*/
class MQTTProtocolManager implements ProtocolManager, NotificationListener
{
private ActiveMQServer server;
private MQTTLogger log = MQTTLogger.LOGGER;
public MQTTProtocolManager(ActiveMQServer server)
{
this.server = server;
}
@Override
public void onNotification(Notification notification)
{
// TODO handle notifications
}
@Override
public ProtocolManagerFactory getFactory()
{
return new MQTTProtocolManagerFactory();
}
@Override
public void updateInterceptors(List incomingInterceptors, List outgoingInterceptors)
{
// TODO handle interceptors
}
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection)
{
try
{
MQTTConnection mqttConnection = new MQTTConnection(connection);
ConnectionEntry entry = new ConnectionEntry(mqttConnection,
null,
System.currentTimeMillis(),
MQTTUtil.DEFAULT_KEEP_ALIVE_FREQUENCY);
NettyServerConnection nettyConnection = ((NettyServerConnection) connection);
MQTTProtocolHandler protocolHandler = nettyConnection.getChannel().pipeline().get(MQTTProtocolHandler.class);
protocolHandler.setConnection(mqttConnection, entry);
return entry;
}
catch (Exception e)
{
log.error(e);
return null;
}
}
@Override
public void removeHandler(String name)
{
// TODO add support for handlers
}
@Override
public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer)
{
connection.bufferReceived(connection.getID(), buffer);
}
@Override
public void addChannelHandlers(ChannelPipeline pipeline)
{
pipeline.addLast(new MqttEncoder());
pipeline.addLast(new MqttDecoder(MQTTUtil.MAX_MESSAGE_SIZE));
pipeline.addLast(new MQTTProtocolHandler(server));
}
@Override
public boolean isProtocol(byte[] array)
{
boolean mqtt311 = array[4] == 77 && // M
array[5] == 81 && // Q
array[6] == 84 && // T
array[7] == 84; // T
// FIXME The actual protocol name is 'MQIsdp' (However we are only passed the first 4 bytes of the protocol name)
boolean mqtt31 = array[4] == 77 && // M
array[5] == 81 && // Q
array[6] == 73 && // I
array[7] == 115; // s
return mqtt311 || mqtt31;
}
@Override
public MessageConverter getConverter()
{
return null;
}
@Override
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer)
{
}
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.List;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
public class MQTTProtocolManagerFactory implements ProtocolManagerFactory
{
public static final String MQTT_PROTOCOL_NAME = "MQTT";
private static final String MODULE_NAME = "artemis-mqtt-protocol";
private static final String[] SUPPORTED_PROTOCOLS = {MQTT_PROTOCOL_NAME};
@Override
public ProtocolManager createProtocolManager(ActiveMQServer server, List incomingInterceptors, List outgoingInterceptors)
{
return new MQTTProtocolManager(server);
}
@Override
public List filterInterceptors(List list)
{
// TODO Add support for interceptors.
return null;
}
@Override
public String[] getProtocols()
{
return SUPPORTED_PROTOCOLS;
}
@Override
public String getModuleName()
{
return MODULE_NAME;
}
}

View File

@ -0,0 +1,270 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.EmptyByteBuf;
import io.netty.handler.codec.mqtt.MqttMessageType;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.IOAsyncTask;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
/**
* Handles MQTT Exactly Once (QoS level 2) Protocol.
*/
public class MQTTPublishManager
{
private static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
private SimpleString managementAddress;
private ServerConsumer managementConsumer;
private MQTTSession session;
private MQTTLogger log = MQTTLogger.LOGGER;
private final Object lock = new Object();
public MQTTPublishManager(MQTTSession session)
{
this.session = session;
}
synchronized void start() throws Exception
{
createManagementAddress();
createManagementQueue();
createManagementConsumer();
}
synchronized void stop(boolean clean) throws Exception
{
if (managementConsumer != null)
{
managementConsumer.removeItself();
managementConsumer.setStarted(false);
managementConsumer.close(false);
if (clean) session.getServer().destroyQueue(managementAddress);
}
}
private void createManagementConsumer() throws Exception
{
long consumerId = session.getServer().getStorageManager().generateID();
managementConsumer = session.getServerSession().createConsumer(consumerId, managementAddress, null, false, false, -1);
managementConsumer.setStarted(true);
}
private void createManagementAddress()
{
String clientId = session.getSessionState().getClientId();
managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + clientId);
}
private void createManagementQueue() throws Exception
{
if (session.getServer().locateQueue(managementAddress) == null)
{
session.getServerSession().createQueue(managementAddress, managementAddress, null, false, MQTTUtil.DURABLE_MESSAGES);
}
}
boolean isManagementConsumer(ServerConsumer consumer)
{
return consumer == managementConsumer;
}
private int generateMqttId(int qos)
{
if (qos == 1)
{
return session.getSessionState().generateId();
}
else
{
Integer mqttid = session.getSessionState().generateId();
if (mqttid == null)
{
mqttid = (int) session.getServer().getStorageManager().generateID();
}
return mqttid;
}
}
/** Since MQTT Subscriptions can over lap; a client may receive the same message twice. When this happens the client
* returns a PubRec or PubAck with ID. But we need to know which consumer to ack, since we only have the ID to go on we
* are not able to decide which consumer to ack. Instead we send MQTT messages with different IDs and store a reference
* to original ID and consumer in the Session state. This way we can look up the consumer Id and the message Id from
* the PubAck or PubRec message id. **/
protected void sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) throws Exception
{
// This is to allow retries of PubRel.
if (isManagementConsumer(consumer))
{
sendPubRelMessage(message);
}
else
{
int qos = decideQoS(message, consumer);
if (qos == 0)
{
sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos);
session.getServerSession().acknowledge(consumer.getID(), message.getMessageID());
}
else
{
String consumerAddress = consumer.getQueue().getAddress().toString();
Integer mqttid = generateMqttId(qos);
session.getSessionState().addOutbandMessageRef(mqttid, consumerAddress, message.getMessageID(), qos);
sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos);
}
}
}
// INBOUND
void handleMessage(int messageId, String topic, int qos, ByteBuf payload, boolean retain) throws Exception
{
synchronized (lock)
{
ServerMessage serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, topic, retain, qos, payload);
if (qos > 0)
{
serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
}
if (qos < 2 || !session.getSessionState().getPubRec().contains(messageId))
{
if (qos == 2) session.getSessionState().getPubRec().add(messageId);
session.getServerSession().send(serverMessage, true);
}
if (retain)
{
boolean reset = payload instanceof EmptyByteBuf || payload.capacity() == 0;
session.getRetainMessageManager().handleRetainedMessage(serverMessage, topic, reset);
}
createMessageAck(messageId, qos);
}
}
void sendPubRelMessage(ServerMessage message)
{
if (message.getIntProperty(MQTTUtil.MQTT_MESSAGE_TYPE_KEY) == MqttMessageType.PUBREL.value())
{
int messageId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY);
MQTTMessageInfo messageInfo = new MQTTMessageInfo(message.getMessageID(), managementConsumer.getID(), message.getAddress().toString());
session.getSessionState().storeMessageRef(messageId, messageInfo, false);
session.getProtocolHandler().sendPubRel(messageId);
}
}
private void createMessageAck(final int messageId, final int qos)
{
session.getServer().getStorageManager().afterCompleteOperations(new IOAsyncTask()
{
@Override
public void done()
{
if (qos == 1)
{
session.getProtocolHandler().sendPubAck(messageId);
}
else if (qos == 2)
{
session.getProtocolHandler().sendPubRec(messageId);
}
}
@Override
public void onError(int errorCode, String errorMessage)
{
log.error("Pub Sync Failed");
}
});
}
void handlePubRec(int messageId) throws Exception
{
MQTTMessageInfo messageRef = session.getSessionState().getMessageInfo(messageId);
if (messageRef != null)
{
ServerMessage pubRel = MQTTUtil.createPubRelMessage(session, managementAddress, messageId);
session.getServerSession().send(pubRel, true);
session.getServerSession().acknowledge(messageRef.getConsumerId(), messageRef.getServerMessageId());
session.getProtocolHandler().sendPubRel(messageId);
}
}
void handlePubComp(int messageId) throws Exception
{
MQTTMessageInfo messageInfo = session.getSessionState().getMessageInfo(messageId);
// Check to see if this message is stored if not just drop the packet.
if (messageInfo != null)
{
session.getServerSession().acknowledge(managementConsumer.getID(), messageInfo.getServerMessageId());
}
}
void handlePubRel(int messageId)
{
// We don't check to see if a PubRel existed for this message. We assume it did and so send PubComp.
session.getSessionState().getPubRec().remove(messageId);
session.getProtocolHandler().sendPubComp(messageId);
session.getSessionState().removeMessageRef(messageId);
}
void handlePubAck(int messageId) throws Exception
{
Pair<String, Long> pub1MessageInfo = session.getSessionState().removeOutbandMessageRef(messageId, 1);
if (pub1MessageInfo != null)
{
String mqttAddress = MQTTUtil.convertCoreAddressFilterToMQTT(pub1MessageInfo.getA());
ServerConsumer consumer = session.getSubscriptionManager().getConsumerForAddress(mqttAddress);
session.getServerSession().acknowledge(consumer.getID(), pub1MessageInfo.getB());
}
}
private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos)
{
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString()).toString();
//FIXME should we be copying the body buffer here?
ByteBuf payload = message.getBodyBufferCopy().byteBuf();
session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
}
private int decideQoS(ServerMessage message, ServerConsumer consumer)
{
int subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID());
int qos = message.getIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY);
/* Subscription QoS is the maximum QoS the client is willing to receive for this subscription. If the message QoS
is less than the subscription QoS then use it, otherwise use the subscription qos). */
return subscriptionQoS < qos ? subscriptionQoS : qos;
}
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerMessage;
import java.util.Iterator;
public class MQTTRetainMessageManager
{
private MQTTSession session;
public MQTTRetainMessageManager(MQTTSession session)
{
this.session = session;
}
/** FIXME
* Retained messages should be handled in the core API. There is currently no support for retained messages
* at the time of writing. Instead we handle retained messages here. This method will create a new queue for
* every address that is used to store retained messages. THere should only ever be one message in the retained
* message queue. When a new subscription is created the queue should be browsed and the message copied onto
* the subscription queue for the consumer. When a new retained message is received the message will be sent to
* the retained queue and the previous retain message consumed to remove it from the queue. */
void handleRetainedMessage(ServerMessage message, String address, boolean reset) throws Exception
{
SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address));
if (!session.getServerSession().executeQueueQuery(retainAddress).isExists())
{
session.getServerSession().createQueue(retainAddress, retainAddress, null, false, true);
}
Queue queue = session.getServer().locateQueue(retainAddress);
// Set the address of this message to the retained queue.
message.setAddress(retainAddress);
Iterator<MessageReference> iterator = queue.iterator();
synchronized (iterator)
{
if (iterator.hasNext())
{
Long messageId = iterator.next().getMessage().getMessageID();
queue.deleteReference(messageId);
}
if (!reset)
{
session.getServerSession().send(message.copy(), true);
}
}
}
void addRetainedMessagesToQueue(SimpleString queueName, String address) throws Exception
{
// Queue to add the retained messages to
Queue queue = session.getServer().locateQueue(queueName);
// The address filter that matches all retained message queues.
String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address);
BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(new SimpleString(retainAddress));
// Iterate over all matching retain queues and add the head message to the original queue.
for (SimpleString retainedQueueName : bindingQueryResult.getQueueNames())
{
Queue retainedQueue = session.getServer().locateQueue(retainedQueueName);
synchronized (this)
{
Iterator<MessageReference> i = retainedQueue.iterator();
if (i.hasNext())
{
ServerMessage message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID());
queue.addTail(message.createReference(queue), true);
}
}
}
}
}

View File

@ -0,0 +1,173 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class MQTTSession
{
static Map<String, MQTTSessionState> SESSIONS = new ConcurrentHashMap<>();
private final String id = UUID.randomUUID().toString();
private MQTTProtocolHandler protocolHandler;
private MQTTSubscriptionManager subscriptionManager;
private MQTTSessionCallback sessionCallback;
private ServerSessionImpl serverSession;
private MQTTPublishManager mqttPublishManager;
private MQTTConnectionManager mqttConnectionManager;
private MQTTRetainMessageManager retainMessageManager;
private MQTTConnection connection;
protected MQTTSessionState state;
private boolean stopped = false;
private MQTTLogger log = MQTTLogger.LOGGER;
public MQTTSession( MQTTProtocolHandler protocolHandler, MQTTConnection connection) throws Exception
{
this.protocolHandler = protocolHandler;
this.connection = connection;
mqttConnectionManager = new MQTTConnectionManager(this);
mqttPublishManager = new MQTTPublishManager(this);
sessionCallback = new MQTTSessionCallback(this);
subscriptionManager = new MQTTSubscriptionManager(this);
retainMessageManager = new MQTTRetainMessageManager(this);
log.debug("SESSION CREATED: " + id);
}
// Called after the client has Connected.
synchronized void start() throws Exception
{
mqttPublishManager.start();
subscriptionManager.start();
stopped = false;
}
// TODO ensure resources are cleaned up for GC.
synchronized void stop() throws Exception
{
if (!stopped)
{
protocolHandler.stop(false);
// TODO this should pass in clean session.
subscriptionManager.stop(false);
mqttPublishManager.stop(false);
if (serverSession != null)
{
serverSession.stop();
serverSession.close(false);
}
if (state != null)
{
state.setAttached(false);
}
}
stopped = true;
}
boolean getStopped()
{
return stopped;
}
MQTTPublishManager getMqttPublishManager()
{
return mqttPublishManager;
}
MQTTSessionState getState()
{
return state;
}
MQTTConnectionManager getConnectionManager()
{
return mqttConnectionManager;
}
MQTTSessionState getSessionState()
{
return state;
}
ServerSessionImpl getServerSession()
{
return serverSession;
}
ActiveMQServer getServer()
{
return protocolHandler.getServer();
}
MQTTSubscriptionManager getSubscriptionManager()
{
return subscriptionManager;
}
MQTTProtocolHandler getProtocolHandler()
{
return protocolHandler;
}
SessionCallback getSessionCallback()
{
return sessionCallback;
}
void setServerSession(ServerSessionImpl serverSession)
{
this.serverSession = serverSession;
}
void setSessionState(MQTTSessionState state)
{
this.state = state;
state.setAttached(true);
}
MQTTRetainMessageManager getRetainMessageManager()
{
return retainMessageManager;
}
MQTTConnection getConnection()
{
return connection;
}
}

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
public class MQTTSessionCallback implements SessionCallback
{
private MQTTSession session;
private MQTTLogger log = MQTTLogger.LOGGER;
public MQTTSessionCallback(MQTTSession session) throws Exception
{
this.session = session;
}
@Override
public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount)
{
try
{
session.getMqttPublishManager().sendMessage(message, consumer, deliveryCount);
}
catch (Exception e)
{
e.printStackTrace();
log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
}
return 1;
}
@Override
public int sendLargeMessageContinuation(ServerConsumer consumerID, byte[] body, boolean continues, boolean requiresResponse)
{
log.warn("Sending LARGE MESSAGE");
return 1;
}
@Override
public void addReadyListener(ReadyListener listener)
{
session.getConnection().getTransportConnection().addReadyListener(listener);
}
@Override
public void removeReadyListener(ReadyListener listener)
{
session.getConnection().getTransportConnection().removeReadyListener(listener);
}
@Override
public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount)
{
return sendMessage(message, consumer, deliveryCount);
}
@Override
public void disconnect(ServerConsumer consumer, String queueName)
{
try
{
consumer.removeItself();
}
catch (Exception e)
{
log.error(e.getMessage());
}
}
@Override
public boolean hasCredits(ServerConsumer consumerID)
{
return true;
}
@Override
public void sendProducerCreditsMessage(int credits, SimpleString address)
{
}
@Override
public void sendProducerCreditsFailMessage(int credits, SimpleString address)
{
}
@Override
public void closed()
{
}
}

View File

@ -0,0 +1,250 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.server.ServerMessage;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class MQTTSessionState
{
private String clientId;
private ServerMessage willMessage;
private final ConcurrentHashMap<String, MqttTopicSubscription> subscriptions = new ConcurrentHashMap<>();
// Used to store Packet ID of Publish QoS1 and QoS2 message. See spec: 4.3.3 QoS 2: Exactly once delivery. Method B.
private Map<Integer, MQTTMessageInfo> messageRefStore;
private Map<String, Map<Long, Integer>> addressMessageMap;
private Set<Integer> pubRec;
private Set<Integer> pub;
private boolean attached = false;
private MQTTLogger log = MQTTLogger.LOGGER;
// Objects track the Outbound message references
private Map<Integer, Pair<String, Long>> outboundMessageReferenceStore;
private ConcurrentHashMap<String, ConcurrentHashMap<Long, Integer>> reverseOutboundReferenceStore;
private final Object outboundLock = new Object();
// FIXME We should use a better mechanism for creating packet IDs.
private AtomicInteger lastId = new AtomicInteger(0);
public MQTTSessionState(String clientId)
{
this.clientId = clientId;
pubRec = new HashSet<>();
pub = new HashSet<>();
outboundMessageReferenceStore = new ConcurrentHashMap<>();
reverseOutboundReferenceStore = new ConcurrentHashMap<>();
messageRefStore = new ConcurrentHashMap<>();
addressMessageMap = new ConcurrentHashMap<>();
}
int generateId()
{
lastId.compareAndSet(Short.MAX_VALUE, 1);
return lastId.addAndGet(1);
}
void addOutbandMessageRef(int mqttId, String address, long serverMessageId, int qos)
{
synchronized (outboundLock)
{
outboundMessageReferenceStore.put(mqttId, new Pair<String, Long>(address, serverMessageId));
if (qos == 2)
{
if (reverseOutboundReferenceStore.containsKey(address))
{
reverseOutboundReferenceStore.get(address).put(serverMessageId, mqttId);
}
else
{
ConcurrentHashMap<Long, Integer> serverToMqttId = new ConcurrentHashMap<Long, Integer>();
serverToMqttId.put(serverMessageId, mqttId);
reverseOutboundReferenceStore.put(address, serverToMqttId);
}
}
}
}
Pair<String, Long> removeOutbandMessageRef(int mqttId, int qos)
{
synchronized (outboundLock)
{
Pair<String, Long> messageInfo = outboundMessageReferenceStore.remove(mqttId);
if (qos == 1)
{
return messageInfo;
}
Map<Long, Integer> map = reverseOutboundReferenceStore.get(messageInfo.getA());
if (map != null)
{
map.remove(messageInfo.getB());
if (map.isEmpty())
{
reverseOutboundReferenceStore.remove(messageInfo.getA());
}
return messageInfo;
}
return null;
}
}
Set<Integer> getPubRec()
{
return pubRec;
}
Set<Integer> getPub()
{
return pub;
}
boolean getAttached()
{
return attached;
}
void setAttached(boolean attached)
{
this.attached = attached;
}
boolean isWill()
{
return willMessage != null;
}
ServerMessage getWillMessage()
{
return willMessage;
}
void setWillMessage(ServerMessage willMessage)
{
this.willMessage = willMessage;
}
void deleteWillMessage()
{
willMessage = null;
}
Collection<MqttTopicSubscription> getSubscriptions()
{
return subscriptions.values();
}
boolean addSubscription(MqttTopicSubscription subscription)
{
synchronized (subscriptions)
{
addressMessageMap.putIfAbsent(MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName()), new ConcurrentHashMap<Long, Integer>());
MqttTopicSubscription existingSubscription = subscriptions.get(subscription.topicName());
if (existingSubscription != null)
{
if (subscription.qualityOfService().value() > existingSubscription.qualityOfService().value())
{
subscriptions.put(subscription.topicName(), subscription);
return true;
}
}
else
{
subscriptions.put(subscription.topicName(), subscription);
return true;
}
}
return false;
}
void removeSubscription(String address)
{
synchronized (subscriptions)
{
subscriptions.remove(address);
addressMessageMap.remove(address);
}
}
MqttTopicSubscription getSubscription(String address)
{
return subscriptions.get(address);
}
String getClientId()
{
return clientId;
}
void setClientId(String clientId)
{
this.clientId = clientId;
}
void storeMessageRef(Integer mqttId, MQTTMessageInfo messageInfo, boolean storeAddress)
{
messageRefStore.put(mqttId, messageInfo);
if (storeAddress)
{
Map<Long, Integer> addressMap = addressMessageMap.get(messageInfo.getAddress());
if (addressMap != null)
{
addressMap.put(messageInfo.getServerMessageId(), mqttId);
}
}
}
void removeMessageRef(Integer mqttId)
{
MQTTMessageInfo info = messageRefStore.remove(mqttId);
if (info != null)
{
Map<Long, Integer> addressMap = addressMessageMap.get(info.getAddress());
if (addressMap != null)
{
addressMap.remove(info.getServerMessageId());
}
}
}
MQTTMessageInfo getMessageInfo(Integer mqttId)
{
return messageRefStore.get(mqttId);
}
}

View File

@ -0,0 +1,183 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class MQTTSubscriptionManager
{
private MQTTSession session;
private ConcurrentHashMap<Long, Integer> consumerQoSLevels;
private ConcurrentHashMap<String, ServerConsumer> consumers;
private MQTTLogger log = MQTTLogger.LOGGER;
public MQTTSubscriptionManager(MQTTSession session)
{
this.session = session;
consumers = new ConcurrentHashMap<>();
consumerQoSLevels = new ConcurrentHashMap<>();
}
synchronized void start() throws Exception
{
for (MqttTopicSubscription subscription : session.getSessionState().getSubscriptions())
{
SimpleString q = createQueueForSubscription(subscription.topicName(), subscription.qualityOfService().value());
createConsumerForSubscriptionQueue(q, subscription.topicName(), subscription.qualityOfService().value());
}
}
synchronized void stop(boolean clean) throws Exception
{
for (ServerConsumer consumer : consumers.values())
{
consumer.setStarted(false);
consumer.disconnect();
consumer.getQueue().removeConsumer(consumer);
consumer.close(false);
}
if (clean)
{
for (ServerConsumer consumer : consumers.values())
{
session.getServer().destroyQueue(consumer.getQueue().getName());
}
}
}
/**
* Creates a Queue if it doesn't already exist, based on a topic and address. Returning the queue name.
*/
private SimpleString createQueueForSubscription(String topic, int qos) throws Exception
{
String address = MQTTUtil.convertMQTTAddressFilterToCore(topic);
SimpleString queue = getQueueNameForTopic(address);
Queue q = session.getServer().locateQueue(queue);
if (q == null)
{
session.getServerSession().createQueue(new SimpleString(address), queue, null, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0);
}
return queue;
}
/**
* Creates a new consumer for the queue associated with a subscription
*/
private void createConsumerForSubscriptionQueue(SimpleString queue, String topic, int qos) throws Exception
{
long cid = session.getServer().getStorageManager().generateID();
ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue, null, false, true, -1);
consumer.setStarted(true);
consumers.put(topic, consumer);
consumerQoSLevels.put(cid, qos);
}
private void addSubscription(MqttTopicSubscription subscription) throws Exception
{
MqttTopicSubscription s = session.getSessionState().getSubscription(subscription.topicName());
int qos = subscription.qualityOfService().value();
String topic = subscription.topicName();
session.getSessionState().addSubscription(subscription);
SimpleString q = createQueueForSubscription(topic, qos);
if (s == null)
{
createConsumerForSubscriptionQueue(q, topic, qos);
}
else
{
consumerQoSLevels.put(consumers.get(topic).getID(), qos);
}
session.getRetainMessageManager().addRetainedMessagesToQueue(q, topic);
}
void removeSubscriptions(List<String> topics) throws Exception
{
for (String topic : topics)
{
removeSubscription(topic);
}
}
private synchronized void removeSubscription(String address) throws Exception
{
ServerConsumer consumer = consumers.get(address);
String internalAddress = MQTTUtil.convertMQTTAddressFilterToCore(address);
SimpleString internalQueueName = getQueueNameForTopic(internalAddress);
Queue queue = session.getServer().locateQueue(internalQueueName);
queue.deleteQueue(true);
session.getSessionState().removeSubscription(address);
consumers.remove(address);
consumerQoSLevels.remove(consumer.getID());
}
private SimpleString getQueueNameForTopic(String topic)
{
return new SimpleString(session.getSessionState().getClientId() + "." + topic);
}
/**
* As per MQTT Spec. Subscribes this client to a number of MQTT topics.
*
* @param subscriptions
* @return An array of integers representing the list of accepted QoS for each topic.
*
* @throws Exception
*/
int[] addSubscriptions(List<MqttTopicSubscription> subscriptions) throws Exception
{
int[] qos = new int[subscriptions.size()];
for (int i = 0; i < subscriptions.size(); i++)
{
addSubscription(subscriptions.get(i));
qos[i] = subscriptions.get(i).qualityOfService().value();
}
return qos;
}
Map<Long, Integer> getConsumerQoSLevels()
{
return consumerQoSLevels;
}
ServerConsumer getConsumerForAddress(String address)
{
return consumers.get(address);
}
}

View File

@ -0,0 +1,178 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
/**
* A Utility Class for creating Server Side objects and converting MQTT concepts to/from Artemis.
*/
public class MQTTUtil
{
// TODO These settings should be configurable.
public static final int DEFAULT_SERVER_MESSAGE_BUFFER_SIZE = 512;
public static final boolean DURABLE_MESSAGES = true;
public static final boolean SESSION_AUTO_COMMIT_SENDS = true;
public static final boolean SESSION_AUTO_COMMIT_ACKS = false;
public static final boolean SESSION_PREACKNOWLEDGE = false;
public static final boolean SESSION_XA = false;
public static final boolean SESSION_AUTO_CREATE_QUEUE = false;
public static final int MAX_MESSAGE_SIZE = 268435455;
public static final String MQTT_ADDRESS_PREFIX = "$sys.mqtt.";
public static final String MQTT_RETAIN_ADDRESS_PREFIX = "$sys.mqtt.retain.";
public static final String MQTT_QOS_LEVEL_KEY = "mqtt.qos.level";
public static final String MQTT_MESSAGE_ID_KEY = "mqtt.message.id";
public static final String MQTT_MESSAGE_TYPE_KEY = "mqtt.message.type";
public static final String MQTT_MESSAGE_RETAIN_KEY = "mqtt.message.retain";
public static final int DEFAULT_KEEP_ALIVE_FREQUENCY = 5000;
public static String convertMQTTAddressFilterToCore(String filter)
{
return MQTT_ADDRESS_PREFIX + swapMQTTAndCoreWildCards(filter);
}
public static String convertCoreAddressFilterToMQTT(String filter)
{
if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX.toString()))
{
filter = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length(), filter.length());
}
else if (filter.startsWith(MQTT_ADDRESS_PREFIX.toString()))
{
filter = filter.substring(MQTT_ADDRESS_PREFIX.length(), filter.length());
}
return swapMQTTAndCoreWildCards(filter);
}
public static String convertMQTTAddressFilterToCoreRetain(String filter)
{
return MQTT_RETAIN_ADDRESS_PREFIX + swapMQTTAndCoreWildCards(filter);
}
public static String swapMQTTAndCoreWildCards(String filter)
{
char[] topicFilter = filter.toCharArray();
for (int i = 0; i < topicFilter.length; i++)
{
switch (topicFilter[i])
{
case '/':
topicFilter[i] = '.'; break;
case '.':
topicFilter[i] = '/'; break;
case '*':
topicFilter[i] = '+'; break;
case '+':
topicFilter[i] = '*'; break;
default:
break;
}
}
return String.valueOf(topicFilter);
}
private static ServerMessage createServerMessage(MQTTSession session, SimpleString address, boolean retain, int qos)
{
long id = session.getServer().getStorageManager().generateID();
ServerMessageImpl message = new ServerMessageImpl(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
message.setAddress(address);
message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain);
message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
return message;
}
public static ServerMessage createServerMessageFromByteBuf(MQTTSession session, String topic, boolean retain, int qos, ByteBuf payload)
{
String coreAddress = convertMQTTAddressFilterToCore(topic);
ServerMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
// FIXME does this involve a copy?
message.getBodyBuffer().writeBytes(new ChannelBufferWrapper(payload), payload.readableBytes());
return message;
}
public static ServerMessage createServerMessageFromString(MQTTSession session, String payload, String topic, int qos, boolean retain)
{
ServerMessage message = createServerMessage(session, new SimpleString(topic), retain, qos);
message.getBodyBuffer().writeString(payload);
return message;
}
public static ServerMessage createPubRelMessage(MQTTSession session, SimpleString address, int messageId)
{
ServerMessage message = createServerMessage(session, address, false, 1);
message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId);
message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_TYPE_KEY), MqttMessageType.PUBREL.value());
return message;
}
public static void logMessage(MQTTLogger logger, MqttMessage message, boolean inbound)
{
StringBuilder log = inbound ? new StringBuilder("Received ") : new StringBuilder("Sent ");
if (message.fixedHeader() != null)
{
log.append(message.fixedHeader().messageType().toString());
if (message.variableHeader() instanceof MqttPublishVariableHeader)
{
log.append("(" + ((MqttPublishVariableHeader) message.variableHeader()).messageId() + ") " + message.fixedHeader().qosLevel());
}
else if (message.variableHeader() instanceof MqttMessageIdVariableHeader)
{
log.append("(" + ((MqttMessageIdVariableHeader) message.variableHeader()).messageId() + ")");
}
if (message.fixedHeader().messageType() == MqttMessageType.SUBSCRIBE)
{
for (MqttTopicSubscription sub : ((MqttSubscribeMessage) message).payload().topicSubscriptions())
{
log.append("\n\t" + sub.topicName() + " : " + sub.qualityOfService());
}
}
logger.debug(log.toString());
}
}
}

View File

@ -0,0 +1 @@
org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManagerFactory

View File

@ -36,6 +36,7 @@
<module>artemis-openwire-protocol</module>
<module>artemis-proton-plug</module>
<module>artemis-hornetq-protocol</module>
<module>artemis-mqtt-protocol</module>
</modules>
</project>

23
pom.xml
View File

@ -166,6 +166,23 @@
<!-- License: CPL 1.0 -->
<!-- There are newer versions of the JUnit but they break our tests -->
</dependency>
<!-- ### For MQTT Tests -->
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.10</version>
<scope>test</scope>
<!-- Apache v2.0 License -->
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>mqtt-client</artifactId>
<version>0.4.1-SNAPSHOT</version>
<scope>test</scope>
<!-- Eclipse Public License - v 1.0 -->
</dependency>
<!-- ## End Test Dependencies ## -->
<!-- ### Build Time Dependencies ### -->
@ -301,6 +318,12 @@
<version>${netty.version}</version>
<!-- License: Apache 2.0 -->
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-mqtt</artifactId>
<version>5.0.0.Alpha2</version>
<!-- License: Apache 2.0 -->
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>proton-j</artifactId>

View File

@ -32,6 +32,17 @@
<vertx.testtools.version>2.0.3-final</vertx.testtools.version>
</properties>
<repositories>
<!-- for the paho dependency -->
<repository>
<id>eclipse.m2</id>
<url>https://repo.eclipse.org/content/groups/snapshots/</url>
<releases><enabled>false</enabled></releases>
<snapshots><enabled>true</enabled></snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
@ -121,6 +132,32 @@
<artifactId>artemis-openwire-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-hornetq-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<!-- MQTT Deps -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-mqtt-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>mqtt-client</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-mqtt</artifactId>
</dependency>
<!-- END MQTT Deps -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-aerogear-integration</artifactId>

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
public class FuseMQTTClientProvider implements MQTTClientProvider
{
private final MQTT mqtt = new MQTT();
private BlockingConnection connection;
@Override
public void connect(String host) throws Exception
{
mqtt.setHost(host);
mqtt.setVersion("3.1.1");
// shut off connect retry
mqtt.setConnectAttemptsMax(0);
mqtt.setReconnectAttemptsMax(0);
connection = mqtt.blockingConnection();
connection.connect();
}
@Override
public void disconnect() throws Exception
{
if (this.connection != null)
{
this.connection.disconnect();
}
}
@Override
public void publish(String topic, byte[] payload, int qos) throws Exception
{
publish(topic, payload, qos, false);
}
@Override
public void publish(String topic, byte[] payload, int qos, boolean retained) throws Exception
{
connection.publish(topic, payload, QoS.values()[qos], retained);
}
@Override
public void subscribe(String topic, int qos) throws Exception
{
Topic[] topics = {new Topic(utf8(topic), QoS.values()[qos])};
connection.subscribe(topics);
}
@Override
public void unsubscribe(String topic) throws Exception
{
connection.unsubscribe(new String[]{topic});
}
@Override
public byte[] receive(int timeout) throws Exception
{
byte[] result = null;
Message message = connection.receive(timeout, TimeUnit.MILLISECONDS);
if (message != null)
{
result = message.getPayload();
message.ack();
}
return result;
}
@Override
public void setSslContext(SSLContext sslContext)
{
mqtt.setSslContext(sslContext);
}
@Override
public void setWillMessage(String string)
{
mqtt.setWillMessage(string);
}
@Override
public void setWillTopic(String topic)
{
mqtt.setWillTopic(topic);
}
@Override
public void setClientId(String clientId)
{
mqtt.setClientId(clientId);
}
@Override
public void kill() throws Exception
{
connection.kill();
}
@Override
public void setKeepAlive(int keepAlive) throws Exception
{
mqtt.setKeepAlive((short) keepAlive);
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
public interface MQTTClientProvider
{
void connect(String host) throws Exception;
void disconnect() throws Exception;
void publish(String topic, byte[] payload, int qos, boolean retained) throws Exception;
void publish(String topic, byte[] payload, int qos) throws Exception;
void subscribe(String topic, int qos) throws Exception;
void unsubscribe(String topic) throws Exception;
byte[] receive(int timeout) throws Exception;
void setSslContext(javax.net.ssl.SSLContext sslContext);
void setWillMessage(String string);
void setWillTopic(String topic);
void setClientId(String clientId);
void kill() throws Exception;
void setKeepAlive(int keepAlive) throws Exception;
}

View File

@ -0,0 +1,376 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.File;
import java.io.IOException;
import java.security.ProtectionDomain;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MQTTTestSupport extends ActiveMQTestBase
{
private ActiveMQServer server;
private static final Logger LOG = LoggerFactory.getLogger(MQTTTestSupport.class);
protected int port = 1883;
protected ActiveMQConnectionFactory cf;
protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
protected boolean persistent;
protected String protocolConfig;
protected String protocolScheme;
protected boolean useSSL;
public static final int AT_MOST_ONCE = 0;
public static final int AT_LEAST_ONCE = 1;
public static final int EXACTLY_ONCE = 2;
@Rule
public TestName name = new TestName();
public MQTTTestSupport()
{
this.protocolScheme = "mqtt";
this.useSSL = false;
cf = new ActiveMQConnectionFactory(false, new TransportConfiguration(ActiveMQTestBase.NETTY_CONNECTOR_FACTORY));
}
public File basedir() throws IOException
{
ProtectionDomain protectionDomain = getClass().getProtectionDomain();
return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
}
public MQTTTestSupport(String connectorScheme, boolean useSSL)
{
this.protocolScheme = connectorScheme;
this.useSSL = useSSL;
}
public String getName()
{
return name.getMethodName();
}
@Before
public void setUp() throws Exception
{
String basedir = basedir().getPath();
System.setProperty("javax.net.ssl.trustStore", basedir + "/src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", basedir + "/src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
exceptions.clear();
startBroker();
}
@After
public void tearDown() throws Exception
{
stopBroker();
}
public void startBroker() throws Exception
{
// TODO Add SSL
super.setUp();
server = createServer(true, true);
addCoreConnector();
addMQTTConnector();
AddressSettings addressSettings = new AddressSettings();
addressSettings.setMaxSizeBytes(999999999);
server.getAddressSettingsRepository().addMatch("#", addressSettings);
server.start();
server.waitForActivation(10, TimeUnit.SECONDS);
}
protected void addCoreConnector() throws Exception
{
// Overrides of this method can add additional configuration options or add multiple
// MQTT transport connectors as needed, the port variable is always supposed to be
// assigned the primary MQTT connector's port.
HashMap<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.PORT_PROP_NAME, "" + 5445);
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "CORE");
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
LOG.info("Added connector {} to broker", getProtocolScheme());
}
protected void addMQTTConnector() throws Exception
{
// Overrides of this method can add additional configuration options or add multiple
// MQTT transport connectors as needed, the port variable is always supposed to be
// assigned the primary MQTT connector's port.
HashMap<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.PORT_PROP_NAME, "" + port);
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "MQTT");
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
LOG.info("Added connector {} to broker", getProtocolScheme());
}
public void stopBroker() throws Exception
{
if (server.isStarted())
{
server.stop();
server = null;
}
}
protected String getQueueName()
{
return getClass().getName() + "." + name.getMethodName();
}
protected String getTopicName()
{
return getClass().getName() + "." + name.getMethodName();
}
/**
* Initialize an MQTTClientProvider instance. By default this method uses the port that's
* assigned to be the TCP based port using the base version of addMQTTConnector. A subclass
* can either change the value of port or override this method to assign the correct port.
*
* @param provider the MQTTClientProvider instance to initialize.
* @throws Exception if an error occurs during initialization.
*/
protected void initializeConnection(MQTTClientProvider provider) throws Exception
{
if (!isUseSSL())
{
provider.connect("tcp://localhost:" + port);
}
else
{
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
provider.setSslContext(ctx);
provider.connect("ssl://localhost:" + port);
}
}
public String getProtocolScheme()
{
return protocolScheme;
}
public void setProtocolScheme(String scheme)
{
this.protocolScheme = scheme;
}
public boolean isUseSSL()
{
return this.useSSL;
}
public void setUseSSL(boolean useSSL)
{
this.useSSL = useSSL;
}
public boolean isPersistent()
{
return persistent;
}
public int getPort()
{
return this.port;
}
public boolean isSchedulerSupportEnabled()
{
return false;
}
protected interface Task
{
void run() throws Exception;
}
protected void within(int time, TimeUnit unit, Task task) throws InterruptedException
{
long timeMS = unit.toMillis(time);
long deadline = System.currentTimeMillis() + timeMS;
while (true)
{
try
{
task.run();
return;
}
catch (Throwable e)
{
long remaining = deadline - System.currentTimeMillis();
if (remaining <= 0)
{
if (e instanceof RuntimeException)
{
throw (RuntimeException) e;
}
if (e instanceof Error)
{
throw (Error) e;
}
throw new RuntimeException(e);
}
Thread.sleep(Math.min(timeMS / 10, remaining));
}
}
}
protected MQTTClientProvider getMQTTClientProvider()
{
return new FuseMQTTClientProvider();
}
protected MQTT createMQTTConnection() throws Exception
{
MQTT client = createMQTTConnection(null, false);
client.setVersion("3.1.1");
return client;
}
protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception
{
if (isUseSSL())
{
return createMQTTSslConnection(clientId, clean);
}
else
{
return createMQTTTcpConnection(clientId, clean);
}
}
private MQTT createMQTTTcpConnection(String clientId, boolean clean) throws Exception
{
MQTT mqtt = new MQTT();
mqtt.setConnectAttemptsMax(1);
mqtt.setReconnectAttemptsMax(0);
mqtt.setTracer(createTracer());
mqtt.setVersion("3.1.1");
if (clientId != null)
{
mqtt.setClientId(clientId);
}
mqtt.setCleanSession(clean);
mqtt.setHost("localhost", port);
return mqtt;
}
private MQTT createMQTTSslConnection(String clientId, boolean clean) throws Exception
{
MQTT mqtt = new MQTT();
mqtt.setConnectAttemptsMax(1);
mqtt.setReconnectAttemptsMax(0);
mqtt.setTracer(createTracer());
mqtt.setHost("ssl://localhost:" + port);
if (clientId != null)
{
mqtt.setClientId(clientId);
}
mqtt.setCleanSession(clean);
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
mqtt.setSslContext(ctx);
return mqtt;
}
protected Tracer createTracer()
{
return new Tracer()
{
@Override
public void onReceive(MQTTFrame frame)
{
LOG.info("Client Received:\n" + frame);
}
@Override
public void onSend(MQTTFrame frame)
{
LOG.info("Client Sent:\n" + frame);
}
@Override
public void debug(String message, Object... args)
{
LOG.info(String.format(message, args));
}
};
}
static class DefaultTrustManager implements X509TrustManager
{
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException
{
}
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException
{
}
@Override
public X509Certificate[] getAcceptedIssuers()
{
return new X509Certificate[0];
}
}
}

View File

@ -0,0 +1,175 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTLogger;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Test;
public class PahoMQTTTest extends MQTTTestSupport
{
private static MQTTLogger LOG = MQTTLogger.LOGGER;
@Test(timeout = 300000)
public void testLotsOfClients() throws Exception
{
final int CLIENTS = Integer.getInteger("PahoMQTTTest.CLIENTS", 100);
LOG.info("Using: {} clients: " + CLIENTS);
final AtomicInteger receiveCounter = new AtomicInteger();
MqttClient client = createPahoClient("consumer");
client.setCallback(new MqttCallback()
{
@Override
public void connectionLost(Throwable cause)
{
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception
{
receiveCounter.incrementAndGet();
}
@Override
public void deliveryComplete(IMqttDeliveryToken token)
{
}
});
client.connect();
client.subscribe("test");
final AtomicReference<Throwable> asyncError = new AtomicReference<Throwable>();
final CountDownLatch connectedDoneLatch = new CountDownLatch(CLIENTS);
final CountDownLatch disconnectDoneLatch = new CountDownLatch(CLIENTS);
final CountDownLatch sendBarrier = new CountDownLatch(1);
for (int i = 0; i < CLIENTS; i++)
{
Thread.sleep(10);
new Thread(null, null, "client:" + i)
{
@Override
public void run()
{
try
{
MqttClient client = createPahoClient(Thread.currentThread().getName());
client.connect();
connectedDoneLatch.countDown();
sendBarrier.await();
for (int i = 0; i < 10; i++)
{
Thread.sleep(1000);
client.publish("test", "hello".getBytes(), 1, false);
}
client.disconnect();
client.close();
}
catch (Throwable e)
{
e.printStackTrace();
asyncError.set(e);
}
finally
{
disconnectDoneLatch.countDown();
}
}
}.start();
}
connectedDoneLatch.await();
assertNull("Async error: " + asyncError.get(), asyncError.get());
sendBarrier.countDown();
LOG.info("All clients connected... waiting to receive sent messages...");
// We should eventually get all the messages.
within(30, TimeUnit.SECONDS, new Task()
{
@Override
public void run() throws Exception
{
assertTrue(receiveCounter.get() == CLIENTS * 10);
}
});
LOG.info("All messages received.");
disconnectDoneLatch.await();
assertNull("Async error: " + asyncError.get(), asyncError.get());
}
@Test(timeout = 300000)
public void testSendAndReceiveMQTT() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
MqttClient consumer = createPahoClient("consumerId");
MqttClient producer = createPahoClient("producerId");
consumer.connect();
consumer.subscribe("test");
consumer.setCallback(new MqttCallback()
{
@Override
public void connectionLost(Throwable cause)
{
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception
{
latch.countDown();
}
@Override
public void deliveryComplete(IMqttDeliveryToken token)
{
}
});
producer.connect();
producer.publish("test", "hello".getBytes(), 1, false);
waitForLatch(latch);
producer.disconnect();
producer.close();
}
private MqttClient createPahoClient(String clientId) throws MqttException
{
return new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence());
}
}

View File

@ -0,0 +1,284 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt.imported.util;
import javax.annotation.PostConstruct;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import java.io.File;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.broker.SslContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.UrlResource;
import org.springframework.util.ResourceUtils;
/**
* Extends the SslContext so that it's easier to configure from spring.
*/
public class ResourceLoadingSslContext extends SslContext
{
private String keyStoreType = "jks";
private String trustStoreType = "jks";
private String secureRandomAlgorithm = "SHA1PRNG";
private String keyStoreAlgorithm = KeyManagerFactory.getDefaultAlgorithm();
private String trustStoreAlgorithm = TrustManagerFactory.getDefaultAlgorithm();
private String keyStore;
private String trustStore;
private String keyStoreKeyPassword;
private String keyStorePassword;
private String trustStorePassword;
/**
* JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
* <p/>
* delegates to afterPropertiesSet, done to prevent backwards incompatible
* signature change.
*/
@PostConstruct
private void postConstruct()
{
try
{
afterPropertiesSet();
}
catch (Exception ex)
{
throw new RuntimeException(ex);
}
}
/**
* @throws Exception
* @org.apache.xbean.InitMethod
*/
public void afterPropertiesSet() throws Exception
{
keyManagers.addAll(createKeyManagers());
trustManagers.addAll(createTrustManagers());
if (secureRandom == null)
{
secureRandom = createSecureRandom();
}
}
private SecureRandom createSecureRandom() throws NoSuchAlgorithmException
{
return SecureRandom.getInstance(secureRandomAlgorithm);
}
private Collection<TrustManager> createTrustManagers() throws Exception
{
KeyStore ks = createTrustManagerKeyStore();
if (ks == null)
{
return new ArrayList<TrustManager>(0);
}
TrustManagerFactory tmf = TrustManagerFactory.getInstance(trustStoreAlgorithm);
tmf.init(ks);
return Arrays.asList(tmf.getTrustManagers());
}
private Collection<KeyManager> createKeyManagers() throws Exception
{
KeyStore ks = createKeyManagerKeyStore();
if (ks == null)
{
return new ArrayList<KeyManager>(0);
}
KeyManagerFactory tmf = KeyManagerFactory.getInstance(keyStoreAlgorithm);
tmf.init(ks, keyStoreKeyPassword == null ? (keyStorePassword == null ? null : keyStorePassword.toCharArray()) : keyStoreKeyPassword.toCharArray());
return Arrays.asList(tmf.getKeyManagers());
}
private KeyStore createTrustManagerKeyStore() throws Exception
{
if (trustStore == null)
{
return null;
}
KeyStore ks = KeyStore.getInstance(trustStoreType);
InputStream is = resourceFromString(trustStore).getInputStream();
try
{
ks.load(is, trustStorePassword == null ? null : trustStorePassword.toCharArray());
}
finally
{
is.close();
}
return ks;
}
private KeyStore createKeyManagerKeyStore() throws Exception
{
if (keyStore == null)
{
return null;
}
KeyStore ks = KeyStore.getInstance(keyStoreType);
InputStream is = resourceFromString(keyStore).getInputStream();
try
{
ks.load(is, keyStorePassword == null ? null : keyStorePassword.toCharArray());
}
finally
{
is.close();
}
return ks;
}
public String getTrustStoreType()
{
return trustStoreType;
}
public String getKeyStoreType()
{
return keyStoreType;
}
public String getKeyStore()
{
return keyStore;
}
public void setKeyStore(String keyStore) throws MalformedURLException
{
this.keyStore = keyStore;
}
public String getTrustStore()
{
return trustStore;
}
public void setTrustStore(String trustStore) throws MalformedURLException
{
this.trustStore = trustStore;
}
public String getKeyStoreAlgorithm()
{
return keyStoreAlgorithm;
}
public void setKeyStoreAlgorithm(String keyAlgorithm)
{
this.keyStoreAlgorithm = keyAlgorithm;
}
public String getTrustStoreAlgorithm()
{
return trustStoreAlgorithm;
}
public void setTrustStoreAlgorithm(String trustAlgorithm)
{
this.trustStoreAlgorithm = trustAlgorithm;
}
public String getKeyStoreKeyPassword()
{
return keyStoreKeyPassword;
}
public void setKeyStoreKeyPassword(String keyPassword)
{
this.keyStoreKeyPassword = keyPassword;
}
public String getKeyStorePassword()
{
return keyStorePassword;
}
public void setKeyStorePassword(String keyPassword)
{
this.keyStorePassword = keyPassword;
}
public String getTrustStorePassword()
{
return trustStorePassword;
}
public void setTrustStorePassword(String trustPassword)
{
this.trustStorePassword = trustPassword;
}
public void setKeyStoreType(String keyType)
{
this.keyStoreType = keyType;
}
public void setTrustStoreType(String trustType)
{
this.trustStoreType = trustType;
}
public String getSecureRandomAlgorithm()
{
return secureRandomAlgorithm;
}
public void setSecureRandomAlgorithm(String secureRandomAlgorithm)
{
this.secureRandomAlgorithm = secureRandomAlgorithm;
}
public static Resource resourceFromString(String uri) throws MalformedURLException
{
Resource resource;
File file = new File(uri);
if (file.exists())
{
resource = new FileSystemResource(uri);
}
else if (ResourceUtils.isUrl(uri))
{
resource = new UrlResource(uri);
}
else
{
resource = new ClassPathResource(uri);
}
return resource;
}
}

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt.imported.util;
import java.util.concurrent.TimeUnit;
public class Wait
{
public static final long MAX_WAIT_MILLIS = 30 * 1000;
public static final int SLEEP_MILLIS = 1000;
public interface Condition
{
boolean isSatisified() throws Exception;
}
public static boolean waitFor(Condition condition) throws Exception
{
return waitFor(condition, MAX_WAIT_MILLIS);
}
public static boolean waitFor(final Condition condition, final long duration) throws Exception
{
return waitFor(condition, duration, SLEEP_MILLIS);
}
public static boolean waitFor(final Condition condition, final long duration, final int sleepMillis) throws Exception
{
final long expiry = System.currentTimeMillis() + duration;
boolean conditionSatisified = condition.isSatisified();
while (!conditionSatisified && System.currentTimeMillis() < expiry)
{
TimeUnit.MILLISECONDS.sleep(sleepMillis);
conditionSatisified = condition.isSatisified();
}
return conditionSatisified;
}
}