From 4ba4aa21d36f86554812d85a2b0143a6b8790d6d Mon Sep 17 00:00:00 2001 From: Dejan Bosanac Date: Thu, 20 Mar 2014 11:31:44 +0100 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5112 - mqtt transport thread safety --- .../transport/mqtt/MQTTProtocolConverter.java | 4 +-- .../transport/mqtt/MQTTTransportFilter.java | 16 +++++---- .../activemq/transport/mqtt/MQTTNioTest.java | 15 -------- .../activemq/transport/mqtt/MQTTSSLTest.java | 18 +--------- .../activemq/transport/mqtt/MQTTTest.java | 34 ++++++------------- 5 files changed, 22 insertions(+), 65 deletions(-) diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 614b1339cb..014c6f6cb9 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -121,7 +121,7 @@ public class MQTTProtocolConverter { command.setResponseRequired(true); resposeHandlers.put(command.getCommandId(), handler); } - mqttTransport.sendToActiveMQ(command); + getMQTTTransport().sendToActiveMQ(command); } void sendToMQTT(MQTTFrame frame) { @@ -140,7 +140,7 @@ public class MQTTProtocolConverter { switch (frame.messageType()) { case PINGREQ.TYPE: { LOG.debug("Received a ping from client: " + getClientId()); - mqttTransport.sendToMQTT(PING_RESP_FRAME); + sendToMQTT(PING_RESP_FRAME); LOG.debug("Sent Ping Response to " + getClientId()); break; } diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java index 54f40e7768..181d2357db 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java @@ -17,13 +17,10 @@ package org.apache.activemq.transport.mqtt; import java.io.IOException; -import java.net.ProtocolException; import java.security.cert.X509Certificate; import java.util.concurrent.atomic.AtomicBoolean; - import javax.jms.JMSException; -import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.Command; import org.apache.activemq.transport.Transport; @@ -51,6 +48,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor private final AtomicBoolean stopped = new AtomicBoolean(); private boolean trace; + private final Object sendLock = new Object(); public MQTTTransportFilter(Transport next, WireFormat wireFormat, BrokerService brokerService) { super(next); @@ -80,7 +78,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor } protocolConverter.onMQTTCommand(frame); } catch (IOException e) { - handleException(e); + onException(e); } catch (JMSException e) { onException(IOExceptionSupport.create(e)); } @@ -102,7 +100,10 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor } Transport n = next; if (n != null) { - n.oneway(command); + // sync access to underlying transport buffer + synchronized (sendLock) { + n.oneway(command); + } } } } @@ -174,9 +175,10 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor return this.wireFormat; } - public void handleException(IOException e) { + @Override + public void onException(IOException error) { protocolConverter.onTransportError(); - super.onException(e); + super.onException(error); } public long getDefaultKeepAlive() { diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java index 2433d2e357..2513ac3928 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java @@ -30,7 +30,6 @@ import org.apache.activemq.security.SimpleAuthenticationPlugin; import org.apache.activemq.util.Wait; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.BlockJUnit4ClassRunner; @@ -43,20 +42,6 @@ public class MQTTNioTest extends MQTTTest { return "mqtt+nio"; } - @Ignore("See AMQ-4712") - @Override - @Test - public void testReceiveMessageSentWhileOffline() throws Exception { - super.testReceiveMessageSentWhileOffline(); - } - - @Ignore("See AMQ-4712") - @Override - @Test - public void testResendMessageId() throws Exception { - super.testResendMessageId(); - } - @Test public void testPingOnMQTTNIO() throws Exception { addMQTTConnector("maxInactivityDuration=-1"); diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java index 6b44ae2dd4..1eb4ff530b 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java @@ -19,14 +19,12 @@ package org.apache.activemq.transport.mqtt; import java.security.SecureRandom; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; - import javax.net.ssl.KeyManager; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; + import org.fusesource.mqtt.client.MQTT; -import org.junit.Ignore; -import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.BlockJUnit4ClassRunner; import org.slf4j.Logger; @@ -53,20 +51,6 @@ public class MQTTSSLTest extends MQTTTest { return "mqtt+ssl"; } - @Ignore("See AMQ-4712") - @Override - @Test - public void testReceiveMessageSentWhileOffline() throws Exception { - super.testReceiveMessageSentWhileOffline(); - } - - @Ignore("See AMQ-4712") - @Override - @Test - public void testResendMessageId() throws Exception { - super.testResendMessageId(); - } - protected MQTT createMQTTConnection() throws Exception { MQTT mqtt = new MQTT(); mqtt.setConnectAttemptsMax(1); diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index d5d3983026..3acb4bb546 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -46,7 +46,6 @@ import org.apache.activemq.security.SimpleAuthenticationPlugin; import org.apache.activemq.security.SimpleAuthorizationMap; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.Wait; -import org.fusesource.hawtbuf.Buffer; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; @@ -590,15 +589,7 @@ public class MQTTTest extends AbstractMQTTTest { if (frame.messageType() == PUBLISH.TYPE) { PUBLISH publish = new PUBLISH(); try { - // copy the buffers before we decode - Buffer[] buffers = frame.buffers(); - Buffer[] copy = new Buffer[buffers.length]; - for (int i = 0; i < buffers.length; i++) { - copy[i] = buffers[i].deepCopy(); - } publish.decode(frame); - // reset frame buffers to deep copy - frame.buffers(copy); } catch (ProtocolException e) { fail("Error decoding publish " + e.getMessage()); } @@ -684,15 +675,7 @@ public class MQTTTest extends AbstractMQTTTest { if (frame.messageType() == PUBLISH.TYPE) { PUBLISH publish = new PUBLISH(); try { - // copy the buffers before we decode - Buffer[] buffers = frame.buffers(); - Buffer[] copy = new Buffer[buffers.length]; - for (int i = 0; i < buffers.length; i++) { - copy[i] = buffers[i].deepCopy(); - } publish.decode(frame); - // reset frame buffers to deep copy - frame.buffers(copy); } catch (ProtocolException e) { fail("Error decoding publish " + e.getMessage()); } @@ -717,25 +700,28 @@ public class MQTTTest extends AbstractMQTTTest { // publish non-retained message connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); - Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); + Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); assertNotNull(msg); assertEquals(TOPIC, new String(msg.getPayload())); - msg = connection.receive(5000, TimeUnit.MILLISECONDS); + msg = connection.receive(1000, TimeUnit.MILLISECONDS); assertNotNull(msg); assertEquals(TOPIC, new String(msg.getPayload())); // drop subs without acknowledging messages, then subscribe and receive again connection.unsubscribe(subs); + Thread.sleep(1000); connection.subscribe(new Topic[]{new Topic(subs[0], QoS.AT_LEAST_ONCE), new Topic(subs[1], QoS.EXACTLY_ONCE)}); + Thread.sleep(1000); msg = connection.receive(5000, TimeUnit.MILLISECONDS); assertNotNull(msg); assertEquals(TOPIC, new String(msg.getPayload())); + final Message msg2 = connection.receive(5000, TimeUnit.MILLISECONDS); + assertNotNull(msg2); + assertEquals(TOPIC, new String(msg2.getPayload())); + // ack messages after receiving all of them msg.ack(); - msg = connection.receive(5000, TimeUnit.MILLISECONDS); - assertNotNull(msg); - assertEquals(TOPIC, new String(msg.getPayload())); - msg.ack(); + msg2.ack(); // make sure we received duplicate message ids List dups = new ArrayList(); @@ -1177,4 +1163,4 @@ public class MQTTTest extends AbstractMQTTTest { connection.disconnect(); } -} \ No newline at end of file +}