NO-JIRA Reuse getMessageCount across RealServerTestBase

This commit is contained in:
Clebert Suconic 2024-06-05 17:33:02 -04:00
parent c378ef9da7
commit 047bc98cc3
9 changed files with 54 additions and 115 deletions

View File

@ -43,6 +43,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -284,4 +285,22 @@ public class RealServerTestBase extends ActiveMQTestBase {
outputStream.close();
}
protected long getMessageCount(String uri, String queueName) throws Exception {
SimpleManagement management = new SimpleManagement(uri, null, null);
return getMessageCount(management, queueName);
}
protected long getMessageCount(SimpleManagement simpleManagement, String queue) throws Exception {
try {
long value = simpleManagement.getMessageCountOnQueue(queue);
logger.info("count on queue {} is {}", queue, value);
return value;
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return -1;
}
}
}

View File

@ -200,18 +200,4 @@ public class MirroredSubscriptionTest extends SmokeTestBase {
Wait.assertEquals(NUMBER_OF_MESSAGES, () -> getMessageCount(secondURI, "client" + clientID + ".subscription" + clientID));
}
}
long getMessageCount(String uri, String queueName) throws Exception {
SimpleManagement management = new SimpleManagement(uri, null, null);
try {
return management.getMessageCountOnQueue(queueName);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
// if an exception happened during a retry
// we just return -1, so the retries will keep coming
return -1;
}
}
}

View File

@ -361,19 +361,4 @@ public class CheckTest extends SmokeTestBase {
Wait.assertEquals(0, () -> getMessageCount(simpleManagement, queueName), 1_000);
}
// using a method here to capture eventual exceptions allowing retries
int getMessageCount(SimpleManagement management, String queueName) throws Exception {
try {
return (int) management.getMessageCountOnQueue(queueName);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
// if an exception happened during a retry
// we just return -1, so the retries will keep coming
return -1;
}
}
}

View File

@ -345,7 +345,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
processDC2_node_A.waitFor();
processDC2_node_A = startServer(DC2_NODE_A, 2, 5000, new File(getServerLocation(DC2_NODE_A), "broker.properties"));
Wait.assertEquals(0L, () -> getCount(simpleManagementDC1A, snfQueue), 250_000, 1000);
Wait.assertEquals(0L, () -> getMessageCount(simpleManagementDC1A, snfQueue), 250_000, 1000);
try (Connection connection = connectionFactoryDC2A.createConnection()) {
connection.start();
@ -560,7 +560,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
processDC2_node_B.waitFor();
processDC2_node_B = startServer(DC2_NODE_B, 3, 5000, new File(getServerLocation(DC2_NODE_B), "broker.properties"));
Wait.assertEquals(0L, () -> getCount(simpleManagementDC1B, snfQueue), 250_000, 1000);
Wait.assertEquals(0L, () -> getMessageCount(simpleManagementDC1B, snfQueue), 250_000, 1000);
Wait.assertEquals(numberOfMessages / 2, () -> simpleManagementDC2B.getMessageCountOnQueue("nodeB.my-order"), 10000);
logger.debug("Consuming from DC2B with {}", simpleManagementDC2B.getMessageCountOnQueue("nodeB.my-order"));
@ -574,12 +574,6 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
logger.debug("DC1B nodeB.my-order=0");
}
public long getCount(SimpleManagement simpleManagement, String queue) throws Exception {
long value = simpleManagement.getMessageCountOnQueue(queue);
logger.debug("count on queue {} is {}", queue, value);
return value;
}
private static void consume(ConnectionFactory factory, String clientID, String subscriptionID, int start, int numberOfMessages, boolean expectEmpty) throws Exception {
try (Connection connection = factory.createConnection()) {
connection.setClientID(clientID);

View File

@ -228,9 +228,9 @@ public class IdempotentACKTest extends SoakTestBase {
SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null);
SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null);
Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, snfQueue));
Wait.assertEquals(numberOfMessages, () -> getCount(simpleManagementDC1A, QUEUE_NAME));
Wait.assertEquals(numberOfMessages, () -> getCount(simpleManagementDC2A, QUEUE_NAME));
Wait.assertEquals(0, () -> getMessageCount(simpleManagementDC1A, snfQueue));
Wait.assertEquals(numberOfMessages, () -> getMessageCount(simpleManagementDC1A, QUEUE_NAME));
Wait.assertEquals(numberOfMessages, () -> getMessageCount(simpleManagementDC2A, QUEUE_NAME));
CountDownLatch latchKill = new CountDownLatch(consumers);
@ -296,10 +296,10 @@ public class IdempotentACKTest extends SoakTestBase {
// after all flushed messages, we should have 0 messages on both nodes
Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, snfQueue));
Wait.assertEquals(0, () -> getCount(simpleManagementDC2A, snfQueue));
Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, QUEUE_NAME));
Wait.assertEquals(0, () -> getCount(simpleManagementDC2A, QUEUE_NAME));
Wait.assertEquals(0, () -> getMessageCount(simpleManagementDC1A, snfQueue));
Wait.assertEquals(0, () -> getMessageCount(simpleManagementDC2A, snfQueue));
Wait.assertEquals(0, () -> getMessageCount(simpleManagementDC1A, QUEUE_NAME));
Wait.assertEquals(0, () -> getMessageCount(simpleManagementDC2A, QUEUE_NAME));
}
private void restartDC1_ServerA() throws Exception {
@ -308,10 +308,4 @@ public class IdempotentACKTest extends SoakTestBase {
processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties"));
ServerUtil.waitForServerToStart(0, 10_000);
}
public long getCount(SimpleManagement simpleManagement, String queue) throws Exception {
long value = simpleManagement.getMessageCountOnQueue(queue);
logger.debug("count on queue {} is {}", queue, value);
return value;
}
}

