mirror of https://github.com/apache/activemq.git
This commit is contained in:
parent
f42d56c1f8
commit
b679787917
|
@ -187,7 +187,7 @@ public class MQTTProtocolConverter {
|
|||
void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
|
||||
|
||||
if (connected.get()) {
|
||||
throw new MQTTProtocolException("All ready connected.");
|
||||
throw new MQTTProtocolException("Already connected.");
|
||||
}
|
||||
this.connect = connect;
|
||||
|
||||
|
@ -211,6 +211,18 @@ public class MQTTProtocolConverter {
|
|||
if (clientId != null && !clientId.isEmpty()) {
|
||||
connectionInfo.setClientId(clientId);
|
||||
} else {
|
||||
// Clean Session MUST be set for 0 length Client Id
|
||||
if (!connect.cleanSession()) {
|
||||
CONNACK ack = new CONNACK();
|
||||
ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
|
||||
try {
|
||||
getMQTTTransport().sendToMQTT(ack.encode());
|
||||
getMQTTTransport().onException(IOExceptionSupport.create("Invalid Client ID", null));
|
||||
} catch (IOException e) {
|
||||
getMQTTTransport().onException(IOExceptionSupport.create(e));
|
||||
}
|
||||
return;
|
||||
}
|
||||
connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
|
||||
}
|
||||
|
||||
|
@ -249,6 +261,7 @@ public class MQTTProtocolConverter {
|
|||
ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
|
||||
getMQTTTransport().sendToMQTT(ack.encode());
|
||||
getMQTTTransport().onException(IOExceptionSupport.create(exception));
|
||||
return;
|
||||
}
|
||||
|
||||
CONNACK ack = new CONNACK();
|
||||
|
@ -289,7 +302,6 @@ public class MQTTProtocolConverter {
|
|||
|
||||
public void restoreDurableSubs(List<SubscriptionInfo> subs) {
|
||||
try {
|
||||
SUBSCRIBE command = new SUBSCRIBE();
|
||||
for (SubscriptionInfo sub : subs) {
|
||||
String name = sub.getSubcriptionName();
|
||||
String[] split = name.split(":", 2);
|
||||
|
|
Loading…
Reference in New Issue