AMQ-5101 - MQTT SUBACK packet MUST use return code 0x80 to report failed topic subscriptions

This commit is contained in:
Dejan Bosanac 2014-03-14 14:10:12 +01:00
parent 1671523076
commit ac23b01749
2 changed files with 85 additions and 8 deletions

View File

@ -52,6 +52,7 @@ public class MQTTProtocolConverter {
private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 0.5; private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 0.5;
private static final int DEFAULT_CACHE_SIZE = 5000; private static final int DEFAULT_CACHE_SIZE = 5000;
private static final byte SUBSCRIBE_ERROR = (byte) 0x80;
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
private final SessionId sessionId = new SessionId(connectionId, -1); private final SessionId sessionId = new SessionId(connectionId, -1);
@ -332,7 +333,7 @@ public class MQTTProtocolConverter {
if (topics != null) { if (topics != null) {
byte[] qos = new byte[topics.length]; byte[] qos = new byte[topics.length];
for (int i = 0; i < topics.length; i++) { for (int i = 0; i < topics.length; i++) {
qos[i] = (byte) onSubscribe(topics[i]).ordinal(); qos[i] = onSubscribe(topics[i]);
} }
SUBACK ack = new SUBACK(); SUBACK ack = new SUBACK();
ack.messageId(command.messageId()); ack.messageId(command.messageId());
@ -344,6 +345,10 @@ public class MQTTProtocolConverter {
} }
// check retained messages // check retained messages
for (int i = 0; i < topics.length; i++) { for (int i = 0; i < topics.length; i++) {
if (qos[i] == SUBSCRIBE_ERROR) {
// skip this topic if subscribe failed
continue;
}
final Topic topic = topics[i]; final Topic topic = topics[i];
ActiveMQTopic destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString())); ActiveMQTopic destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
for (PUBLISH msg : retainedMessages.getMessages(destination)) { for (PUBLISH msg : retainedMessages.getMessages(destination)) {
@ -374,7 +379,7 @@ public class MQTTProtocolConverter {
} }
QoS onSubscribe(Topic topic) throws MQTTProtocolException { byte onSubscribe(final Topic topic) throws MQTTProtocolException {
if( mqttSubscriptionByTopic.containsKey(topic.name())) { if( mqttSubscriptionByTopic.containsKey(topic.name())) {
if (topic.qos() != mqttSubscriptionByTopic.get(topic.name()).qos()) { if (topic.qos() != mqttSubscriptionByTopic.get(topic.name()).qos()) {
@ -382,7 +387,7 @@ public class MQTTProtocolConverter {
onUnSubscribe(topic.name()); onUnSubscribe(topic.name());
} else { } else {
// duplicate SUBSCRIBE packet, nothing to do // duplicate SUBSCRIBE packet, nothing to do
return topic.qos(); return (byte) topic.qos().ordinal();
} }
} }
@ -398,11 +403,27 @@ public class MQTTProtocolConverter {
} }
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo); MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
final byte[] qos = {-1};
sendToActiveMQ(consumerInfo, new ResponseHandler() {
@Override
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
// validate subscription request
if (response.isException()) {
final Throwable throwable = ((ExceptionResponse) response).getException();
LOG.debug("Error subscribing to " + topic.name(), throwable);
qos[0] = SUBSCRIBE_ERROR;
} else {
qos[0] = (byte) topic.qos().ordinal();
}
}
});
if (qos[0] != SUBSCRIBE_ERROR) {
subscriptionsByConsumerId.put(id, mqttSubscription); subscriptionsByConsumerId.put(id, mqttSubscription);
mqttSubscriptionByTopic.put(topic.name(), mqttSubscription); mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
}
sendToActiveMQ(consumerInfo, null); return qos[0];
return topic.qos();
} }
void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException { void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException {

View File

@ -34,8 +34,15 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.jaas.GroupPrincipal;
import org.apache.activemq.security.AuthorizationPlugin;
import org.apache.activemq.security.DefaultAuthorizationMap;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.security.SimpleAuthorizationMap;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.Buffer;
@ -515,6 +522,55 @@ public class MQTTTest extends AbstractMQTTTest {
} }
@Test(timeout = 60 * 1000)
public void testFailedSubscription() throws Exception {
addMQTTConnector();
final SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin();
authenticationPlugin.setAnonymousAccessAllowed(true);
final String ANONYMOUS = "anonymous";
authenticationPlugin.setAnonymousGroup(ANONYMOUS);
final DefaultAuthorizationMap map = new DefaultAuthorizationMap();
// only one authorized destination, anonymous for anonymous group!
map.put(new ActiveMQTopic(ANONYMOUS), new GroupPrincipal(ANONYMOUS));
final AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(new SimpleAuthorizationMap(map, map, map));
brokerService.setPlugins(new BrokerPlugin[] {authorizationPlugin, authenticationPlugin});
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo");
mqtt.setKeepAlive((short) 2);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
final String NAMED = "named";
byte[] qos = connection.subscribe(new Topic[] {
new Topic(NAMED, QoS.AT_MOST_ONCE), new Topic(ANONYMOUS, QoS.EXACTLY_ONCE)});
assertEquals((byte)0x80, qos[0]);
assertEquals((byte)QoS.EXACTLY_ONCE.ordinal(), qos[1]);
// validate the subscription by sending a retained message
connection.publish(ANONYMOUS, ANONYMOUS.getBytes(), QoS.AT_MOST_ONCE, true);
Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(ANONYMOUS, new String(msg.getPayload()));
msg.ack();
connection.unsubscribe(new String[]{ANONYMOUS});
qos = connection.subscribe(new Topic[]{new Topic(ANONYMOUS, QoS.AT_LEAST_ONCE)});
assertEquals((byte)QoS.AT_LEAST_ONCE.ordinal(), qos[0]);
msg = connection.receive(1000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(ANONYMOUS, new String(msg.getPayload()));
msg.ack();
connection.disconnect();
}
@Test(timeout = 60 * 1000) @Test(timeout = 60 * 1000)
public void testUniqueMessageIds() throws Exception { public void testUniqueMessageIds() throws Exception {
addMQTTConnector(); addMQTTConnector();
@ -612,7 +668,7 @@ public class MQTTTest extends AbstractMQTTTest {
connection.disconnect(); connection.disconnect();
} }
@Test(timeout = 600 * 1000) @Test(timeout = 60 * 1000)
public void testResendMessageId() throws Exception { public void testResendMessageId() throws Exception {
addMQTTConnector(); addMQTTConnector();
brokerService.start(); brokerService.start();