Patch applied.
This commit is contained in:
Timothy Bish 2014-02-19 14:09:34 -05:00
parent 28c565c266
commit 0db7e69b4e
2 changed files with 68 additions and 10 deletions

View File

@ -23,10 +23,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.*;
import org.apache.activemq.store.PersistenceAdapterSupport;
@ -338,23 +338,29 @@ public class MQTTProtocolConverter {
} catch (IOException e) {
LOG.warn("Couldn't send SUBACK for " + command, e);
}
} else {
LOG.warn("No topics defined for Subscription " + command);
}
// check retained messages
if (topics != null){
for (Topic topic:topics){
for (int i = 0; i < topics.length; i++) {
final Topic topic = topics[i];
ActiveMQTopic destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
for (PUBLISH msg : retainedMessages.getMessages(destination)) {
if( msg.payload().length > 0 ) {
try {
getMQTTTransport().sendToMQTT(msg.encode());
PUBLISH retainedCopy = new PUBLISH();
retainedCopy.topicName(msg.topicName());
retainedCopy.retain(msg.retain());
retainedCopy.messageId(msg.messageId());
retainedCopy.payload(msg.payload());
// set QoS of retained message to maximum of subscription QoS
retainedCopy.qos(msg.qos().ordinal() > qos[i] ? QoS.values()[qos[i]] : msg.qos());
getMQTTTransport().sendToMQTT(retainedCopy.encode());
} catch (IOException e) {
LOG.warn("Couldn't send retained message " + msg, e);
}
}
}
}
} else {
LOG.warn("No topics defined for Subscription " + command);
}
}

View File

@ -16,11 +16,11 @@
*/
package org.apache.activemq.transport.mqtt;
import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
@ -28,6 +28,8 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import static org.junit.Assert.assertArrayEquals;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.TransportConnector;
@ -41,10 +43,10 @@ import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.PUBLISH;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertArrayEquals;
public class MQTTTest extends AbstractMQTTTest {
@ -324,6 +326,56 @@ public class MQTTTest extends AbstractMQTTTest {
publisher.disconnect();
}
@Test(timeout = 60 * 1000)
public void testMQTTRetainQoS() throws Exception {
addMQTTConnector();
brokerService.start();
String[] topics = { "AT_MOST_ONCE", "AT_LEAST_ONCE", "EXACTLY_ONCE" };
for (int i = 0; i < topics.length; i++) {
final String topic = topics[i];
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo");
mqtt.setKeepAlive((short)2);
final int[] actualQoS = {-1};
mqtt.setTracer(new Tracer() {
@Override
public void onReceive(MQTTFrame frame) {
// validate the QoS
if (frame.messageType() == PUBLISH.TYPE) {
PUBLISH publish = new PUBLISH();
try {
publish.decode(frame);
} catch (ProtocolException e) {
fail ("Failed decoding " + e.getMessage());
}
actualQoS[0] = publish.qos().ordinal();
}
}
});
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
connection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true);
connection.subscribe(new Topic[]{ new Topic(topic, QoS.valueOf(topic)) });
final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(topic, new String(msg.getPayload()));
int waitCount = 0;
while (actualQoS[0] == -1 && waitCount < 10) {
Thread.sleep(1000);
waitCount++;
}
assertEquals(i, actualQoS[0]);
connection.unsubscribe(new String[]{topic});
connection.disconnect();
}
}
@Test(timeout=60 * 1000)
public void testSendMQTTReceiveJMS() throws Exception {