More improvements for AMQ-5043.

This commit is contained in:
Hiram Chirino 2014-02-12 10:59:31 -05:00
parent 084d606d89
commit e2a7d6af5a
3 changed files with 45 additions and 11 deletions

View File

@ -138,7 +138,7 @@ public class DestinationMapNode implements DestinationNode {
values.clear();
values.add(value);
} else {
getChildOrCreate(paths[idx]).add(paths, idx + 1, value);
getChildOrCreate(paths[idx]).set(paths, idx + 1, value);
}
}

View File

@ -95,7 +95,6 @@ public class MQTTProtocolConverter {
}
void sendToActiveMQ(Command command, ResponseHandler handler) {
System.out.println(mqttTransport.getInactivityMonitor()+" ==> "+command);
command.setCommandId(generateCommandId());
if (handler != null) {
command.setResponseRequired(true);
@ -256,10 +255,18 @@ public class MQTTProtocolConverter {
public void deleteDurableSubs(List<SubscriptionInfo> subs) {
try {
for (SubscriptionInfo sub : subs) {
TopicMessageStore store = brokerService.getPersistenceAdapter().createTopicMessageStore((ActiveMQTopic) sub.getDestination());
store.deleteSubscription(connectionInfo.getClientId(), sub.getSubscriptionName());
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(connectionId);
rsi.setSubscriptionName(sub.getSubcriptionName());
rsi.setClientId(sub.getClientId());
sendToActiveMQ(rsi, new ResponseHandler() {
@Override
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
// ignore failures..
}
});
}
} catch (IOException e) {
} catch (Throwable e) {
LOG.warn("Could not delete the MQTT durable subs.", e);
}
}
@ -477,7 +484,7 @@ public class MQTTProtocolConverter {
msg.setMessageId(id);
msg.setTimestamp(System.currentTimeMillis());
msg.setPriority((byte) Message.DEFAULT_PRIORITY);
msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain());
msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
ActiveMQTopic topic;

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.net.ProtocolException;
import java.security.cert.X509Certificate;
import java.util.concurrent.atomic.AtomicBoolean;
@ -31,7 +32,7 @@ import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -73,11 +74,11 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
@Override
public void onCommand(Object command) {
try {
MQTTFrame frame = (MQTTFrame) command;
if (trace) {
TRACE.trace("Received: \n" + command);
TRACE.trace("Received: " + toString(frame));
}
protocolConverter.onMQTTCommand((MQTTFrame) command);
protocolConverter.onMQTTCommand(frame);
} catch (IOException e) {
handleException(e);
} catch (JMSException e) {
@ -97,7 +98,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
public void sendToMQTT(MQTTFrame command) throws IOException {
if( !stopped.get() ) {
if (trace) {
TRACE.trace("Sending: \n" + command);
TRACE.trace("Sending : " + toString(command));
}
Transport n = next;
if (n != null) {
@ -106,6 +107,32 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
}
}
static private String toString(MQTTFrame frame) {
if( frame == null )
return null;
try {
switch (frame.messageType()) {
case PINGREQ.TYPE: return new PINGREQ().decode(frame).toString();
case PINGRESP.TYPE: return new PINGRESP().decode(frame).toString();
case CONNECT.TYPE: return new CONNECT().decode(frame).toString();
case DISCONNECT.TYPE: return new DISCONNECT().decode(frame).toString();
case SUBSCRIBE.TYPE: return new SUBSCRIBE().decode(frame).toString();
case UNSUBSCRIBE.TYPE: return new UNSUBSCRIBE().decode(frame).toString();
case PUBLISH.TYPE: return new PUBLISH().decode(frame).toString();
case PUBACK.TYPE: return new PUBACK().decode(frame).toString();
case PUBREC.TYPE: return new PUBREC().decode(frame).toString();
case PUBREL.TYPE: return new PUBREL().decode(frame).toString();
case PUBCOMP.TYPE: return new PUBCOMP().decode(frame).toString();
case CONNACK.TYPE: return new CONNACK().decode(frame).toString();
case SUBACK.TYPE: return new SUBACK().decode(frame).toString();
default: return frame.toString();
}
} catch (Throwable e) {
e.printStackTrace();
return frame.toString();
}
}
@Override
public void stop() throws Exception {
if( stopped.compareAndSet(false, true) ) {