This commit is contained in:
Clebert Suconic 2019-05-06 23:17:29 -04:00
commit 4c6447ea27
9 changed files with 114 additions and 72 deletions

View File

@ -253,4 +253,9 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
public Map<String, MQTTSessionState> getSessionStates() {
return new HashMap<>(sessionStates);
}
/** For DEBUG only */
public Map<String, MQTTConnection> getConnectedClients() {
return connectedClients;
}
}

View File

@ -286,6 +286,12 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
return acceptor;
}
/** No interface method, for tests only */
public Map<String, Acceptor> getAcceptors() {
return acceptors;
}
@Override
public void destroyAcceptor(String name) throws Exception {
Acceptor acceptor = acceptors.get(name);

View File

@ -800,7 +800,7 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
}
}
@Test(timeout = 240000)
@Test(timeout = 30000)
public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable {
String name = "exampleQueue1";
@ -823,8 +823,7 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue(name);
MessageConsumer c = session.createConsumer(queue);
c.receive(1000);
producer.close();
Assert.assertNotNull(c.receive(1000));
session.close();
}
@ -832,7 +831,7 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
queue = session.createQueue(name);
MessageConsumer c = session.createConsumer(queue);
for (int i = 0; i < numMessages; i++) {
c.receive(1000);
Assert.assertNull(c.receive(1));
}
producer.close();
session.close();

View File

