mirror of https://github.com/apache/activemq.git
Updated link stealing test to account for isConnected state not being
immediately updated which can lead to failure on a very fast machine.
This commit is contained in:
parent
27a5f6b9d5
commit
b11fc8faf4
|
@ -16,6 +16,14 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.transport.mqtt;
|
package org.apache.activemq.transport.mqtt;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.net.ProtocolException;
|
import java.net.ProtocolException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -27,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import javax.jms.BytesMessage;
|
import javax.jms.BytesMessage;
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
|
@ -35,14 +44,6 @@ import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertNotEquals;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
import org.apache.activemq.ActiveMQConnection;
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
|
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
|
@ -1088,18 +1089,30 @@ public class MQTTTest extends MQTTTestSupport {
|
||||||
final String clientId = "duplicateClient";
|
final String clientId = "duplicateClient";
|
||||||
MQTT mqtt = createMQTTConnection(clientId, false);
|
MQTT mqtt = createMQTTConnection(clientId, false);
|
||||||
mqtt.setKeepAlive((short) 2);
|
mqtt.setKeepAlive((short) 2);
|
||||||
BlockingConnection connection = mqtt.blockingConnection();
|
final BlockingConnection connection = mqtt.blockingConnection();
|
||||||
connection.connect();
|
connection.connect();
|
||||||
final String TOPICA = "TopicA";
|
final String TOPICA = "TopicA";
|
||||||
connection.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
|
connection.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
|
||||||
|
|
||||||
MQTT mqtt1 = createMQTTConnection(clientId, false);
|
MQTT mqtt1 = createMQTTConnection(clientId, false);
|
||||||
mqtt1.setKeepAlive((short) 2);
|
mqtt1.setKeepAlive((short) 2);
|
||||||
BlockingConnection connection1 = mqtt1.blockingConnection();
|
final BlockingConnection connection1 = mqtt1.blockingConnection();
|
||||||
connection1.connect();
|
connection1.connect();
|
||||||
|
|
||||||
assertTrue("Duplicate client disconnected", connection1.isConnected());
|
assertTrue("Duplicate client disconnected", Wait.waitFor(new Wait.Condition() {
|
||||||
assertFalse("Old client still connected", connection.isConnected());
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return connection1.isConnected();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
assertTrue("Old client still connected", Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return !connection.isConnected();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
connection1.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
|
connection1.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
|
||||||
connection1.disconnect();
|
connection1.disconnect();
|
||||||
|
|
||||||
|
@ -1110,23 +1123,23 @@ public class MQTTTest extends MQTTTestSupport {
|
||||||
|
|
||||||
mqtt = createMQTTConnection(clientId, false);
|
mqtt = createMQTTConnection(clientId, false);
|
||||||
mqtt.setKeepAlive((short) 2);
|
mqtt.setKeepAlive((short) 2);
|
||||||
connection = mqtt.blockingConnection();
|
final BlockingConnection connection2 = mqtt.blockingConnection();
|
||||||
connection.connect();
|
connection2.connect();
|
||||||
connection.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
|
connection2.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
|
||||||
|
|
||||||
mqtt1 = createMQTTConnection(clientId, false);
|
mqtt1 = createMQTTConnection(clientId, false);
|
||||||
mqtt1.setKeepAlive((short) 2);
|
mqtt1.setKeepAlive((short) 2);
|
||||||
connection1 = mqtt1.blockingConnection();
|
final BlockingConnection connection3 = mqtt1.blockingConnection();
|
||||||
try {
|
try {
|
||||||
connection1.connect();
|
connection3.connect();
|
||||||
fail("Duplicate client connected");
|
fail("Duplicate client connected");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
|
|
||||||
assertTrue("Old client disconnected", connection.isConnected());
|
assertTrue("Old client disconnected", connection2.isConnected());
|
||||||
connection.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
|
connection2.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
|
||||||
connection.disconnect();
|
connection2.disconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 30 * 10000)
|
@Test(timeout = 30 * 10000)
|
||||||
|
|
Loading…
Reference in New Issue