NO-JIRA speeding up a few tests

This commit is contained in:
Clebert Suconic 2019-05-06 11:49:06 -04:00
parent 2395358e80
commit 0e40984145
9 changed files with 114 additions and 72 deletions

View File

@ -253,4 +253,9 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
public Map<String, MQTTSessionState> getSessionStates() { public Map<String, MQTTSessionState> getSessionStates() {
return new HashMap<>(sessionStates); return new HashMap<>(sessionStates);
} }
/** For DEBUG only */
public Map<String, MQTTConnection> getConnectedClients() {
return connectedClients;
}
} }

View File

@ -286,6 +286,12 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
return acceptor; return acceptor;
} }
/** No interface method, for tests only */
public Map<String, Acceptor> getAcceptors() {
return acceptors;
}
@Override @Override
public void destroyAcceptor(String name) throws Exception { public void destroyAcceptor(String name) throws Exception {
Acceptor acceptor = acceptors.get(name); Acceptor acceptor = acceptors.get(name);

View File

@ -800,7 +800,7 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
} }
} }
@Test(timeout = 240000) @Test(timeout = 30000)
public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable { public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable {
String name = "exampleQueue1"; String name = "exampleQueue1";
@ -823,8 +823,7 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue(name); queue = session.createQueue(name);
MessageConsumer c = session.createConsumer(queue); MessageConsumer c = session.createConsumer(queue);
c.receive(1000); Assert.assertNotNull(c.receive(1000));
producer.close();
session.close(); session.close();
} }
@ -832,7 +831,7 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
queue = session.createQueue(name); queue = session.createQueue(name);
MessageConsumer c = session.createConsumer(queue); MessageConsumer c = session.createConsumer(queue);
for (int i = 0; i < numMessages; i++) { for (int i = 0; i < numMessages; i++) {
c.receive(1000); Assert.assertNull(c.receive(1));
} }
producer.close(); producer.close();
session.close(); session.close();

View File

