ARTEMIS-5173 Removing test not well designed
This test sends a variable number of messages; it would need to be rewritten and improved. Best to just let it go.
parent 899f88ab51
commit c7ec3c7c77
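For context on the "variable number of messages" problem: the removed test drives its producer threads with a wall-clock time box (while (timeToRun > System.currentTimeMillis()), with timeToRun set 5 seconds ahead) rather than a fixed message count, so the total number of messages sent differs on every run and depends on scheduling, host speed, and the mid-test restart of node 1. A minimal standalone sketch of that pattern follows; the class and variable names are illustrative, not taken from the test.

import java.util.concurrent.atomic.AtomicInteger;

public class TimeBoxedSendSketch {
   public static void main(String[] args) throws InterruptedException {
      final AtomicInteger totalMessageProduced = new AtomicInteger(0);
      // Send for a fixed wall-clock window, mirroring the removed test's
      // "while (timeToRun > System.currentTimeMillis())" producer loop.
      final long timeToRun = System.currentTimeMillis() + 5000;
      while (timeToRun > System.currentTimeMillis()) {
         // Stand-in for producer.send(message); the real test sends to a 3-node cluster.
         totalMessageProduced.incrementAndGet();
         Thread.sleep(1); // each "send" takes a scheduling-dependent amount of time
      }
      // The total varies from run to run, which is why the test's final
      // produced-versus-consumed assertion is inherently unreliable.
      System.out.println("Messages produced this run: " + totalMessageProduced.get());
   }
}

Because the consumers then only get a fixed 20-second window to drain whatever was produced, the final produced-versus-consumed assertion can fail for timing reasons alone, which is the design flaw the commit message refers to.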
@@ -17,24 +17,13 @@
package org.apache.activemq.artemis.tests.integration.cluster.distribution;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
@@ -46,9 +35,6 @@ import org.apache.activemq.artemis.core.server.group.impl.Proposal;
import org.apache.activemq.artemis.core.server.group.impl.Response;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -523,243 +509,6 @@ public class ClusteredGroupingTest extends ClusterTestBase {
      assertNull(servers[0].getGroupingHandler().getProposal(SimpleString.of("id1.queue0"), false), "Group should have timed out");
   }

   @Test
   public void testGroupingWith3Nodes() throws Exception {
      final String ADDRESS = "queues.testaddress";
      final String QUEUE = "queue0";

      setupServer(0, isFileStorage(), isNetty());
      setupServer(1, isFileStorage(), isNetty());
      setupServer(2, isFileStorage(), isNetty());

      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
      setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);

      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, 10000, 500, 750);
      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 10000, 500, 750);
      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2, 10000, 500, 750);

      startServers(0, 1, 2);

      AddressSettings addressSettings = new AddressSettings();
      addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
      servers[0].getAddressSettingsRepository().addMatch("#", addressSettings);
      servers[1].getAddressSettingsRepository().addMatch("#", addressSettings);
      servers[2].getAddressSettingsRepository().addMatch("#", addressSettings);

      setupSessionFactory(0, isNetty());

      // need to set up reconnect attempts on this session factory because the test will restart node 1
      setupSessionFactory(1, isNetty(), 15);

      setupSessionFactory(2, isNetty());

      createQueue(0, ADDRESS, QUEUE, null, true);
      createQueue(1, ADDRESS, QUEUE, null, true);
      createQueue(2, ADDRESS, QUEUE, null, true);

      waitForBindings(0, ADDRESS, 1, 0, true);
      waitForBindings(1, ADDRESS, 1, 0, true);
      waitForBindings(2, ADDRESS, 1, 0, true);

      waitForBindings(0, ADDRESS, 2, 0, false);
      waitForBindings(1, ADDRESS, 2, 0, false);
      waitForBindings(2, ADDRESS, 2, 0, false);

      final ClientSessionFactory sf0 = sfs[0];
      final ClientSessionFactory sf1 = sfs[1];
      final ClientSessionFactory sf2 = sfs[2];

      final ClientSession session = addClientSession(sf1.createSession(false, false, false));
      final ClientProducer producer = addClientProducer(session.createProducer(ADDRESS));
      List<String> groups = new ArrayList<>();

      final AtomicInteger totalMessageProduced = new AtomicInteger(0);

      // create a bunch of groups and save a few group IDs for use later
      for (int i = 0; i < 500; i++) {
         ClientMessage message = session.createMessage(true);
         String group = UUID.randomUUID().toString();
         message.putStringProperty(Message.HDR_GROUP_ID, SimpleString.of(group));
         SimpleString dupID = SimpleString.of(UUID.randomUUID().toString());
         message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID);
         if (i % 100 == 0) {
            groups.add(group);
         }
         producer.send(message);
         logger.trace("Sent message to server 1 with dupID: {}", dupID);
      }

      session.commit();
      totalMessageProduced.addAndGet(500);
      logger.trace("Sent block of 500 messages to server 1. Total sent: {}", totalMessageProduced.get());
      session.close();

      // need thread pool to service both consumers and producers plus a thread to cycle nodes
      ExecutorService executorService = Executors.newFixedThreadPool(groups.size() * 2 + 1, ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()));

      final AtomicInteger producerCounter = new AtomicInteger(0);
      final CountDownLatch okToConsume = new CountDownLatch(groups.size() + 1);

      final AtomicInteger errors = new AtomicInteger(0);

      final long timeToRun = System.currentTimeMillis() + 5000;

      // spin up a bunch of threads to pump messages into some of the groups
      for (final String groupx : groups) {
         final Runnable r = () -> {

            String group = groupx;

            String basicID = UUID.randomUUID().toString();
            logger.debug("Starting producer thread...");
            ClientSessionFactory factory;
            ClientSession session12 = null;
            ClientProducer producer1 = null;
            int targetServer = 0;

            try {

               int count = producerCounter.incrementAndGet();
               if (count % 3 == 0) {
                  factory = sf2;
                  targetServer = 2;
               } else if (count % 2 == 0) {
                  factory = sf1;
                  targetServer = 1;
               } else {
                  factory = sf0;
               }
               logger.debug("Creating producer session factory to node {}", targetServer);
               session12 = addClientSession(factory.createSession(false, true, true));
               producer1 = addClientProducer(session12.createProducer(ADDRESS));
            } catch (Exception e) {
               errors.incrementAndGet();
               logger.warn("Producer thread couldn't establish connection", e);
               return;
            }

            int messageCount = 0;

            while (timeToRun > System.currentTimeMillis()) {
               ClientMessage message = session12.createMessage(true);
               message.putStringProperty(Message.HDR_GROUP_ID, SimpleString.of(group));
               SimpleString dupID = SimpleString.of(basicID + ":" + messageCount);
               message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID);
               try {
                  producer1.send(message);
                  totalMessageProduced.incrementAndGet();
                  messageCount++;
               } catch (ActiveMQException e) {
                  logger.warn("Producer thread threw exception while sending messages to {}: {}", targetServer, e.getMessage());
                  // in case of a failure we change the group to make possible errors more likely
                  group = group + "afterFail";
               } catch (Exception e) {
                  logger.warn("Producer thread threw unexpected exception while sending messages to {}: {}", targetServer, e.getMessage());
                  group = group + "afterFail";
                  break;
               }
            }

            okToConsume.countDown();
         };

         executorService.execute(r);
      }

      Runnable r = () -> {
         try {
            try {
               Thread.sleep(2000);
            } catch (InterruptedException e) {
               e.printStackTrace();
               // ignore
            }
            cycleServer(1);
         } finally {
            okToConsume.countDown();
         }
      };

      executorService.execute(r);

      final AtomicInteger consumerCounter = new AtomicInteger(0);
      final AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
      final CountDownLatch okToEndTest = new CountDownLatch(groups.size());

      // spin up a bunch of threads to consume messages
      for (final String group : groups) {
         r = () -> {
            try {
               logger.debug("Waiting to start consumer thread...");
               okToConsume.await(20, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace();
               return;
            }
            logger.debug("Starting consumer thread...");
            ClientSessionFactory factory;
            ClientSession session1 = null;
            ClientConsumer consumer = null;
            int targetServer = 0;

            try {
               synchronized (consumerCounter) {
                  if (consumerCounter.get() % 3 == 0) {
                     factory = sf2;
                     targetServer = 2;
                  } else if (consumerCounter.get() % 2 == 0) {
                     factory = sf1;
                     targetServer = 1;
                  } else {
                     factory = sf0;
                  }
                  logger.debug("Creating consumer session factory to node {}", targetServer);
                  session1 = addClientSession(factory.createSession(false, false, true));
                  consumer = addClientConsumer(session1.createConsumer(QUEUE));
                  session1.start();
                  consumerCounter.incrementAndGet();
               }
            } catch (Exception e) {
               logger.debug("Consumer thread couldn't establish connection", e);
               errors.incrementAndGet();
               return;
            }

            while (true) {
               try {
                  ClientMessage m = consumer.receive(1000);
                  if (m == null) {
                     okToEndTest.countDown();
                     return;
                  }
                  m.acknowledge();
                  logger.trace("Consumed message {} from server {}. Total consumed: {}", m.getStringProperty(Message.HDR_DUPLICATE_DETECTION_ID), targetServer, totalMessagesConsumed.incrementAndGet());
               } catch (ActiveMQException e) {
                  errors.incrementAndGet();
                  logger.warn("Consumer thread threw exception while receiving messages from server {}.: {}", targetServer, e.getMessage());
               } catch (Exception e) {
                  errors.incrementAndGet();
                  logger.warn("Consumer thread threw unexpected exception while receiving messages from server {}.: {}", targetServer, e.getMessage());
                  return;
               }
            }
         };

         executorService.execute(r);
      }
      // wait for the threads to complete their consuming
      okToEndTest.await(20, TimeUnit.SECONDS);

      executorService.shutdownNow();
      executorService.awaitTermination(10, TimeUnit.SECONDS);

      assertEquals(0, errors.get());

      assertEquals(totalMessageProduced.longValue(), totalMessagesConsumed.longValue());
   }

   private void cycleServer(int node) {
      try {
         stopServers(node);