@ -69,6 +69,7 @@ public class ConcurrentCreateDeleteProduceTest extends ActiveMQTestBase {
@Test
public void testConcurrentProduceCreateAndDelete() throws Throwable {
locator.setBlockOnDurableSend(false).setBlockOnNonDurableSend(false);
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(true, true);
ClientProducer producer = session.createProducer(ADDRESS);
@ -84,7 +85,7 @@ public class ConcurrentCreateDeleteProduceTest extends ActiveMQTestBase {
consumers[i].start();
}
for (int i = 0; i < 50000 && running; i++) {
for (int i = 0; i < 1500 && running; i++) {
producer.send(session.createMessage(true));
//Thread.sleep(10);
}
@ -122,9 +123,10 @@ public class ConcurrentCreateDeleteProduceTest extends ActiveMQTestBase {
session.createQueue(ADDRESS, queueName, true);
ClientConsumer consumer = session.createConsumer(queueName);
while (running) {
ClientMessage msg = consumer.receive(5000);
ClientMessage msg = consumer.receive(500);
if (msg == null) {
break;
if (running) continue;
else break;
}
if (msgcount++ == 500) {
msgcount = 0;
@ -134,7 +136,6 @@ public class ConcurrentCreateDeleteProduceTest extends ActiveMQTestBase {
consumer.close();
session.commit();
session.deleteQueue(queueName);
System.out.println("Deleting " + queueName);
}
session.close();
} catch (Throwable e) {

View File

@ -509,7 +509,7 @@ public class ConsumerTest extends ActiveMQTestBase {
}
long time = System.currentTimeMillis();
int NUMBER_OF_MESSAGES = durable ? 500 : 5000;
int NUMBER_OF_MESSAGES = durable ? 5 : 50;
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage msg = session.createTextMessage("hello " + i);
msg.setIntProperty("mycount", i);

View File

@ -107,6 +107,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
locator.setConsumerWindowSize(0);
}
}
locator.setBlockOnNonDurableSend(false).setBlockOnNonDurableSend(false).setBlockOnAcknowledge(false);
settings.setMaxDeliveryAttempts(-1);
server.getAddressSettingsRepository().addMatch("#", settings);
@ -148,7 +149,6 @@ public class LargeMessageTest extends LargeMessageTestBase {
// System.out.println("message:" + message);
try {
if (counter++ < 20) {
Thread.sleep(100);
// System.out.println("Rollback");
message.acknowledge();
session.rollback();
@ -1189,22 +1189,22 @@ public class LargeMessageTest extends LargeMessageTestBase {
@Test
public void testFilePersistenceDelayed() throws Exception {
testChunks(false, false, true, false, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
testChunks(false, false, true, false, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 200);
}
@Test
public void testFilePersistenceDelayedConsumer() throws Exception {
testChunks(false, false, true, false, true, false, false, false, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
testChunks(false, false, true, false, true, false, false, false, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 200);
}
@Test
public void testFilePersistenceDelayedXA() throws Exception {
testChunks(true, false, true, false, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
testChunks(true, false, true, false, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 200);
}
@Test
public void testFilePersistenceDelayedXAConsumer() throws Exception {
testChunks(true, false, true, false, true, false, false, false, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
testChunks(true, false, true, false, true, false, false, false, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 200);
}
@Test
@ -1274,22 +1274,22 @@ public class LargeMessageTest extends LargeMessageTestBase {
@Test
public void testSendRegularMessageNullPersistenceDelayed() throws Exception {
testChunks(false, false, true, false, false, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000);
testChunks(false, false, true, false, false, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
}
@Test
public void testSendRegularMessageNullPersistenceDelayedConsumer() throws Exception {
testChunks(false, false, true, false, false, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000);
testChunks(false, false, true, false, false, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
}
@Test
public void testSendRegularMessageNullPersistenceDelayedXA() throws Exception {
testChunks(true, false, true, false, false, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000);
testChunks(true, false, true, false, false, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
}
@Test
public void testSendRegularMessageNullPersistenceDelayedXAConsumer() throws Exception {
testChunks(true, false, true, false, false, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000);
testChunks(true, false, true, false, false, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
}
@Test
@ -1314,22 +1314,22 @@ public class LargeMessageTest extends LargeMessageTestBase {
@Test
public void testSendRegularMessagePersistenceDelayed() throws Exception {
testChunks(false, false, true, false, true, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000);
testChunks(false, false, true, false, true, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
}
@Test
public void testSendRegularMessagePersistenceDelayedConsumer() throws Exception {
testChunks(false, false, true, false, true, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000);
testChunks(false, false, true, false, true, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
}
@Test
public void testSendRegularMessagePersistenceDelayedXA() throws Exception {
testChunks(false, false, true, false, true, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000);
testChunks(false, false, true, false, true, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
}
@Test
public void testSendRegularMessagePersistenceDelayedXAConsumer() throws Exception {
testChunks(false, false, true, false, true, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000);
testChunks(false, false, true, false, true, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
}
@Test
@ -1659,11 +1659,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
ClientConsumerInternal consumer = (ClientConsumerInternal) session.createConsumer(ADDRESS);
// Wait the consumer to be complete with 10 messages before getting others
long timeout = System.currentTimeMillis() + 10000;
while (consumer.getBufferSize() < NUMBER_OF_MESSAGES && timeout > System.currentTimeMillis()) {
Thread.sleep(10);
}
Wait.waitFor(() -> consumer.getBufferSize() >= NUMBER_OF_MESSAGES, 30_000, 100);
Assert.assertEquals(NUMBER_OF_MESSAGES, consumer.getBufferSize());
// Reads the messages, rollback.. read them again
@ -1711,14 +1707,14 @@ public class LargeMessageTest extends LargeMessageTestBase {
ActiveMQServer server = null;
final int SIZE = 10 * 1024;
final int NUMBER_OF_MESSAGES = 1000;
final int NUMBER_OF_MESSAGES = 100;
try {
server = createServer(true, isNetty(), storeType);
server.start();
locator.setMinLargeMessageSize(1024).setConsumerWindowSize(1024 * 1024);
locator.setMinLargeMessageSize(1024).setConsumerWindowSize(1024 * 1024).setBlockOnDurableSend(false).setBlockOnNonDurableSend(false).setBlockOnAcknowledge(false);
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
@ -1744,11 +1740,8 @@ public class LargeMessageTest extends LargeMessageTestBase {
ClientConsumerInternal consumer = (ClientConsumerInternal) session.createConsumer(ADDRESS);
// Wait the consumer to be complete with 10 messages before getting others
long timeout = System.currentTimeMillis() + 10000;
while (consumer.getBufferSize() < 10 && timeout > System.currentTimeMillis()) {
Thread.sleep(10);
}
Assert.assertTrue(Wait.waitFor(() -> consumer.getBufferSize() >= 5, 30_000, 100));
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
ClientMessage msg = consumer.receive(10000);

View File

@ -51,6 +51,7 @@ import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.InVMNodeManagerServer;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
@ -142,12 +143,12 @@ public class JMSFailoverListenerTest extends ActiveMQTestBase {
producer.send(bm);
}
Wait.assertEquals(numMessages, liveServer.locateQueue(jmsQueueName)::getMessageCount);
conn.start();
JMSFailoverListenerTest.log.info("sent messages and started connection");
Thread.sleep(2000);
JMSUtil.crash(liveServer, ((ActiveMQSession) sess).getCoreSession());
Assert.assertEquals(FailoverEventType.FAILURE_DETECTED, listener.get(0));

View File

@ -346,14 +346,10 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
session.start();
for (int i = 0; i < numberOfMessages; i++) {
System.currentTimeMillis();
ClientMessage message = consumer.receive(waitOnConsumer + delayDelivery);
Assert.assertNotNull(message);
System.currentTimeMillis();
if (delayDelivery > 0) {
long originalTime = (Long) message.getObjectProperty(new SimpleString("original-time"));
Assert.assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery, System.currentTimeMillis() - originalTime >= delayDelivery);

View File

@ -21,8 +21,14 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager;
import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor;
import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
@ -57,16 +63,19 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection subConnection2 = null;
BlockingConnection pubConnection = null;
try {
//Waiting for resource initialization to complete
Thread.sleep(5000);
Thread.sleep(1000);
Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
//Waiting for the first sub connection be closed
assertTrue(waitConnectionClosed(subConnection1));
Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size);
subConnection1 = null;
subConnection2.subscribe(topics);
waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
@ -107,11 +116,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false);
pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false);
Message message11 = subConnection2.receive(5, TimeUnit.SECONDS);
Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message11);
Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message21);
Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message31);
} finally {
@ -145,7 +154,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection connection2 = null;
try {
//Waiting for resource initialization to complete
Thread.sleep(5000);
Thread.sleep(1000);
Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
@ -244,14 +253,20 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection pubConnection = null;
try {
//Waiting for resource initialization to complete
Thread.sleep(5000);
Thread.sleep(1000);
Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
//Waiting for the first sub connection be closed
assertTrue(waitConnectionClosed(subConnection1));
Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size);
subConnection1 = null;
subConnection2.subscribe(topics);
@ -293,11 +308,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
pubConnection.publish(MULTICAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false);
pubConnection.publish(MULTICAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false);
Message message11 = subConnection2.receive(5, TimeUnit.SECONDS);
Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message11);
Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message21);
Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message31);
} finally {
@ -332,7 +347,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection connection2 = null;
try {
//Waiting for resource initialization to complete
Thread.sleep(5000);
Thread.sleep(1000);
Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
@ -436,14 +451,19 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection pubConnection = null;
try {
//Waiting for resource initialization to complete
Thread.sleep(5000);
Thread.sleep(1000);
Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
//Waiting for the first sub connection be closed
assertTrue(waitConnectionClosed(subConnection1));
Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size);
subConnection1 = null;
subConnection2.subscribe(topics);
@ -485,11 +505,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
Message message11 = subConnection2.receive(5, TimeUnit.SECONDS);
Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message11);
Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message21);
Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message31);
} finally {
@ -523,7 +543,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection connection2 = null;
try {
//Waiting for resource initialization to complete
Thread.sleep(5000);
Thread.sleep(1000);
Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
@ -608,6 +628,20 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
}
MQTTProtocolManager locateMQTTPM(ActiveMQServer server) {
RemotingServiceImpl impl = (RemotingServiceImpl) server.getRemotingService();
for (Acceptor acceptor : impl.getAcceptors().values()) {
AbstractAcceptor abstractAcceptor = (AbstractAcceptor) acceptor;
for (ProtocolManager manager : abstractAcceptor.getProtocolMap().values()) {
if (manager instanceof MQTTProtocolManager) {
return (MQTTProtocolManager) manager;
}
}
}
return null;
}
@Test
public void useSameClientIdAndMulticastSubscribeRemoteQueueWildCard() throws Exception {
final String MULTICAST_TOPIC = "multicast/test/+/some/#";
@ -624,9 +658,10 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection pubConnection = null;
try {
//Waiting for resource initialization to complete
Thread.sleep(5000);
Thread.sleep(1000);
Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
@ -673,11 +708,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
pubConnection.publish("multicast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
pubConnection.publish("multicast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
Message message11 = subConnection2.receive(5, TimeUnit.SECONDS);
Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message11);
Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message21);
Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message31);
} finally {
@ -712,10 +747,12 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection connection2 = null;
try {
//Waiting for resource initialization to complete
Thread.sleep(5000);
Thread.sleep(1000);
Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size);
// Subscribe to topics
connection1.subscribe(topics);
@ -817,7 +854,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection connection2 = null;
try {
//Waiting for resource initialization to complete
Thread.sleep(5000);
Thread.sleep(1000);
connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
// Subscribe to topics
@ -923,8 +960,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection pubConnection = null;
try {
//Waiting for resource initialization to complete
Thread.sleep(5000);
Thread.sleep(1000);
subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
@ -985,11 +1025,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
message11.ack();
assertEquals(payload4, new String(message11.getPayload()));
Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message21);
Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message31);
Message message41 = subConnection2.receive(5, TimeUnit.SECONDS);
Message message41 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message41);
} finally {
@ -1024,10 +1064,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection pubConnection = null;
try {
//Waiting for resource initialization to complete
Thread.sleep(5000);
Thread.sleep(1000);
pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
Wait.assertEquals(2, locateMQTTPM(servers[0]).getConnectedClients()::size);
subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
//Waiting for the first sub connection be closed
@ -1086,11 +1127,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false);
pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false);
Message message11 = subConnection3.receive(5, TimeUnit.SECONDS);
Message message11 = subConnection3.receive(100, TimeUnit.MILLISECONDS);
assertNull(message11);
Message message21 = subConnection3.receive(5, TimeUnit.SECONDS);
Message message21 = subConnection3.receive(100, TimeUnit.MILLISECONDS);
assertNull(message21);
Message message31 = subConnection3.receive(5, TimeUnit.SECONDS);
Message message31 = subConnection3.receive(100, TimeUnit.MILLISECONDS);
assertNull(message31);
@ -1131,7 +1172,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection connection3 = null;
try {
//Waiting for resource initialization to complete
Thread.sleep(5000);
Thread.sleep(1000);
connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
connection3 = retrieveMQTTConnection("tcp://localhost:61617", clientId3);