@ -69,6 +69,7 @@ public class ConcurrentCreateDeleteProduceTest extends ActiveMQTestBase {
@Test @Test
public void testConcurrentProduceCreateAndDelete() throws Throwable { public void testConcurrentProduceCreateAndDelete() throws Throwable {
locator.setBlockOnDurableSend(false).setBlockOnNonDurableSend(false);
ClientSessionFactory factory = locator.createSessionFactory(); ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(true, true); ClientSession session = factory.createSession(true, true);
ClientProducer producer = session.createProducer(ADDRESS); ClientProducer producer = session.createProducer(ADDRESS);
@ -84,7 +85,7 @@ public class ConcurrentCreateDeleteProduceTest extends ActiveMQTestBase {
consumers[i].start(); consumers[i].start();
} }
for (int i = 0; i < 50000 && running; i++) { for (int i = 0; i < 1500 && running; i++) {
producer.send(session.createMessage(true)); producer.send(session.createMessage(true));
//Thread.sleep(10); //Thread.sleep(10);
} }
@ -122,9 +123,10 @@ public class ConcurrentCreateDeleteProduceTest extends ActiveMQTestBase {
session.createQueue(ADDRESS, queueName, true); session.createQueue(ADDRESS, queueName, true);
ClientConsumer consumer = session.createConsumer(queueName); ClientConsumer consumer = session.createConsumer(queueName);
while (running) { while (running) {
ClientMessage msg = consumer.receive(5000); ClientMessage msg = consumer.receive(500);
if (msg == null) { if (msg == null) {
break; if (running) continue;
else break;
} }
if (msgcount++ == 500) { if (msgcount++ == 500) {
msgcount = 0; msgcount = 0;
@ -134,7 +136,6 @@ public class ConcurrentCreateDeleteProduceTest extends ActiveMQTestBase {
consumer.close(); consumer.close();
session.commit(); session.commit();
session.deleteQueue(queueName); session.deleteQueue(queueName);
System.out.println("Deleting " + queueName);
} }
session.close(); session.close();
} catch (Throwable e) { } catch (Throwable e) {

View File

@ -509,7 +509,7 @@ public class ConsumerTest extends ActiveMQTestBase {
} }
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
int NUMBER_OF_MESSAGES = durable ? 500 : 5000; int NUMBER_OF_MESSAGES = durable ? 5 : 50;
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage msg = session.createTextMessage("hello " + i); TextMessage msg = session.createTextMessage("hello " + i);
msg.setIntProperty("mycount", i); msg.setIntProperty("mycount", i);

View File

@ -107,6 +107,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
locator.setConsumerWindowSize(0); locator.setConsumerWindowSize(0);
} }
} }
locator.setBlockOnNonDurableSend(false).setBlockOnNonDurableSend(false).setBlockOnAcknowledge(false);
settings.setMaxDeliveryAttempts(-1); settings.setMaxDeliveryAttempts(-1);
server.getAddressSettingsRepository().addMatch("#", settings); server.getAddressSettingsRepository().addMatch("#", settings);
@ -148,7 +149,6 @@ public class LargeMessageTest extends LargeMessageTestBase {
// System.out.println("message:" + message); // System.out.println("message:" + message);
try { try {
if (counter++ < 20) { if (counter++ < 20) {
Thread.sleep(100);
// System.out.println("Rollback"); // System.out.println("Rollback");
message.acknowledge(); message.acknowledge();
session.rollback(); session.rollback();
@ -1189,22 +1189,22 @@ public class LargeMessageTest extends LargeMessageTestBase {
@Test @Test
public void testFilePersistenceDelayed() throws Exception { public void testFilePersistenceDelayed() throws Exception {
testChunks(false, false, true, false, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000); testChunks(false, false, true, false, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 200);
} }
@Test @Test
public void testFilePersistenceDelayedConsumer() throws Exception { public void testFilePersistenceDelayedConsumer() throws Exception {
testChunks(false, false, true, false, true, false, false, false, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000); testChunks(false, false, true, false, true, false, false, false, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 200);
} }
@Test @Test
public void testFilePersistenceDelayedXA() throws Exception { public void testFilePersistenceDelayedXA() throws Exception {
testChunks(true, false, true, false, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000); testChunks(true, false, true, false, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 200);
} }
@Test @Test
public void testFilePersistenceDelayedXAConsumer() throws Exception { public void testFilePersistenceDelayedXAConsumer() throws Exception {
testChunks(true, false, true, false, true, false, false, false, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000); testChunks(true, false, true, false, true, false, false, false, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 200);
} }
@Test @Test
@ -1274,22 +1274,22 @@ public class LargeMessageTest extends LargeMessageTestBase {
@Test @Test
public void testSendRegularMessageNullPersistenceDelayed() throws Exception { public void testSendRegularMessageNullPersistenceDelayed() throws Exception {
testChunks(false, false, true, false, false, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000); testChunks(false, false, true, false, false, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
} }
@Test @Test
public void testSendRegularMessageNullPersistenceDelayedConsumer() throws Exception { public void testSendRegularMessageNullPersistenceDelayedConsumer() throws Exception {
testChunks(false, false, true, false, false, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000); testChunks(false, false, true, false, false, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
} }
@Test @Test
public void testSendRegularMessageNullPersistenceDelayedXA() throws Exception { public void testSendRegularMessageNullPersistenceDelayedXA() throws Exception {
testChunks(true, false, true, false, false, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000); testChunks(true, false, true, false, false, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
} }
@Test @Test
public void testSendRegularMessageNullPersistenceDelayedXAConsumer() throws Exception { public void testSendRegularMessageNullPersistenceDelayedXAConsumer() throws Exception {
testChunks(true, false, true, false, false, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000); testChunks(true, false, true, false, false, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
} }
@Test @Test
@ -1314,22 +1314,22 @@ public class LargeMessageTest extends LargeMessageTestBase {
@Test @Test
public void testSendRegularMessagePersistenceDelayed() throws Exception { public void testSendRegularMessagePersistenceDelayed() throws Exception {
testChunks(false, false, true, false, true, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000); testChunks(false, false, true, false, true, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
} }
@Test @Test
public void testSendRegularMessagePersistenceDelayedConsumer() throws Exception { public void testSendRegularMessagePersistenceDelayedConsumer() throws Exception {
testChunks(false, false, true, false, true, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000); testChunks(false, false, true, false, true, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
} }
@Test @Test
public void testSendRegularMessagePersistenceDelayedXA() throws Exception { public void testSendRegularMessagePersistenceDelayedXA() throws Exception {
testChunks(false, false, true, false, true, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000); testChunks(false, false, true, false, true, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
} }
@Test @Test
public void testSendRegularMessagePersistenceDelayedXAConsumer() throws Exception { public void testSendRegularMessagePersistenceDelayedXAConsumer() throws Exception {
testChunks(false, false, true, false, true, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 1000); testChunks(false, false, true, false, true, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
} }
@Test @Test
@ -1659,11 +1659,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
ClientConsumerInternal consumer = (ClientConsumerInternal) session.createConsumer(ADDRESS); ClientConsumerInternal consumer = (ClientConsumerInternal) session.createConsumer(ADDRESS);
// Wait the consumer to be complete with 10 messages before getting others Wait.waitFor(() -> consumer.getBufferSize() >= NUMBER_OF_MESSAGES, 30_000, 100);
long timeout = System.currentTimeMillis() + 10000;
while (consumer.getBufferSize() < NUMBER_OF_MESSAGES && timeout > System.currentTimeMillis()) {
Thread.sleep(10);
}
Assert.assertEquals(NUMBER_OF_MESSAGES, consumer.getBufferSize()); Assert.assertEquals(NUMBER_OF_MESSAGES, consumer.getBufferSize());
// Reads the messages, rollback.. read them again // Reads the messages, rollback.. read them again
@ -1711,14 +1707,14 @@ public class LargeMessageTest extends LargeMessageTestBase {
ActiveMQServer server = null; ActiveMQServer server = null;
final int SIZE = 10 * 1024; final int SIZE = 10 * 1024;
final int NUMBER_OF_MESSAGES = 1000; final int NUMBER_OF_MESSAGES = 100;
try { try {
server = createServer(true, isNetty(), storeType); server = createServer(true, isNetty(), storeType);
server.start(); server.start();
locator.setMinLargeMessageSize(1024).setConsumerWindowSize(1024 * 1024); locator.setMinLargeMessageSize(1024).setConsumerWindowSize(1024 * 1024).setBlockOnDurableSend(false).setBlockOnNonDurableSend(false).setBlockOnAcknowledge(false);
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
@ -1744,11 +1740,8 @@ public class LargeMessageTest extends LargeMessageTestBase {
ClientConsumerInternal consumer = (ClientConsumerInternal) session.createConsumer(ADDRESS); ClientConsumerInternal consumer = (ClientConsumerInternal) session.createConsumer(ADDRESS);
// Wait the consumer to be complete with 10 messages before getting others Assert.assertTrue(Wait.waitFor(() -> consumer.getBufferSize() >= 5, 30_000, 100));
long timeout = System.currentTimeMillis() + 10000;
while (consumer.getBufferSize() < 10 && timeout > System.currentTimeMillis()) {
Thread.sleep(10);
}
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
ClientMessage msg = consumer.receive(10000); ClientMessage msg = consumer.receive(10000);

View File

@ -51,6 +51,7 @@ import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil; import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.InVMNodeManagerServer; import org.apache.activemq.artemis.tests.util.InVMNodeManagerServer;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -142,12 +143,12 @@ public class JMSFailoverListenerTest extends ActiveMQTestBase {
producer.send(bm); producer.send(bm);
} }
Wait.assertEquals(numMessages, liveServer.locateQueue(jmsQueueName)::getMessageCount);
conn.start(); conn.start();
JMSFailoverListenerTest.log.info("sent messages and started connection"); JMSFailoverListenerTest.log.info("sent messages and started connection");
Thread.sleep(2000);
JMSUtil.crash(liveServer, ((ActiveMQSession) sess).getCoreSession()); JMSUtil.crash(liveServer, ((ActiveMQSession) sess).getCoreSession());
Assert.assertEquals(FailoverEventType.FAILURE_DETECTED, listener.get(0)); Assert.assertEquals(FailoverEventType.FAILURE_DETECTED, listener.get(0));

View File

@ -346,14 +346,10 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
session.start(); session.start();
for (int i = 0; i < numberOfMessages; i++) { for (int i = 0; i < numberOfMessages; i++) {
System.currentTimeMillis();
ClientMessage message = consumer.receive(waitOnConsumer + delayDelivery); ClientMessage message = consumer.receive(waitOnConsumer + delayDelivery);
Assert.assertNotNull(message); Assert.assertNotNull(message);
System.currentTimeMillis();
if (delayDelivery > 0) { if (delayDelivery > 0) {
long originalTime = (Long) message.getObjectProperty(new SimpleString("original-time")); long originalTime = (Long) message.getObjectProperty(new SimpleString("original-time"));
Assert.assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery, System.currentTimeMillis() - originalTime >= delayDelivery); Assert.assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery, System.currentTimeMillis() - originalTime >= delayDelivery);

View File

@ -21,8 +21,14 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager;
import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor;
import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.BlockingConnection;
@ -57,16 +63,19 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection subConnection2 = null; BlockingConnection subConnection2 = null;
BlockingConnection pubConnection = null; BlockingConnection pubConnection = null;
try { try {
//Waiting for resource initialization to complete Thread.sleep(1000);
Thread.sleep(5000);
Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
//Waiting for the first sub connection be closed //Waiting for the first sub connection be closed
assertTrue(waitConnectionClosed(subConnection1)); assertTrue(waitConnectionClosed(subConnection1));
Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size);
subConnection1 = null;
subConnection2.subscribe(topics); subConnection2.subscribe(topics);
waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
@ -107,11 +116,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false);
pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false);
Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message11); assertNull(message11);
Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message21); assertNull(message21);
Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message31); assertNull(message31);
} finally { } finally {
@ -145,7 +154,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection connection2 = null; BlockingConnection connection2 = null;
try { try {
//Waiting for resource initialization to complete //Waiting for resource initialization to complete
Thread.sleep(5000); Thread.sleep(1000);
Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
@ -244,14 +253,20 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection pubConnection = null; BlockingConnection pubConnection = null;
try { try {
//Waiting for resource initialization to complete //Waiting for resource initialization to complete
Thread.sleep(5000); Thread.sleep(1000);
Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
//Waiting for the first sub connection be closed //Waiting for the first sub connection be closed
assertTrue(waitConnectionClosed(subConnection1)); assertTrue(waitConnectionClosed(subConnection1));
Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size);
subConnection1 = null;
subConnection2.subscribe(topics); subConnection2.subscribe(topics);
@ -293,11 +308,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
pubConnection.publish(MULTICAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); pubConnection.publish(MULTICAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false);
pubConnection.publish(MULTICAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); pubConnection.publish(MULTICAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false);
Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message11); assertNull(message11);
Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message21); assertNull(message21);
Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message31); assertNull(message31);
} finally { } finally {
@ -332,7 +347,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection connection2 = null; BlockingConnection connection2 = null;
try { try {
//Waiting for resource initialization to complete //Waiting for resource initialization to complete
Thread.sleep(5000); Thread.sleep(1000);
Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
@ -436,14 +451,19 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection pubConnection = null; BlockingConnection pubConnection = null;
try { try {
//Waiting for resource initialization to complete //Waiting for resource initialization to complete
Thread.sleep(5000); Thread.sleep(1000);
Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
//Waiting for the first sub connection be closed //Waiting for the first sub connection be closed
assertTrue(waitConnectionClosed(subConnection1)); assertTrue(waitConnectionClosed(subConnection1));
Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size);
subConnection1 = null;
subConnection2.subscribe(topics); subConnection2.subscribe(topics);
@ -485,11 +505,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message11); assertNull(message11);
Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message21); assertNull(message21);
Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message31); assertNull(message31);
} finally { } finally {
@ -523,7 +543,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection connection2 = null; BlockingConnection connection2 = null;
try { try {
//Waiting for resource initialization to complete //Waiting for resource initialization to complete
Thread.sleep(5000); Thread.sleep(1000);
Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
@ -608,6 +628,20 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
} }
MQTTProtocolManager locateMQTTPM(ActiveMQServer server) {
RemotingServiceImpl impl = (RemotingServiceImpl) server.getRemotingService();
for (Acceptor acceptor : impl.getAcceptors().values()) {
AbstractAcceptor abstractAcceptor = (AbstractAcceptor) acceptor;
for (ProtocolManager manager : abstractAcceptor.getProtocolMap().values()) {
if (manager instanceof MQTTProtocolManager) {
return (MQTTProtocolManager) manager;
}
}
}
return null;
}
@Test @Test
public void useSameClientIdAndMulticastSubscribeRemoteQueueWildCard() throws Exception { public void useSameClientIdAndMulticastSubscribeRemoteQueueWildCard() throws Exception {
final String MULTICAST_TOPIC = "multicast/test/+/some/#"; final String MULTICAST_TOPIC = "multicast/test/+/some/#";
@ -624,9 +658,10 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection pubConnection = null; BlockingConnection pubConnection = null;
try { try {
//Waiting for resource initialization to complete //Waiting for resource initialization to complete
Thread.sleep(5000); Thread.sleep(1000);
Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
@ -673,11 +708,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
pubConnection.publish("multicast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); pubConnection.publish("multicast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
pubConnection.publish("multicast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); pubConnection.publish("multicast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message11); assertNull(message11);
Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message21); assertNull(message21);
Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message31); assertNull(message31);
} finally { } finally {
@ -712,10 +747,12 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection connection2 = null; BlockingConnection connection2 = null;
try { try {
//Waiting for resource initialization to complete //Waiting for resource initialization to complete
Thread.sleep(5000); Thread.sleep(1000);
Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size);
// Subscribe to topics // Subscribe to topics
connection1.subscribe(topics); connection1.subscribe(topics);
@ -817,7 +854,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection connection2 = null; BlockingConnection connection2 = null;
try { try {
//Waiting for resource initialization to complete //Waiting for resource initialization to complete
Thread.sleep(5000); Thread.sleep(1000);
connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
// Subscribe to topics // Subscribe to topics
@ -923,8 +960,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection pubConnection = null; BlockingConnection pubConnection = null;
try { try {
//Waiting for resource initialization to complete //Waiting for resource initialization to complete
Thread.sleep(5000); Thread.sleep(1000);
subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
@ -985,11 +1025,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
message11.ack(); message11.ack();
assertEquals(payload4, new String(message11.getPayload())); assertEquals(payload4, new String(message11.getPayload()));
Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message21); assertNull(message21);
Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message31); assertNull(message31);
Message message41 = subConnection2.receive(5, TimeUnit.SECONDS); Message message41 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message41); assertNull(message41);
} finally { } finally {
@ -1024,10 +1064,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection pubConnection = null; BlockingConnection pubConnection = null;
try { try {
//Waiting for resource initialization to complete //Waiting for resource initialization to complete
Thread.sleep(5000); Thread.sleep(1000);
pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
Wait.assertEquals(2, locateMQTTPM(servers[0]).getConnectedClients()::size);
subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
//Waiting for the first sub connection be closed //Waiting for the first sub connection be closed
@ -1086,11 +1127,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false);
pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false);
Message message11 = subConnection3.receive(5, TimeUnit.SECONDS); Message message11 = subConnection3.receive(100, TimeUnit.MILLISECONDS);
assertNull(message11); assertNull(message11);
Message message21 = subConnection3.receive(5, TimeUnit.SECONDS); Message message21 = subConnection3.receive(100, TimeUnit.MILLISECONDS);
assertNull(message21); assertNull(message21);
Message message31 = subConnection3.receive(5, TimeUnit.SECONDS); Message message31 = subConnection3.receive(100, TimeUnit.MILLISECONDS);
assertNull(message31); assertNull(message31);
@ -1131,7 +1172,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
BlockingConnection connection3 = null; BlockingConnection connection3 = null;
try { try {
//Waiting for resource initialization to complete //Waiting for resource initialization to complete
Thread.sleep(5000); Thread.sleep(1000);
connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
connection3 = retrieveMQTTConnection("tcp://localhost:61617", clientId3); connection3 = retrieveMQTTConnection("tcp://localhost:61617", clientId3);