Ensure that a publish receives an ACK even when the user is not
authorized to write to the target destination
This commit is contained in:
Timothy Bish 2015-06-10 14:59:02 -04:00
parent 3100909041
commit 789eb9abf9
2 changed files with 70 additions and 36 deletions

View File

@ -700,43 +700,38 @@ public class MQTTProtocolConverter {
ResponseHandler createResponseHandler(final PUBLISH command) { ResponseHandler createResponseHandler(final PUBLISH command) {
if (command != null) { if (command != null) {
switch (command.qos()) { return new ResponseHandler() {
case AT_LEAST_ONCE: @Override
return new ResponseHandler() { public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
@Override if (response.isException()) {
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { Throwable error = ((ExceptionResponse) response).getException();
if (response.isException()) { LOG.warn("Failed to send MQTT Publish: ", command, error.getMessage());
LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException()); LOG.trace("Error trace: {}", error);
} else { }
PUBACK ack = new PUBACK();
ack.messageId(command.messageId()); switch (command.qos()) {
LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}", case AT_LEAST_ONCE:
command.messageId(), clientId, connectionInfo.getConnectionId()); PUBACK ack = new PUBACK();
converter.getMQTTTransport().sendToMQTT(ack.encode()); ack.messageId(command.messageId());
LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}",
command.messageId(), clientId, connectionInfo.getConnectionId());
converter.getMQTTTransport().sendToMQTT(ack.encode());
break;
case EXACTLY_ONCE:
PUBREC req = new PUBREC();
req.messageId(command.messageId());
synchronized (publisherRecs) {
publisherRecs.put(command.messageId(), req);
} }
} LOG.trace("MQTT Snd PUBREC message:{} client:{} connection:{}",
}; command.messageId(), clientId, connectionInfo.getConnectionId());
case EXACTLY_ONCE: converter.getMQTTTransport().sendToMQTT(req.encode());
return new ResponseHandler() { break;
@Override default:
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { break;
if (response.isException()) { }
LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException()); }
} else { };
PUBREC ack = new PUBREC();
ack.messageId(command.messageId());
synchronized (publisherRecs) {
publisherRecs.put(command.messageId(), ack);
}
LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}",
command.messageId(), clientId, connectionInfo.getConnectionId());
converter.getMQTTTransport().sendToMQTT(ack.encode());
}
}
};
case AT_MOST_ONCE:
break;
}
} }
return null; return null;
} }

View File

@ -196,6 +196,45 @@ public class MQTTAuthTest extends MQTTAuthTestSupport {
assertNull(msg); assertNull(msg);
} }
@Test(timeout = 30 * 1000)
public void testPublishWhenNotAuthorizedDoesNotStall() throws Exception {
getProxyToBroker().addTopic("USERS.foo");
MQTT mqtt = null;
BlockingConnection connection = null;
// Test 3.1 functionality
mqtt = createMQTTConnection("pub", true);
mqtt.setUserName("guest");
mqtt.setPassword("password");
mqtt.setVersion("3.1");
connection = mqtt.blockingConnection();
connection.connect();
connection.publish("USERS.foo", "test-AT_MOST_ONCE".getBytes(), QoS.AT_MOST_ONCE, true);
connection.publish("USERS.foo", "test-AT_LEAST_ONCE".getBytes(), QoS.AT_LEAST_ONCE, true);
connection.publish("USERS.foo", "test-EXACTLY_ONCE".getBytes(), QoS.EXACTLY_ONCE, true);
connection.disconnect();
assertEquals(0, getProxyToTopic("USERS.foo").getEnqueueCount());
// Test 3.1.1 functionality
mqtt = createMQTTConnection("pub", true);
mqtt.setUserName("guest");
mqtt.setPassword("password");
mqtt.setVersion("3.1.1");
connection = mqtt.blockingConnection();
connection.connect();
connection.publish("USERS.foo", "test-AT_MOST_ONCE".getBytes(), QoS.AT_MOST_ONCE, true);
connection.publish("USERS.foo", "test-AT_LEAST_ONCE".getBytes(), QoS.AT_LEAST_ONCE, true);
connection.publish("USERS.foo", "test-EXACTLY_ONCE".getBytes(), QoS.EXACTLY_ONCE, true);
connection.disconnect();
assertEquals(0, getProxyToTopic("USERS.foo").getEnqueueCount());
}
@Test(timeout = 60 * 1000) @Test(timeout = 60 * 1000)
public void testWildcardRetainedSubscription() throws Exception { public void testWildcardRetainedSubscription() throws Exception {
MQTT mqttPub = createMQTTConnection("pub", true); MQTT mqttPub = createMQTTConnection("pub", true);