Resolve race that leads to stalled connect attempt.  Should fix the
failing CI tests that are seeing this.
This commit is contained in:
Timothy Bish 2015-06-30 17:16:47 -04:00
parent 87fd0a9e05
commit 10ae0d9d6f
2 changed files with 20 additions and 2 deletions

View File

@ -34,7 +34,7 @@ import org.fusesource.mqtt.codec.MQTTFrame;
public abstract class AbstractMQTTSocket extends TransportSupport implements MQTTTransport, BrokerServiceAware { public abstract class AbstractMQTTSocket extends TransportSupport implements MQTTTransport, BrokerServiceAware {
protected MQTTProtocolConverter protocolConverter = null; protected volatile MQTTProtocolConverter protocolConverter = null;
protected MQTTWireFormat wireFormat = new MQTTWireFormat(); protected MQTTWireFormat wireFormat = new MQTTWireFormat();
protected final MQTTInactivityMonitor mqttInactivityMonitor = new MQTTInactivityMonitor(this, wireFormat); protected final MQTTInactivityMonitor mqttInactivityMonitor = new MQTTInactivityMonitor(this, wireFormat);
protected final CountDownLatch socketTransportStarted = new CountDownLatch(1); protected final CountDownLatch socketTransportStarted = new CountDownLatch(1);
@ -122,9 +122,13 @@ public abstract class AbstractMQTTSocket extends TransportSupport implements MQT
//----- Internal support methods -----------------------------------------// //----- Internal support methods -----------------------------------------//
protected MQTTProtocolConverter getProtocolConverter() { protected MQTTProtocolConverter getProtocolConverter() {
if (protocolConverter == null) {
synchronized(this) {
if (protocolConverter == null) { if (protocolConverter == null) {
protocolConverter = new MQTTProtocolConverter(this, brokerService); protocolConverter = new MQTTProtocolConverter(this, brokerService);
} }
}
}
return protocolConverter; return protocolConverter;
} }

View File

@ -72,6 +72,20 @@ public class MQTTWSTransportTest extends WSTransportTestSupport {
super.tearDown(); super.tearDown();
} }
@Test(timeout = 60000)
public void testConnectCycles() throws Exception {
for (int i = 0; i < 10; ++i) {
testConnect();
wsMQTTConnection = new MQTTWSConnection();
wsClient.open(wsConnectUri, wsMQTTConnection);
if (!wsMQTTConnection.awaitConnection(30, TimeUnit.SECONDS)) {
throw new IOException("Could not connect to MQTT WS endpoint");
}
}
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testConnect() throws Exception { public void testConnect() throws Exception {