git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1400304 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-10-19 21:41:39 +00:00
parent 652b47f943
commit 2ad13d682a
3 changed files with 71 additions and 82 deletions

View File

@ -25,7 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.thread.SchedulerTimerTask; import org.apache.activemq.thread.SchedulerTimerTask;
@ -42,22 +42,19 @@ public class MQTTInactivityMonitor extends TransportFilter {
private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class); private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class);
private static final long DEFAULT_CHECK_TIME_MILLS = 30000;
private static ThreadPoolExecutor ASYNC_TASKS; private static ThreadPoolExecutor ASYNC_TASKS;
private static int CHECKER_COUNTER; private static int CHECKER_COUNTER;
private static long DEFAULT_CHECK_TIME_MILLS = 30000;
private static Timer READ_CHECK_TIMER; private static Timer READ_CHECK_TIMER;
private final AtomicBoolean monitorStarted = new AtomicBoolean(false); private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
private final AtomicBoolean commandSent = new AtomicBoolean(false);
private final AtomicBoolean inSend = new AtomicBoolean(false);
private final AtomicBoolean failed = new AtomicBoolean(false); private final AtomicBoolean failed = new AtomicBoolean(false);
private final AtomicBoolean commandReceived = new AtomicBoolean(true); private final AtomicBoolean commandReceived = new AtomicBoolean(true);
private final AtomicBoolean inReceive = new AtomicBoolean(false); private final AtomicBoolean inReceive = new AtomicBoolean(false);
private final AtomicInteger lastReceiveCounter = new AtomicInteger(0); private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock(); private final ReentrantLock sendLock = new ReentrantLock();
private SchedulerTimerTask readCheckerTask; private SchedulerTimerTask readCheckerTask;
private long readCheckTime = DEFAULT_CHECK_TIME_MILLS; private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
@ -65,7 +62,6 @@ public class MQTTInactivityMonitor extends TransportFilter {
private boolean keepAliveResponseRequired; private boolean keepAliveResponseRequired;
private MQTTProtocolConverter protocolConverter; private MQTTProtocolConverter protocolConverter;
private final Runnable readChecker = new Runnable() { private final Runnable readChecker = new Runnable() {
long lastRunTime; long lastRunTime;
@ -96,7 +92,6 @@ public class MQTTInactivityMonitor extends TransportFilter {
return elapsed > (readCheckTime * 9 / 10); return elapsed > (readCheckTime * 9 / 10);
} }
public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) { public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
super(next); super(next);
} }
@ -111,7 +106,6 @@ public class MQTTInactivityMonitor extends TransportFilter {
next.stop(); next.stop();
} }
final void readCheck() { final void readCheck() {
int currentCounter = next.getReceiveCounter(); int currentCounter = next.getReceiveCounter();
int previousCounter = lastReceiveCounter.getAndSet(currentCounter); int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
@ -132,8 +126,6 @@ public class MQTTInactivityMonitor extends TransportFilter {
} }
onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress())); onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
} }
;
}); });
} else { } else {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -143,7 +135,6 @@ public class MQTTInactivityMonitor extends TransportFilter {
commandReceived.set(false); commandReceived.set(false);
} }
public void onCommand(Object command) { public void onCommand(Object command) {
commandReceived.set(true); commandReceived.set(true);
inReceive.set(true); inReceive.set(true);
@ -151,14 +142,14 @@ public class MQTTInactivityMonitor extends TransportFilter {
if (command.getClass() == KeepAliveInfo.class) { if (command.getClass() == KeepAliveInfo.class) {
KeepAliveInfo info = (KeepAliveInfo) command; KeepAliveInfo info = (KeepAliveInfo) command;
if (info.isResponseRequired()) { if (info.isResponseRequired()) {
sendLock.readLock().lock(); sendLock.lock();
try { try {
info.setResponseRequired(false); info.setResponseRequired(false);
oneway(info); oneway(info);
} catch (IOException e) { } catch (IOException e) {
onException(e); onException(e);
} finally { } finally {
sendLock.readLock().unlock(); sendLock.unlock();
} }
} }
} else { } else {
@ -171,17 +162,12 @@ public class MQTTInactivityMonitor extends TransportFilter {
public void oneway(Object o) throws IOException { public void oneway(Object o) throws IOException {
// To prevent the inactivity monitor from sending a message while we // To prevent the inactivity monitor from sending a message while we
// are performing a send we take a read lock. The inactivity monitor // are performing a send we take the lock.
// sends its Heart-beat commands under a write lock. This means that this.sendLock.lock();
// the MutexTransport is still responsible for synchronizing sends
this.sendLock.readLock().lock();
inSend.set(true);
try { try {
doOnewaySend(o); doOnewaySend(o);
} finally { } finally {
commandSent.set(true); this.sendLock.unlock();
inSend.set(false);
this.sendLock.readLock().unlock();
} }
} }
@ -200,7 +186,6 @@ public class MQTTInactivityMonitor extends TransportFilter {
} }
} }
public long getReadCheckTime() { public long getReadCheckTime() {
return readCheckTime; return readCheckTime;
} }
@ -209,7 +194,6 @@ public class MQTTInactivityMonitor extends TransportFilter {
this.readCheckTime = readCheckTime; this.readCheckTime = readCheckTime;
} }
public long getInitialDelayTime() { public long getInitialDelayTime() {
return initialDelayTime; return initialDelayTime;
} }
@ -239,16 +223,20 @@ public class MQTTInactivityMonitor extends TransportFilter {
} }
synchronized void startMonitorThread() { synchronized void startMonitorThread() {
// Not yet configured if this isn't set yet.
if (protocolConverter == null) {
return;
}
if (monitorStarted.get()) { if (monitorStarted.get()) {
return; return;
} }
if (readCheckTime > 0) { if (readCheckTime > 0) {
readCheckerTask = new SchedulerTimerTask(readChecker); readCheckerTask = new SchedulerTimerTask(readChecker);
} }
if (readCheckTime > 0) { if (readCheckTime > 0) {
monitorStarted.set(true); monitorStarted.set(true);
synchronized (AbstractInactivityMonitor.class) { synchronized (AbstractInactivityMonitor.class) {
@ -264,7 +252,6 @@ public class MQTTInactivityMonitor extends TransportFilter {
} }
} }
synchronized void stopMonitorThread() { synchronized void stopMonitorThread() {
if (monitorStarted.compareAndSet(true, false)) { if (monitorStarted.compareAndSet(true, false)) {
if (readCheckerTask != null) { if (readCheckerTask != null) {
@ -293,9 +280,8 @@ public class MQTTInactivityMonitor extends TransportFilter {
}; };
private ThreadPoolExecutor createExecutor() { private ThreadPoolExecutor createExecutor() {
ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory); ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
exec.allowCoreThreadTimeOut(true); exec.allowCoreThreadTimeOut(true);
return exec; return exec;
} }
} }

View File

@ -26,8 +26,30 @@ import java.util.zip.Inflater;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.*; import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
@ -38,7 +60,21 @@ import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic; import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.codec.*; import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.CONNECT;
import org.fusesource.mqtt.codec.DISCONNECT;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.PINGREQ;
import org.fusesource.mqtt.codec.PINGRESP;
import org.fusesource.mqtt.codec.PUBACK;
import org.fusesource.mqtt.codec.PUBCOMP;
import org.fusesource.mqtt.codec.PUBLISH;
import org.fusesource.mqtt.codec.PUBREC;
import org.fusesource.mqtt.codec.PUBREL;
import org.fusesource.mqtt.codec.SUBACK;
import org.fusesource.mqtt.codec.SUBSCRIBE;
import org.fusesource.mqtt.codec.UNSUBACK;
import org.fusesource.mqtt.codec.UNSUBSCRIBE;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -49,7 +85,6 @@ class MQTTProtocolConverter {
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
private final SessionId sessionId = new SessionId(connectionId, -1); private final SessionId sessionId = new SessionId(connectionId, -1);
private final ProducerId producerId = new ProducerId(sessionId, 1); private final ProducerId producerId = new ProducerId(sessionId, 1);
@ -83,7 +118,6 @@ class MQTTProtocolConverter {
} }
} }
void sendToActiveMQ(Command command, ResponseHandler handler) { void sendToActiveMQ(Command command, ResponseHandler handler) {
command.setCommandId(generateCommandId()); command.setCommandId(generateCommandId());
if (handler != null) { if (handler != null) {
@ -101,13 +135,11 @@ class MQTTProtocolConverter {
} }
} }
/** /**
* Convert a MQTT command * Convert a MQTT command
*/ */
public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException { public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException {
switch (frame.messageType()) { switch (frame.messageType()) {
case PINGREQ.TYPE: { case PINGREQ.TYPE: {
mqttTransport.sendToMQTT(PING_RESP_FRAME); mqttTransport.sendToMQTT(PING_RESP_FRAME);
@ -152,13 +184,12 @@ class MQTTProtocolConverter {
onMQTTPubComp(new PUBCOMP().decode(frame)); onMQTTPubComp(new PUBCOMP().decode(frame));
break; break;
} }
default: default: {
handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame); handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
}
} }
} }
void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException { void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
if (connected.get()) { if (connected.get()) {
@ -182,7 +213,6 @@ class MQTTProtocolConverter {
configureInactivityMonitor(connect.keepAlive()); configureInactivityMonitor(connect.keepAlive());
connectionInfo.setConnectionId(connectionId); connectionInfo.setConnectionId(connectionId);
if (clientId != null && !clientId.isEmpty()) { if (clientId != null && !clientId.isEmpty()) {
connectionInfo.setClientId(clientId); connectionInfo.setClientId(clientId);
@ -232,14 +262,12 @@ class MQTTProtocolConverter {
} }
}); });
} }
}); });
} }
void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException { void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException {
checkConnected(); checkConnected();
SUBACK result = new SUBACK();
Topic[] topics = command.topics(); Topic[] topics = command.topics();
if (topics != null) { if (topics != null) {
byte[] qos = new byte[topics.length]; byte[] qos = new byte[topics.length];
@ -261,9 +289,6 @@ class MQTTProtocolConverter {
QoS onSubscribe(SUBSCRIBE command, Topic topic) throws MQTTProtocolException { QoS onSubscribe(SUBSCRIBE command, Topic topic) throws MQTTProtocolException {
ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString())); ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
if (destination == null) {
throw new MQTTProtocolException("Invalid Destination.");
}
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
ConsumerInfo consumerInfo = new ConsumerInfo(id); ConsumerInfo consumerInfo = new ConsumerInfo(id);
@ -277,7 +302,6 @@ class MQTTProtocolConverter {
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo); MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
subscriptionsByConsumerId.put(id, mqttSubscription); subscriptionsByConsumerId.put(id, mqttSubscription);
mqttSubscriptionByTopic.put(topic.name(), mqttSubscription); mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
@ -295,7 +319,6 @@ class MQTTProtocolConverter {
UNSUBACK ack = new UNSUBACK(); UNSUBACK ack = new UNSUBACK();
ack.messageId(command.messageId()); ack.messageId(command.messageId());
sendToMQTT(ack.encode()); sendToMQTT(ack.encode());
} }
void onUnSubscribe(UTF8Buffer topicName) { void onUnSubscribe(UTF8Buffer topicName) {
@ -310,12 +333,9 @@ class MQTTProtocolConverter {
} }
} }
/** /**
* Dispatch a ActiveMQ command * Dispatch a ActiveMQ command
*/ */
public void onActiveMQCommand(Command command) throws Exception { public void onActiveMQCommand(Command command) throws Exception {
if (command.isResponse()) { if (command.isResponse()) {
Response response = (Response) command; Response response = (Response) command;
@ -406,7 +426,6 @@ class MQTTProtocolConverter {
} }
} }
ActiveMQMessage convertMessage(PUBLISH command) throws JMSException { ActiveMQMessage convertMessage(PUBLISH command) throws JMSException {
ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
@ -456,43 +475,37 @@ class MQTTProtocolConverter {
} }
result.topicName(topicName); result.topicName(topicName);
if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy(); ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
msg.setReadOnlyBody(true); msg.setReadOnlyBody(true);
String messageText = msg.getText(); String messageText = msg.getText();
if (messageText != null) { if (messageText != null) {
result.payload(new Buffer(messageText.getBytes("UTF-8"))); result.payload(new Buffer(messageText.getBytes("UTF-8")));
} }
} else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) { } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy(); ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
msg.setReadOnlyBody(true); msg.setReadOnlyBody(true);
byte[] data = new byte[(int) msg.getBodyLength()]; byte[] data = new byte[(int) msg.getBodyLength()];
msg.readBytes(data); msg.readBytes(data);
result.payload(new Buffer(data)); result.payload(new Buffer(data));
} else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE){ } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
msg.setReadOnlyBody(true); msg.setReadOnlyBody(true);
Map map = msg.getContentMap(); Map<String, Object> map = msg.getContentMap();
if (map != null){ if (map != null) {
result.payload(new Buffer(map.toString().getBytes("UTF-8"))); result.payload(new Buffer(map.toString().getBytes("UTF-8")));
} }
} } else {
else {
ByteSequence byteSequence = message.getContent(); ByteSequence byteSequence = message.getContent();
if (byteSequence != null && byteSequence.getLength() > 0) { if (byteSequence != null && byteSequence.getLength() > 0) {
if (message.isCompressed()){ if (message.isCompressed()) {
Inflater inflater = new Inflater(); Inflater inflater = new Inflater();
inflater.setInput(byteSequence.data,byteSequence.offset,byteSequence.length); inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
byte[] data = new byte[4096]; byte[] data = new byte[4096];
int read; int read;
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
while((read = inflater.inflate(data)) != 0){ while ((read = inflater.inflate(data)) != 0) {
bytesOut.write(data,0,read); bytesOut.write(data, 0, read);
} }
byteSequence = bytesOut.toByteSequence(); byteSequence = bytesOut.toByteSequence();
} }
@ -502,7 +515,6 @@ class MQTTProtocolConverter {
return result; return result;
} }
public MQTTTransport getMQTTTransport() { public MQTTTransport getMQTTTransport() {
return mqttTransport; return mqttTransport;
} }
@ -522,22 +534,18 @@ class MQTTProtocolConverter {
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Failed to publish Will Message " + connect.willMessage()); LOG.warn("Failed to publish Will Message " + connect.willMessage());
} }
} }
} }
} }
void configureInactivityMonitor(short heartBeat) { void configureInactivityMonitor(short heartBeat) {
try { try {
int heartBeatMS = heartBeat * 1000; int heartBeatMS = heartBeat * 1000;
MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor(); MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
monitor.setProtocolConverter(this); monitor.setProtocolConverter(this);
monitor.setReadCheckTime(heartBeatMS); monitor.setReadCheckTime(heartBeatMS);
monitor.setInitialDelayTime(heartBeatMS); monitor.setInitialDelayTime(heartBeatMS);
monitor.startMonitorThread(); monitor.startMonitorThread();
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("Failed to start MQTT InactivityMonitor ", ex); LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
} }
@ -545,7 +553,6 @@ class MQTTProtocolConverter {
LOG.debug(getClientId() + " MQTT Connection using heart beat of " + heartBeat + " secs"); LOG.debug(getClientId() + " MQTT Connection using heart beat of " + heartBeat + " secs");
} }
void handleException(Throwable exception, MQTTFrame command) { void handleException(Throwable exception, MQTTFrame command) {
LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString()); LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {

View File

@ -35,10 +35,8 @@ import org.fusesource.mqtt.codec.MQTTFrame;
*/ */
public class MQTTWireFormat implements WireFormat { public class MQTTWireFormat implements WireFormat {
static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256; static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256;
private boolean encodingEnabled = false;
private int version = 1; private int version = 1;
public ByteSequence marshal(Object command) throws IOException { public ByteSequence marshal(Object command) throws IOException {
@ -119,6 +117,4 @@ public class MQTTWireFormat implements WireFormat {
public int getVersion() { public int getVersion() {
return this.version; return this.version;
} }
} }