View File

@ -262,10 +262,10 @@ public class InterruptedLargeMessageTest extends SoakTestBase {
session.commit();
}
Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, SNF_QUEUE));
Wait.assertEquals(0, () -> getCount(simpleManagementDC2A, SNF_QUEUE));
Wait.assertEquals(0, () -> getCount(simpleManagementDC2A, QUEUE_NAME));
Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, QUEUE_NAME));
Wait.assertEquals(0, () -> getMessageCount(simpleManagementDC1A, SNF_QUEUE));
Wait.assertEquals(0, () -> getMessageCount(simpleManagementDC2A, SNF_QUEUE));
Wait.assertEquals(0, () -> getMessageCount(simpleManagementDC2A, QUEUE_NAME));
Wait.assertEquals(0, () -> getMessageCount(simpleManagementDC1A, QUEUE_NAME));
Wait.assertEquals(0, () -> getNumberOfLargeMessages(DC1_NODE_A), 5000);
Wait.assertEquals(0, () -> getNumberOfLargeMessages(DC2_NODE_A), 5000);
@ -296,10 +296,4 @@ public class InterruptedLargeMessageTest extends SoakTestBase {
processDC2_node_A = startServer(DC2_NODE_A, -1, -1, new File(getServerLocation(DC2_NODE_A), "broker.properties"));
ServerUtil.waitForServerToStart(2, 10_000);
}
public long getCount(SimpleManagement simpleManagement, String queue) throws Exception {
long value = simpleManagement.getMessageCountOnQueue(queue);
logger.debug("count on queue {} is {}", queue, value);
return value;
}
}

View File

