diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 39e9b84ee7..5bd1a3293a 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -700,43 +700,38 @@ public class MQTTProtocolConverter { ResponseHandler createResponseHandler(final PUBLISH command) { if (command != null) { - switch (command.qos()) { - case AT_LEAST_ONCE: - return new ResponseHandler() { - @Override - public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { - if (response.isException()) { - LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException()); - } else { - PUBACK ack = new PUBACK(); - ack.messageId(command.messageId()); - LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}", - command.messageId(), clientId, connectionInfo.getConnectionId()); - converter.getMQTTTransport().sendToMQTT(ack.encode()); + return new ResponseHandler() { + @Override + public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { + if (response.isException()) { + Throwable error = ((ExceptionResponse) response).getException(); + LOG.warn("Failed to send MQTT Publish: ", command, error.getMessage()); + LOG.trace("Error trace: {}", error); + } + + switch (command.qos()) { + case AT_LEAST_ONCE: + PUBACK ack = new PUBACK(); + ack.messageId(command.messageId()); + LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}", + command.messageId(), clientId, connectionInfo.getConnectionId()); + converter.getMQTTTransport().sendToMQTT(ack.encode()); + break; + case EXACTLY_ONCE: + PUBREC req = new PUBREC(); + req.messageId(command.messageId()); + synchronized (publisherRecs) { + publisherRecs.put(command.messageId(), req); } - } - }; - case EXACTLY_ONCE: - return new ResponseHandler() { - @Override - public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { - if (response.isException()) { - LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException()); - } else { - PUBREC ack = new PUBREC(); - ack.messageId(command.messageId()); - synchronized (publisherRecs) { - publisherRecs.put(command.messageId(), ack); - } - LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}", - command.messageId(), clientId, connectionInfo.getConnectionId()); - converter.getMQTTTransport().sendToMQTT(ack.encode()); - } - } - }; - case AT_MOST_ONCE: - break; - } + LOG.trace("MQTT Snd PUBREC message:{} client:{} connection:{}", + command.messageId(), clientId, connectionInfo.getConnectionId()); + converter.getMQTTTransport().sendToMQTT(req.encode()); + break; + default: + break; + } + } + }; } return null; } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java index 77942a08bd..7ffb3e80ab 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java @@ -196,6 +196,45 @@ public class MQTTAuthTest extends MQTTAuthTestSupport { assertNull(msg); } + @Test(timeout = 30 * 1000) + public void testPublishWhenNotAuthorizedDoesNotStall() throws Exception { + + getProxyToBroker().addTopic("USERS.foo"); + + MQTT mqtt = null; + BlockingConnection connection = null; + + // Test 3.1 functionality + mqtt = createMQTTConnection("pub", true); + mqtt.setUserName("guest"); + mqtt.setPassword("password"); + mqtt.setVersion("3.1"); + + connection = mqtt.blockingConnection(); + connection.connect(); + connection.publish("USERS.foo", "test-AT_MOST_ONCE".getBytes(), QoS.AT_MOST_ONCE, true); + connection.publish("USERS.foo", "test-AT_LEAST_ONCE".getBytes(), QoS.AT_LEAST_ONCE, true); + connection.publish("USERS.foo", "test-EXACTLY_ONCE".getBytes(), QoS.EXACTLY_ONCE, true); + connection.disconnect(); + + assertEquals(0, getProxyToTopic("USERS.foo").getEnqueueCount()); + + // Test 3.1.1 functionality + mqtt = createMQTTConnection("pub", true); + mqtt.setUserName("guest"); + mqtt.setPassword("password"); + mqtt.setVersion("3.1.1"); + + connection = mqtt.blockingConnection(); + connection.connect(); + connection.publish("USERS.foo", "test-AT_MOST_ONCE".getBytes(), QoS.AT_MOST_ONCE, true); + connection.publish("USERS.foo", "test-AT_LEAST_ONCE".getBytes(), QoS.AT_LEAST_ONCE, true); + connection.publish("USERS.foo", "test-EXACTLY_ONCE".getBytes(), QoS.EXACTLY_ONCE, true); + connection.disconnect(); + + assertEquals(0, getProxyToTopic("USERS.foo").getEnqueueCount()); + } + @Test(timeout = 60 * 1000) public void testWildcardRetainedSubscription() throws Exception { MQTT mqttPub = createMQTTConnection("pub", true);