diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java index 6e914432ac..d1777ea6a7 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java @@ -253,4 +253,9 @@ public class MQTTProtocolManager extends AbstractProtocolManager getSessionStates() { return new HashMap<>(sessionStates); } + + /** For DEBUG only */ + public Map getConnectedClients() { + return connectedClients; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 87a3c30340..416e9a95bf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -286,6 +286,12 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif return acceptor; } + + /** No interface method, for tests only */ + public Map getAcceptors() { + return acceptors; + } + @Override public void destroyAcceptor(String name) throws Exception { Acceptor acceptor = acceptors.get(name); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java index 485d886d92..7a83172b03 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java @@ -800,7 +800,7 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport { } } - @Test(timeout = 240000) + @Test(timeout = 30000) public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable { String name = "exampleQueue1"; @@ -823,8 +823,7 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); queue = session.createQueue(name); MessageConsumer c = session.createConsumer(queue); - c.receive(1000); - producer.close(); + Assert.assertNotNull(c.receive(1000)); session.close(); } @@ -832,7 +831,7 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport { queue = session.createQueue(name); MessageConsumer c = session.createConsumer(queue); for (int i = 0; i < numMessages; i++) { - c.receive(1000); + Assert.assertNull(c.receive(1)); } producer.close(); session.close(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConcurrentCreateDeleteProduceTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConcurrentCreateDeleteProduceTest.java index afc34d9ea1..12a7af4d98 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConcurrentCreateDeleteProduceTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConcurrentCreateDeleteProduceTest.java @@ -69,6 +69,7 @@ public class ConcurrentCreateDeleteProduceTest extends ActiveMQTestBase { @Test public void testConcurrentProduceCreateAndDelete() throws Throwable { + locator.setBlockOnDurableSend(false).setBlockOnNonDurableSend(false); ClientSessionFactory factory = locator.createSessionFactory(); ClientSession session = factory.createSession(true, true); ClientProducer producer = session.createProducer(ADDRESS); @@ -84,7 +85,7 @@ public class ConcurrentCreateDeleteProduceTest extends ActiveMQTestBase { consumers[i].start(); } - for (int i = 0; i < 50000 && running; i++) { + for (int i = 0; i < 1500 && running; i++) { producer.send(session.createMessage(true)); //Thread.sleep(10); } @@ -122,9 +123,10 @@ public class ConcurrentCreateDeleteProduceTest extends ActiveMQTestBase { session.createQueue(ADDRESS, queueName, true); ClientConsumer consumer = session.createConsumer(queueName); while (running) { - ClientMessage msg = consumer.receive(5000); + ClientMessage msg = consumer.receive(500); if (msg == null) { - break; + if (running) continue; + else break; } if (msgcount++ == 500) { msgcount = 0; @@ -134,7 +136,6 @@ public class ConcurrentCreateDeleteProduceTest extends ActiveMQTestBase { consumer.close(); session.commit(); session.deleteQueue(queueName); - System.out.println("Deleting " + queueName); } session.close(); } catch (Throwable e) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index 39200e68e3..bbde2b301b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -509,7 +509,7 @@ public class ConsumerTest extends ActiveMQTestBase { } long time = System.currentTimeMillis(); - int NUMBER_OF_MESSAGES = durable ? 500 : 5000; + int NUMBER_OF_MESSAGES = durable ? 5 : 50; for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { TextMessage msg = session.createTextMessage("hello " + i); msg.setIntProperty("mycount", i); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java index 84727f6bce..add05f06df 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java @@ -107,6 +107,7 @@ public class LargeMessageTest extends LargeMessageTestBase { locator.setConsumerWindowSize(0); } } + locator.setBlockOnNonDurableSend(false).setBlockOnNonDurableSend(false).setBlockOnAcknowledge(false); settings.setMaxDeliveryAttempts(-1); server.getAddressSettingsRepository().addMatch("#", settings); @@ -148,7 +149,6 @@ public class LargeMessageTest extends LargeMessageTestBase { // System.out.println("message:" + message); try { if (counter++ < 20) { - Thread.sleep(100); // System.out.println("Rollback"); message.acknowledge(); session.rollback(); @@ -1189,22 +1189,22 @@ public class LargeMessageTest extends LargeMessageTestBase { @Test public void testFilePersistenceDelayed() throws Exception { - testChunks(false, false, true, false, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000); + testChunks(false, false, true, false, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 200); } @Test public void testFilePersistenceDelayedConsumer() throws Exception { - testChunks(false, false, true, false, true, false, false, false, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000); + testChunks(false, false, true, false, true, false, false, false, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 200); } @Test public void testFilePersistenceDelayedXA() throws Exception { - testChunks(true, false, true, false, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000); + testChunks(true, false, true, false, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 200); } @Test public void testFilePersistenceDelayedXAConsumer() throws Exception { - testChunks(true, false, true, false, true, false, false, false, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000); + testChunks(true, false, true, false, true, false, false, false, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 200); } @Test @@ -1274,22 +1274,22 @@ public class LargeMessageTest extends LargeMessageTestBase { @Test public void testSendRegularMessageNullPersistenceDelayed() throws Exception { - testChunks(false, false, true, false, false, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000); + testChunks(false, false, true, false, false, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100); } @Test public void testSendRegularMessageNullPersistenceDelayedConsumer() throws Exception { - testChunks(false, false, true, false, false, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000); + testChunks(false, false, true, false, false, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100); } @Test public void testSendRegularMessageNullPersistenceDelayedXA() throws Exception { - testChunks(true, false, true, false, false, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000); + testChunks(true, false, true, false, false, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100); } @Test public void testSendRegularMessageNullPersistenceDelayedXAConsumer() throws Exception { - testChunks(true, false, true, false, false, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000); + testChunks(true, false, true, false, false, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100); } @Test @@ -1314,22 +1314,22 @@ public class LargeMessageTest extends LargeMessageTestBase { @Test public void testSendRegularMessagePersistenceDelayed() throws Exception { - testChunks(false, false, true, false, true, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000); + testChunks(false, false, true, false, true, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100); } @Test public void testSendRegularMessagePersistenceDelayedConsumer() throws Exception { - testChunks(false, false, true, false, true, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000); + testChunks(false, false, true, false, true, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100); } @Test public void testSendRegularMessagePersistenceDelayedXA() throws Exception { - testChunks(false, false, true, false, true, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000); + testChunks(false, false, true, false, true, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100); } @Test public void testSendRegularMessagePersistenceDelayedXAConsumer() throws Exception { - testChunks(false, false, true, false, true, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000); + testChunks(false, false, true, false, true, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100); } @Test @@ -1659,11 +1659,7 @@ public class LargeMessageTest extends LargeMessageTestBase { ClientConsumerInternal consumer = (ClientConsumerInternal) session.createConsumer(ADDRESS); - // Wait the consumer to be complete with 10 messages before getting others - long timeout = System.currentTimeMillis() + 10000; - while (consumer.getBufferSize() < NUMBER_OF_MESSAGES && timeout > System.currentTimeMillis()) { - Thread.sleep(10); - } + Wait.waitFor(() -> consumer.getBufferSize() >= NUMBER_OF_MESSAGES, 30_000, 100); Assert.assertEquals(NUMBER_OF_MESSAGES, consumer.getBufferSize()); // Reads the messages, rollback.. read them again @@ -1711,14 +1707,14 @@ public class LargeMessageTest extends LargeMessageTestBase { ActiveMQServer server = null; final int SIZE = 10 * 1024; - final int NUMBER_OF_MESSAGES = 1000; + final int NUMBER_OF_MESSAGES = 100; try { server = createServer(true, isNetty(), storeType); server.start(); - locator.setMinLargeMessageSize(1024).setConsumerWindowSize(1024 * 1024); + locator.setMinLargeMessageSize(1024).setConsumerWindowSize(1024 * 1024).setBlockOnDurableSend(false).setBlockOnNonDurableSend(false).setBlockOnAcknowledge(false); ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); @@ -1744,11 +1740,8 @@ public class LargeMessageTest extends LargeMessageTestBase { ClientConsumerInternal consumer = (ClientConsumerInternal) session.createConsumer(ADDRESS); - // Wait the consumer to be complete with 10 messages before getting others - long timeout = System.currentTimeMillis() + 10000; - while (consumer.getBufferSize() < 10 && timeout > System.currentTimeMillis()) { - Thread.sleep(10); - } + Assert.assertTrue(Wait.waitFor(() -> consumer.getBufferSize() >= 5, 30_000, 100)); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { ClientMessage msg = consumer.receive(10000); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java index cec8e1b8fa..7bd9cbecd8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java @@ -51,6 +51,7 @@ import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.InVMNodeManagerServer; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Assert; import org.junit.Before; @@ -142,12 +143,12 @@ public class JMSFailoverListenerTest extends ActiveMQTestBase { producer.send(bm); } + Wait.assertEquals(numMessages, liveServer.locateQueue(jmsQueueName)::getMessageCount); + conn.start(); JMSFailoverListenerTest.log.info("sent messages and started connection"); - Thread.sleep(2000); - JMSUtil.crash(liveServer, ((ActiveMQSession) sess).getCoreSession()); Assert.assertEquals(FailoverEventType.FAILURE_DETECTED, listener.get(0)); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java index 302c64a1a3..1b1a930892 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java @@ -346,14 +346,10 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase { session.start(); for (int i = 0; i < numberOfMessages; i++) { - System.currentTimeMillis(); - ClientMessage message = consumer.receive(waitOnConsumer + delayDelivery); Assert.assertNotNull(message); - System.currentTimeMillis(); - if (delayDelivery > 0) { long originalTime = (Long) message.getObjectProperty(new SimpleString("original-time")); Assert.assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery, System.currentTimeMillis() - originalTime >= delayDelivery); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java index 19360b16ae..f8391098a8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java @@ -21,8 +21,14 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; import org.apache.activemq.artemis.core.config.WildcardConfiguration; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager; +import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor; +import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; +import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; import org.apache.activemq.artemis.tests.util.Wait; import org.fusesource.mqtt.client.BlockingConnection; @@ -57,16 +63,19 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { BlockingConnection subConnection2 = null; BlockingConnection pubConnection = null; try { - //Waiting for resource initialization to complete - Thread.sleep(5000); + Thread.sleep(1000); Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + + Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); //Waiting for the first sub connection be closed assertTrue(waitConnectionClosed(subConnection1)); - + Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size); + subConnection1 = null; subConnection2.subscribe(topics); waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); @@ -107,11 +116,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); - Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS); assertNull(message11); - Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS); assertNull(message21); - Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS); assertNull(message31); } finally { @@ -145,7 +154,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { BlockingConnection connection2 = null; try { //Waiting for resource initialization to complete - Thread.sleep(5000); + Thread.sleep(1000); Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); @@ -244,14 +253,20 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { BlockingConnection pubConnection = null; try { //Waiting for resource initialization to complete - Thread.sleep(5000); + Thread.sleep(1000); Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + + Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); //Waiting for the first sub connection be closed assertTrue(waitConnectionClosed(subConnection1)); + Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size); + subConnection1 = null; + subConnection2.subscribe(topics); @@ -293,11 +308,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { pubConnection.publish(MULTICAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); pubConnection.publish(MULTICAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); - Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS); assertNull(message11); - Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS); assertNull(message21); - Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS); assertNull(message31); } finally { @@ -332,7 +347,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { BlockingConnection connection2 = null; try { //Waiting for resource initialization to complete - Thread.sleep(5000); + Thread.sleep(1000); Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); @@ -436,14 +451,19 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { BlockingConnection pubConnection = null; try { //Waiting for resource initialization to complete - Thread.sleep(5000); + Thread.sleep(1000); Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + + Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); //Waiting for the first sub connection be closed assertTrue(waitConnectionClosed(subConnection1)); + Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size); + subConnection1 = null; subConnection2.subscribe(topics); @@ -485,11 +505,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); - Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS); assertNull(message11); - Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS); assertNull(message21); - Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS); assertNull(message31); } finally { @@ -523,7 +543,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { BlockingConnection connection2 = null; try { //Waiting for resource initialization to complete - Thread.sleep(5000); + Thread.sleep(1000); Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); @@ -608,6 +628,20 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { } + MQTTProtocolManager locateMQTTPM(ActiveMQServer server) { + + RemotingServiceImpl impl = (RemotingServiceImpl) server.getRemotingService(); + for (Acceptor acceptor : impl.getAcceptors().values()) { + AbstractAcceptor abstractAcceptor = (AbstractAcceptor) acceptor; + for (ProtocolManager manager : abstractAcceptor.getProtocolMap().values()) { + if (manager instanceof MQTTProtocolManager) { + return (MQTTProtocolManager) manager; + } + } + } + return null; + } + @Test public void useSameClientIdAndMulticastSubscribeRemoteQueueWildCard() throws Exception { final String MULTICAST_TOPIC = "multicast/test/+/some/#"; @@ -624,9 +658,10 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { BlockingConnection pubConnection = null; try { //Waiting for resource initialization to complete - Thread.sleep(5000); + Thread.sleep(1000); Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); @@ -673,11 +708,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { pubConnection.publish("multicast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); pubConnection.publish("multicast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); - Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS); assertNull(message11); - Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS); assertNull(message21); - Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS); assertNull(message31); } finally { @@ -712,10 +747,12 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { BlockingConnection connection2 = null; try { //Waiting for resource initialization to complete - Thread.sleep(5000); + Thread.sleep(1000); Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size); connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); + Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size); // Subscribe to topics connection1.subscribe(topics); @@ -817,7 +854,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { BlockingConnection connection2 = null; try { //Waiting for resource initialization to complete - Thread.sleep(5000); + Thread.sleep(1000); connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); // Subscribe to topics @@ -923,8 +960,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { BlockingConnection pubConnection = null; try { //Waiting for resource initialization to complete - Thread.sleep(5000); + Thread.sleep(1000); subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + + Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); @@ -985,11 +1025,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { message11.ack(); assertEquals(payload4, new String(message11.getPayload())); - Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS); assertNull(message21); - Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS); assertNull(message31); - Message message41 = subConnection2.receive(5, TimeUnit.SECONDS); + Message message41 = subConnection2.receive(100, TimeUnit.MILLISECONDS); assertNull(message41); } finally { @@ -1024,10 +1064,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { BlockingConnection pubConnection = null; try { //Waiting for resource initialization to complete - Thread.sleep(5000); + Thread.sleep(1000); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + Wait.assertEquals(2, locateMQTTPM(servers[0]).getConnectedClients()::size); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); //Waiting for the first sub connection be closed @@ -1086,11 +1127,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); - Message message11 = subConnection3.receive(5, TimeUnit.SECONDS); + Message message11 = subConnection3.receive(100, TimeUnit.MILLISECONDS); assertNull(message11); - Message message21 = subConnection3.receive(5, TimeUnit.SECONDS); + Message message21 = subConnection3.receive(100, TimeUnit.MILLISECONDS); assertNull(message21); - Message message31 = subConnection3.receive(5, TimeUnit.SECONDS); + Message message31 = subConnection3.receive(100, TimeUnit.MILLISECONDS); assertNull(message31); @@ -1131,7 +1172,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { BlockingConnection connection3 = null; try { //Waiting for resource initialization to complete - Thread.sleep(5000); + Thread.sleep(1000); connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); connection3 = retrieveMQTTConnection("tcp://localhost:61617", clientId3);