This commit is contained in:
Clebert Suconic 2017-08-09 21:00:58 -04:00
commit ba1b68faf5
2 changed files with 8 additions and 8 deletions

View File

@ -17,13 +17,13 @@
package org.apache.activemq.artemis.core.protocol.mqtt; package org.apache.activemq.artemis.core.protocol.mqtt;
import java.nio.charset.Charset;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.util.CharsetUtil;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.ServerSession;
@ -32,7 +32,7 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
/** /**
* MQTTConnectionMananager is responsible for handle Connect and Disconnect packets and any resulting behaviour of these * MQTTConnectionManager is responsible for handle Connect and Disconnect packets and any resulting behaviour of these
* events. * events.
*/ */
public class MQTTConnectionManager { public class MQTTConnectionManager {
@ -65,9 +65,9 @@ public class MQTTConnectionManager {
*/ */
synchronized void connect(String cId, synchronized void connect(String cId,
String username, String username,
String password, byte[] passwordInBytes,
boolean will, boolean will,
String willMessage, byte[] willMessage,
String willTopic, String willTopic,
boolean willRetain, boolean willRetain,
int willQosLevel, int willQosLevel,
@ -80,6 +80,7 @@ public class MQTTConnectionManager {
} }
session.setSessionState(getSessionState(clientId)); session.setSessionState(getSessionState(clientId));
String password = passwordInBytes == null ? null : new String(passwordInBytes, CharsetUtil.UTF_8);
ServerSessionImpl serverSession = createServerSession(username, password); ServerSessionImpl serverSession = createServerSession(username, password);
serverSession.start(); serverSession.start();
@ -88,9 +89,8 @@ public class MQTTConnectionManager {
if (will) { if (will) {
isWill = true; isWill = true;
byte[] payload = willMessage.getBytes(Charset.forName("UTF-8")); this.willMessage = ByteBufAllocator.DEFAULT.buffer(willMessage.length);
this.willMessage = ByteBufAllocator.DEFAULT.buffer(payload.length); this.willMessage.writeBytes(willMessage);
this.willMessage.writeBytes(payload);
this.willQoSLevel = willQosLevel; this.willQoSLevel = willQosLevel;
this.willRetain = willRetain; this.willRetain = willRetain;
this.willTopic = willTopic; this.willTopic = willTopic;

View File

@ -171,7 +171,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
connectionEntry.ttl = connect.variableHeader().keepAliveTimeSeconds() * 1500L; connectionEntry.ttl = connect.variableHeader().keepAliveTimeSeconds() * 1500L;
String clientId = connect.payload().clientIdentifier(); String clientId = connect.payload().clientIdentifier();
session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().password(), connect.variableHeader().isWillFlag(), connect.payload().willMessage(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession()); session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().passwordInBytes(), connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession());
} }
void disconnect(boolean error) { void disconnect(boolean error) {