diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java index 5f67d8a3a0..21c8f5e95f 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java @@ -34,7 +34,7 @@ import org.fusesource.mqtt.codec.MQTTFrame; public abstract class AbstractMQTTSocket extends TransportSupport implements MQTTTransport, BrokerServiceAware { - protected MQTTProtocolConverter protocolConverter = null; + protected volatile MQTTProtocolConverter protocolConverter = null; protected MQTTWireFormat wireFormat = new MQTTWireFormat(); protected final MQTTInactivityMonitor mqttInactivityMonitor = new MQTTInactivityMonitor(this, wireFormat); protected final CountDownLatch socketTransportStarted = new CountDownLatch(1); @@ -123,7 +123,11 @@ public abstract class AbstractMQTTSocket extends TransportSupport implements MQT protected MQTTProtocolConverter getProtocolConverter() { if (protocolConverter == null) { - protocolConverter = new MQTTProtocolConverter(this, brokerService); + synchronized(this) { + if (protocolConverter == null) { + protocolConverter = new MQTTProtocolConverter(this, brokerService); + } + } } return protocolConverter; diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java index f304ada8af..b62442a834 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java @@ -72,6 +72,20 @@ public class MQTTWSTransportTest extends WSTransportTestSupport { super.tearDown(); } + @Test(timeout = 60000) + public void testConnectCycles() throws Exception { + for (int i = 0; i < 10; ++i) { + testConnect(); + + wsMQTTConnection = new MQTTWSConnection(); + + wsClient.open(wsConnectUri, wsMQTTConnection); + if (!wsMQTTConnection.awaitConnection(30, TimeUnit.SECONDS)) { + throw new IOException("Could not connect to MQTT WS endpoint"); + } + } + } + @Test(timeout = 60000) public void testConnect() throws Exception {