Attempts to fix many of the compatibility issues with MQTT highlighted by AMQ-5043.

This commit is contained in:
Hiram Chirino 2014-02-11 17:52:57 -05:00
parent 99d533c060
commit 9735806030
5 changed files with 124 additions and 26 deletions

View File

@ -133,6 +133,15 @@ public class DestinationMapNode implements DestinationNode {
}
}
public void set(String[] paths, int idx, Object value) {
if (idx >= paths.length) {
values.clear();
values.add(value);
} else {
getChildOrCreate(paths[idx]).add(paths, idx + 1, value);
}
}
public void remove(String[] paths, int idx, Object value) {
if (idx >= paths.length) {
values.remove(value);

View File

@ -134,6 +134,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-kahadb-store</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>mqtt-client</artifactId>

View File

@ -95,6 +95,7 @@ public class MQTTProtocolConverter {
}
void sendToActiveMQ(Command command, ResponseHandler handler) {
System.out.println(mqttTransport.getInactivityMonitor()+" ==> "+command);
command.setCommandId(generateCommandId());
if (handler != null) {
command.setResponseRequired(true);
@ -308,15 +309,14 @@ public class MQTTProtocolConverter {
//check retained messages
if (topics != null){
for (Topic topic:topics){
Buffer buffer = retainedMessages.getMessage(topic.name().toString());
if (buffer != null){
PUBLISH msg = new PUBLISH();
msg.payload(buffer);
msg.topicName(topic.name());
try {
getMQTTTransport().sendToMQTT(msg.encode());
} catch (IOException e) {
LOG.warn("Couldn't send retained message " + msg, e);
ActiveMQTopic destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
for (PUBLISH msg : retainedMessages.getMessages(destination)) {
if( msg.payload().length > 0 ) {
try {
getMQTTTransport().sendToMQTT(msg.encode());
} catch (IOException e) {
LOG.warn("Couldn't send retained message " + msg, e);
}
}
}
}
@ -333,7 +333,7 @@ public class MQTTProtocolConverter {
consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
consumerInfo.setDispatchAsync(true);
if (!connect.cleanSession() && (connect.clientId() != null)) {
if ( connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) {
consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString());
}
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
@ -418,10 +418,10 @@ public class MQTTProtocolConverter {
void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
checkConnected();
if (command.retain()){
retainedMessages.addMessage(command.topicName().toString(),command.payload());
}
ActiveMQMessage message = convertMessage(command);
if (command.retain()){
retainedMessages.addMessage((ActiveMQTopic) message.getDestination(), command);
}
message.setProducerId(producerId);
message.onSend();
sendToActiveMQ(message, createResponseHandler(command));
@ -484,7 +484,7 @@ public class MQTTProtocolConverter {
synchronized (activeMQTopicMap) {
topic = activeMQTopicMap.get(command.topicName());
if (topic == null) {
String topicName = command.topicName().toString().replaceAll("/", ".");
String topicName = convertMQTTToActiveMQ(command.topicName().toString());
topic = new ActiveMQTopic(topicName);
activeMQTopicMap.put(command.topicName(), topic);
}
@ -563,17 +563,21 @@ public class MQTTProtocolConverter {
return mqttTransport;
}
boolean willSent = false;
public void onTransportError() {
if (connect != null) {
if (connected.get() && connect.willTopic() != null && connect.willMessage() != null) {
if (connected.get() && connect.willTopic() != null && connect.willMessage() != null && !willSent) {
willSent = true;
try {
PUBLISH publish = new PUBLISH();
publish.topicName(connect.willTopic());
publish.qos(connect.willQos());
publish.messageId((short) messageIdGenerator.getNextSequenceId());
publish.payload(connect.willMessage());
ActiveMQMessage message = convertMessage(publish);
message.setProducerId(producerId);
message.onSend();
sendToActiveMQ(message, null);
} catch (Exception e) {
LOG.warn("Failed to publish Will Message " + connect.willMessage());
@ -703,10 +707,35 @@ public class MQTTProtocolConverter {
}
private String convertMQTTToActiveMQ(String name) {
String result = name.replace('#', '>');
result = result.replace('+', '*');
result = result.replace('/', '.');
return result;
char[] chars = name.toCharArray();
for (int i = 0; i < chars.length; i++) {
switch(chars[i]) {
case '#':
chars[i] = '>';
break;
case '>':
chars[i] = '#';
break;
case '+':
chars[i] = '*';
break;
case '*':
chars[i] = '+';
break;
case '/':
chars[i] = '.';
break;
case '.':
chars[i] = '/';
break;
}
}
String rc = new String(chars);
return rc;
}
public long getDefaultKeepAlive() {

View File

@ -18,36 +18,51 @@ package org.apache.activemq.transport.mqtt;
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.filter.DestinationMapNode;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.codec.PUBLISH;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Set;
public class MQTTRetainedMessages extends ServiceSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTRetainedMessages.class);
private static final Object LOCK = new Object();
private LRUCache<String,Buffer> cache = new LRUCache<String, Buffer>(10000);
DestinationMapNode retainedMessages = new DestinationMapNode(null);
private MQTTRetainedMessages(){
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
cache.clear();
synchronized (this) {
retainedMessages = new DestinationMapNode(null);
}
}
@Override
protected void doStart() throws Exception {
}
public void addMessage(String destination,Buffer payload){
cache.put(destination,payload);
public void addMessage(ActiveMQTopic dest, PUBLISH publish){
synchronized (this) {
retainedMessages.set(dest.getDestinationPaths(), 0, publish);
}
}
public Buffer getMessage(String destination){
return cache.get(destination);
public Set<PUBLISH> getMessages(ActiveMQTopic topic){
Set answer = new HashSet();
synchronized (this) {
retainedMessages.appendMatchingValues(answer, topic.getDestinationPaths(), 0);
}
return (Set<PUBLISH>)answer;
}
public static MQTTRetainedMessages getMQTTRetainedMessages(BrokerService broker){

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBStore;
import java.io.File;
/**
* A little helper class for testing a broker in your IDE.
*/
public class IDERunner {
public static void main(String[]args) throws Exception {
BrokerService bs = new BrokerService();
bs.addConnector("mqtt://0.0.0.0:1883?trace=true");
KahaDBStore store = new KahaDBStore();
store.setDirectory(new File("target/activemq-data/kahadb"));
bs.setPersistenceAdapter(store);
bs.deleteAllMessages();
bs.start();
bs.waitUntilStopped();
}
}