@ -175,8 +175,8 @@ public class PagedSNFSoakTest extends SoakTestBase {
Wait.assertEquals((long) numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue(QUEUE_NAME), 5000, 100);
Wait.assertEquals((long) 0, () -> getCount("DC1", simpleManagementDC1A, SNF_QUEUE), 5_000, 100);
Wait.assertEquals((long) numberOfMessages, () -> getCount("DC2", simpleManagementDC2A, QUEUE_NAME), 5_000, 100);
Wait.assertEquals((long) 0, () -> getMessageCount(simpleManagementDC1A, SNF_QUEUE), 5_000, 100);
Wait.assertEquals((long) numberOfMessages, () -> getMessageCount(simpleManagementDC2A, QUEUE_NAME), 5_000, 100);
try (Connection connection = connectionFactoryDC1A.createConnection()) {
connection.start();
@ -330,15 +330,4 @@ public class PagedSNFSoakTest extends SoakTestBase {
processDC2_node_A = startServer(DC2_NODE_A, -1, -1, new File(getServerLocation(DC2_NODE_A), "broker.properties"));
ServerUtil.waitForServerToStart(2, 10_000);
}
public long getCount(String place, SimpleManagement simpleManagement, String queue) throws Exception {
try {
long value = simpleManagement.getMessageCountOnQueue(queue);
logger.info("count on {}, queue {} is {}", place, queue, value);
return value;
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return -1;
}
}
}

View File

@ -265,10 +265,10 @@ public class ReplicatedMirrorTargetTest extends SoakTestBase {
runAfter(() -> managementDC1.close());
runAfter(() -> managementDC2.close());
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID));
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID));
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID));
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID));
Wait.assertEquals(0, () -> getMessageCount(managementDC1, clientIDA + "." + subscriptionID));
Wait.assertEquals(0, () -> getMessageCount(managementDC2, clientIDA + "." + subscriptionID));
Wait.assertEquals(0, () -> getMessageCount(managementDC1, clientIDB + "." + subscriptionID));
Wait.assertEquals(0, () -> getMessageCount(managementDC2, clientIDB + "." + subscriptionID));
ExecutorService executorService = Executors.newFixedThreadPool(3);
runAfter(executorService::shutdownNow);
@ -338,12 +338,12 @@ public class ReplicatedMirrorTargetTest extends SoakTestBase {
consumerDone.await(SNF_TIMEOUT, TimeUnit.MILLISECONDS);
Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), SNF_TIMEOUT);
Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue), SNF_TIMEOUT);
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(0, () -> getMessageCount(managementDC1, snfQueue), SNF_TIMEOUT);
Wait.assertEquals(0, () -> getMessageCount(managementDC2, snfQueue), SNF_TIMEOUT);
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getMessageCount(managementDC1, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getMessageCount(managementDC1, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getMessageCount(managementDC2, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getMessageCount(managementDC2, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
destroyServers();
@ -409,15 +409,4 @@ public class ReplicatedMirrorTargetTest extends SoakTestBase {
}
}
}
public long getCount(SimpleManagement simpleManagement, String queue) {
try {
long value = simpleManagement.getMessageCountOnQueue(queue);
logger.info("Queue {} count = {}", queue, value);
return value;
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return -1;
}
}
}

View File

@ -222,10 +222,10 @@ public class SingleMirrorSoakTest extends SoakTestBase {
runAfter(() -> managementDC1.close());
runAfter(() -> managementDC2.close());
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID));
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID));
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID));
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID));
Wait.assertEquals(0, () -> getMessageCount(managementDC1, clientIDA + "." + subscriptionID));
Wait.assertEquals(0, () -> getMessageCount(managementDC2, clientIDA + "." + subscriptionID));
Wait.assertEquals(0, () -> getMessageCount(managementDC1, clientIDB + "." + subscriptionID));
Wait.assertEquals(0, () -> getMessageCount(managementDC2, clientIDB + "." + subscriptionID));
ExecutorService executorService = Executors.newFixedThreadPool(3);
runAfter(executorService::shutdownNow);
@ -289,12 +289,12 @@ public class SingleMirrorSoakTest extends SoakTestBase {
consumerDone.await(SNF_TIMEOUT, TimeUnit.MILLISECONDS);
Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), SNF_TIMEOUT);
Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue), SNF_TIMEOUT);
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(0, () -> getMessageCount(managementDC1, snfQueue), SNF_TIMEOUT);
Wait.assertEquals(0, () -> getMessageCount(managementDC2, snfQueue), SNF_TIMEOUT);
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getMessageCount(managementDC1, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getMessageCount(managementDC1, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getMessageCount(managementDC2, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getMessageCount(managementDC2, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
destroyServers();
@ -360,15 +360,4 @@ public class SingleMirrorSoakTest extends SoakTestBase {
}
}
}
public long getCount(SimpleManagement simpleManagement, String queue) {
try {
long value = simpleManagement.getMessageCountOnQueue(queue);
logger.info("Queue {} count = {}", queue, value);
return value;
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return -1;
}
}
}