From 2ad13d682a38dfebafb5a0c0801961bf593d8f63 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Fri, 19 Oct 2012 21:41:39 +0000 Subject: [PATCH] fix for: https://issues.apache.org/jira/browse/AMQ-4117 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1400304 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/mqtt/MQTTInactivityMonitor.java | 46 +++----- .../transport/mqtt/MQTTProtocolConverter.java | 103 ++++++++++-------- .../transport/mqtt/MQTTWireFormat.java | 4 - 3 files changed, 71 insertions(+), 82 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java index 7a1e48f565..350c2ac77c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java @@ -25,7 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.thread.SchedulerTimerTask; @@ -42,22 +42,19 @@ public class MQTTInactivityMonitor extends TransportFilter { private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class); + private static final long DEFAULT_CHECK_TIME_MILLS = 30000; + private static ThreadPoolExecutor ASYNC_TASKS; private static int CHECKER_COUNTER; - private static long DEFAULT_CHECK_TIME_MILLS = 30000; private static Timer READ_CHECK_TIMER; private final AtomicBoolean monitorStarted = new AtomicBoolean(false); - - private final AtomicBoolean commandSent = new AtomicBoolean(false); - private final AtomicBoolean inSend = new AtomicBoolean(false); private final AtomicBoolean failed = new AtomicBoolean(false); - private final AtomicBoolean commandReceived = new AtomicBoolean(true); private final AtomicBoolean inReceive = new AtomicBoolean(false); private final AtomicInteger lastReceiveCounter = new AtomicInteger(0); - private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock(); + private final ReentrantLock sendLock = new ReentrantLock(); private SchedulerTimerTask readCheckerTask; private long readCheckTime = DEFAULT_CHECK_TIME_MILLS; @@ -65,7 +62,6 @@ public class MQTTInactivityMonitor extends TransportFilter { private boolean keepAliveResponseRequired; private MQTTProtocolConverter protocolConverter; - private final Runnable readChecker = new Runnable() { long lastRunTime; @@ -96,7 +92,6 @@ public class MQTTInactivityMonitor extends TransportFilter { return elapsed > (readCheckTime * 9 / 10); } - public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) { super(next); } @@ -111,7 +106,6 @@ public class MQTTInactivityMonitor extends TransportFilter { next.stop(); } - final void readCheck() { int currentCounter = next.getReceiveCounter(); int previousCounter = lastReceiveCounter.getAndSet(currentCounter); @@ -132,8 +126,6 @@ public class MQTTInactivityMonitor extends TransportFilter { } onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress())); } - - ; }); } else { if (LOG.isTraceEnabled()) { @@ -143,7 +135,6 @@ public class MQTTInactivityMonitor extends TransportFilter { commandReceived.set(false); } - public void onCommand(Object command) { commandReceived.set(true); inReceive.set(true); @@ -151,14 +142,14 @@ public class MQTTInactivityMonitor extends TransportFilter { if (command.getClass() == KeepAliveInfo.class) { KeepAliveInfo info = (KeepAliveInfo) command; if (info.isResponseRequired()) { - sendLock.readLock().lock(); + sendLock.lock(); try { info.setResponseRequired(false); oneway(info); } catch (IOException e) { onException(e); } finally { - sendLock.readLock().unlock(); + sendLock.unlock(); } } } else { @@ -171,17 +162,12 @@ public class MQTTInactivityMonitor extends TransportFilter { public void oneway(Object o) throws IOException { // To prevent the inactivity monitor from sending a message while we - // are performing a send we take a read lock. The inactivity monitor - // sends its Heart-beat commands under a write lock. This means that - // the MutexTransport is still responsible for synchronizing sends - this.sendLock.readLock().lock(); - inSend.set(true); + // are performing a send we take the lock. + this.sendLock.lock(); try { doOnewaySend(o); } finally { - commandSent.set(true); - inSend.set(false); - this.sendLock.readLock().unlock(); + this.sendLock.unlock(); } } @@ -200,7 +186,6 @@ public class MQTTInactivityMonitor extends TransportFilter { } } - public long getReadCheckTime() { return readCheckTime; } @@ -209,7 +194,6 @@ public class MQTTInactivityMonitor extends TransportFilter { this.readCheckTime = readCheckTime; } - public long getInitialDelayTime() { return initialDelayTime; } @@ -239,16 +223,20 @@ public class MQTTInactivityMonitor extends TransportFilter { } synchronized void startMonitorThread() { + + // Not yet configured if this isn't set yet. + if (protocolConverter == null) { + return; + } + if (monitorStarted.get()) { return; } - if (readCheckTime > 0) { readCheckerTask = new SchedulerTimerTask(readChecker); } - if (readCheckTime > 0) { monitorStarted.set(true); synchronized (AbstractInactivityMonitor.class) { @@ -264,7 +252,6 @@ public class MQTTInactivityMonitor extends TransportFilter { } } - synchronized void stopMonitorThread() { if (monitorStarted.compareAndSet(true, false)) { if (readCheckerTask != null) { @@ -293,9 +280,8 @@ public class MQTTInactivityMonitor extends TransportFilter { }; private ThreadPoolExecutor createExecutor() { - ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), factory); + ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue(), factory); exec.allowCoreThreadTimeOut(true); return exec; } } - diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index aace8738d1..293add7eac 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -26,8 +26,30 @@ import java.util.zip.Inflater; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; + import org.apache.activemq.broker.BrokerContext; -import org.apache.activemq.command.*; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.ConnectionError; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.ExceptionResponse; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveInfo; +import org.apache.activemq.command.Response; +import org.apache.activemq.command.SessionId; +import org.apache.activemq.command.SessionInfo; import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOExceptionSupport; @@ -38,7 +60,21 @@ import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; -import org.fusesource.mqtt.codec.*; +import org.fusesource.mqtt.codec.CONNACK; +import org.fusesource.mqtt.codec.CONNECT; +import org.fusesource.mqtt.codec.DISCONNECT; +import org.fusesource.mqtt.codec.MQTTFrame; +import org.fusesource.mqtt.codec.PINGREQ; +import org.fusesource.mqtt.codec.PINGRESP; +import org.fusesource.mqtt.codec.PUBACK; +import org.fusesource.mqtt.codec.PUBCOMP; +import org.fusesource.mqtt.codec.PUBLISH; +import org.fusesource.mqtt.codec.PUBREC; +import org.fusesource.mqtt.codec.PUBREL; +import org.fusesource.mqtt.codec.SUBACK; +import org.fusesource.mqtt.codec.SUBSCRIBE; +import org.fusesource.mqtt.codec.UNSUBACK; +import org.fusesource.mqtt.codec.UNSUBSCRIBE; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +85,6 @@ class MQTTProtocolConverter { private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); - private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); private final SessionId sessionId = new SessionId(connectionId, -1); private final ProducerId producerId = new ProducerId(sessionId, 1); @@ -83,7 +118,6 @@ class MQTTProtocolConverter { } } - void sendToActiveMQ(Command command, ResponseHandler handler) { command.setCommandId(generateCommandId()); if (handler != null) { @@ -101,13 +135,11 @@ class MQTTProtocolConverter { } } - /** * Convert a MQTT command */ public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException { - switch (frame.messageType()) { case PINGREQ.TYPE: { mqttTransport.sendToMQTT(PING_RESP_FRAME); @@ -152,13 +184,12 @@ class MQTTProtocolConverter { onMQTTPubComp(new PUBCOMP().decode(frame)); break; } - default: + default: { handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame); + } } - } - void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException { if (connected.get()) { @@ -182,7 +213,6 @@ class MQTTProtocolConverter { configureInactivityMonitor(connect.keepAlive()); - connectionInfo.setConnectionId(connectionId); if (clientId != null && !clientId.isEmpty()) { connectionInfo.setClientId(clientId); @@ -232,14 +262,12 @@ class MQTTProtocolConverter { } }); - } }); } void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException { checkConnected(); - SUBACK result = new SUBACK(); Topic[] topics = command.topics(); if (topics != null) { byte[] qos = new byte[topics.length]; @@ -261,9 +289,6 @@ class MQTTProtocolConverter { QoS onSubscribe(SUBSCRIBE command, Topic topic) throws MQTTProtocolException { ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString())); - if (destination == null) { - throw new MQTTProtocolException("Invalid Destination."); - } ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); ConsumerInfo consumerInfo = new ConsumerInfo(id); @@ -277,7 +302,6 @@ class MQTTProtocolConverter { MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo); - subscriptionsByConsumerId.put(id, mqttSubscription); mqttSubscriptionByTopic.put(topic.name(), mqttSubscription); @@ -295,7 +319,6 @@ class MQTTProtocolConverter { UNSUBACK ack = new UNSUBACK(); ack.messageId(command.messageId()); sendToMQTT(ack.encode()); - } void onUnSubscribe(UTF8Buffer topicName) { @@ -310,12 +333,9 @@ class MQTTProtocolConverter { } } - /** * Dispatch a ActiveMQ command */ - - public void onActiveMQCommand(Command command) throws Exception { if (command.isResponse()) { Response response = (Response) command; @@ -406,7 +426,6 @@ class MQTTProtocolConverter { } } - ActiveMQMessage convertMessage(PUBLISH command) throws JMSException { ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); @@ -456,43 +475,37 @@ class MQTTProtocolConverter { } result.topicName(topicName); - if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { - ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy(); - msg.setReadOnlyBody(true); - String messageText = msg.getText(); - if (messageText != null) { - result.payload(new Buffer(messageText.getBytes("UTF-8"))); - } - - + ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy(); + msg.setReadOnlyBody(true); + String messageText = msg.getText(); + if (messageText != null) { + result.payload(new Buffer(messageText.getBytes("UTF-8"))); + } } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) { - ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy(); msg.setReadOnlyBody(true); byte[] data = new byte[(int) msg.getBodyLength()]; msg.readBytes(data); result.payload(new Buffer(data)); - } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE){ + } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); msg.setReadOnlyBody(true); - Map map = msg.getContentMap(); - if (map != null){ + Map map = msg.getContentMap(); + if (map != null) { result.payload(new Buffer(map.toString().getBytes("UTF-8"))); } - } - - else { + } else { ByteSequence byteSequence = message.getContent(); if (byteSequence != null && byteSequence.getLength() > 0) { - if (message.isCompressed()){ + if (message.isCompressed()) { Inflater inflater = new Inflater(); - inflater.setInput(byteSequence.data,byteSequence.offset,byteSequence.length); - byte[] data = new byte[4096]; + inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length); + byte[] data = new byte[4096]; int read; ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - while((read = inflater.inflate(data)) != 0){ - bytesOut.write(data,0,read); + while ((read = inflater.inflate(data)) != 0) { + bytesOut.write(data, 0, read); } byteSequence = bytesOut.toByteSequence(); } @@ -502,7 +515,6 @@ class MQTTProtocolConverter { return result; } - public MQTTTransport getMQTTTransport() { return mqttTransport; } @@ -522,22 +534,18 @@ class MQTTProtocolConverter { } catch (Exception e) { LOG.warn("Failed to publish Will Message " + connect.willMessage()); } - } } } - void configureInactivityMonitor(short heartBeat) { try { - int heartBeatMS = heartBeat * 1000; MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor(); monitor.setProtocolConverter(this); monitor.setReadCheckTime(heartBeatMS); monitor.setInitialDelayTime(heartBeatMS); monitor.startMonitorThread(); - } catch (Exception ex) { LOG.warn("Failed to start MQTT InactivityMonitor ", ex); } @@ -545,7 +553,6 @@ class MQTTProtocolConverter { LOG.debug(getClientId() + " MQTT Connection using heart beat of " + heartBeat + " secs"); } - void handleException(Throwable exception, MQTTFrame command) { LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString()); if (LOG.isDebugEnabled()) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java index 0b3c55b950..cc35020ffe 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java @@ -35,10 +35,8 @@ import org.fusesource.mqtt.codec.MQTTFrame; */ public class MQTTWireFormat implements WireFormat { - static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256; - private boolean encodingEnabled = false; private int version = 1; public ByteSequence marshal(Object command) throws IOException { @@ -119,6 +117,4 @@ public class MQTTWireFormat implements WireFormat { public int getVersion() { return this.version; } - - }