NO-JIRA Avoiding Intermittent failures on FederatedQueueTest

This commit is contained in:
Clebert Suconic 2020-04-08 16:49:57 -04:00
parent 679bc1a3a0
commit fa67499509
2 changed files with 47 additions and 48 deletions

View File

@ -26,13 +26,14 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationUpstreamConfiguration;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Before;
@ -51,6 +52,12 @@ public class FederatedQueueTest extends FederatedTestBase {
}
@Override
protected void configureQueues(ActiveMQServer server) throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false));
createSimpleQueue(server, getName());
}
protected ConnectionFactory getCF(int i) throws Exception {
return new ActiveMQConnectionFactory("vm://" + i);
}
@ -110,10 +117,8 @@ public class FederatedQueueTest extends FederatedTestBase {
MessageConsumer consumer0 = session0.createConsumer(queue0);
MessageConsumer consumer1 = session1.createConsumer(queue1);
Wait.assertTrue(() -> getServer(1).getPostOffice().getBinding(SimpleString.toSimpleString(queueName)) != null);
//Wait for local and federated consumer to be established on Server 1
assertTrue(Wait.waitFor(() -> getServer(1).locateQueue(SimpleString.toSimpleString(queueName)).getConsumerCount() == 2,
5000, 100));
Wait.waitFor(() -> getConsumerCount(getServer(1), queueName, 2));
MessageProducer producer1 = session1.createProducer(queue1);
producer1.send(session1.createTextMessage("hello"));
@ -265,10 +270,6 @@ public class FederatedQueueTest extends FederatedTestBase {
@Test
public void testFederatedQueueBiDirectionalUpstream() throws Exception {
String queueName = getName();
//Set queue up on both brokers
for (int i = 0; i < 2; i++) {
getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
}
FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
getServer(0).getFederationManager().deploy();
@ -283,10 +284,6 @@ public class FederatedQueueTest extends FederatedTestBase {
@Test
public void testFederatedQueueBiDirectionalDownstream() throws Exception {
String queueName = getName();
//Set queue up on both brokers
for (int i = 0; i < 2; i++) {
getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
}
FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1", queueName, "server0");
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
getServer(0).getFederationManager().deploy();
@ -301,10 +298,6 @@ public class FederatedQueueTest extends FederatedTestBase {
@Test
public void testFederatedQueueBiDirectionalDownstreamUpstream() throws Exception {
String queueName = getName();
//Set queue up on both brokers
for (int i = 0; i < 2; i++) {
getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
}
FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream",
"server1", queueName, null, false, "server0");
@ -319,10 +312,6 @@ public class FederatedQueueTest extends FederatedTestBase {
@Test
public void testFederatedQueueBiDirectionalDownstreamUpstreamSharedConnection() throws Exception {
String queueName = getName();
//Set queue up on both brokers
for (int i = 0; i < 2; i++) {
getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
}
FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream",
"server1", queueName, null, true, "server0");
@ -338,10 +327,6 @@ public class FederatedQueueTest extends FederatedTestBase {
@Test
public void testFederatedQueueShareUpstreamConnectionFalse() throws Exception {
String queueName = getName();
//Set queue up on both brokers
for (int i = 0; i < 2; i++) {
getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
}
FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream",
"server1", queueName, null, false, "server0");
@ -355,10 +340,6 @@ public class FederatedQueueTest extends FederatedTestBase {
@Test
public void testFederatedQueueShareUpstreamConnectionTrue() throws Exception {
String queueName = getName();
//Set queue up on both brokers
for (int i = 0; i < 2; i++) {
getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
}
FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream",
"server1", queueName, null, true, "server0");
@ -459,11 +440,6 @@ public class FederatedQueueTest extends FederatedTestBase {
public void testFederatedQueueChainOfBrokers() throws Exception {
String queueName = getName();
//Set queue up on all three brokers
for (int i = 0; i < 3; i++) {
getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
}
//Connect broker 0 (consumer will be here at end of chain) to broker 1
FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName, true);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
@ -502,11 +478,6 @@ public class FederatedQueueTest extends FederatedTestBase {
public void testFederatedQueueRemoteBrokerRestart() throws Exception {
String queueName = getName();
//Set queue up on both brokers
for (int i = 0; i < 2; i++) {
getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
}
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
@ -538,6 +509,8 @@ public class FederatedQueueTest extends FederatedTestBase {
assertNull(consumer0.receiveNoWait());
getServer(1).start();
Wait.assertTrue(getServer(1)::isActive);
createSimpleQueue(getServer(1), getName());
connection1 = cf1.createConnection();
connection1.start();
@ -546,22 +519,27 @@ public class FederatedQueueTest extends FederatedTestBase {
producer = session1.createProducer(queue1);
producer.send(session1.createTextMessage("hello"));
Wait.assertTrue(() -> getServer(1).getPostOffice().getBinding(SimpleString.toSimpleString(queueName)) != null);
Wait.waitFor(() -> ((QueueBinding) getServer(1).getPostOffice().getBinding(SimpleString.toSimpleString(queueName))).consumerCount() == 1);
Wait.waitFor(() -> getConsumerCount(getServer(1), queueName, 1));
assertNotNull(consumer0.receive(1000));
}
private boolean getConsumerCount(ActiveMQServer server, String queueName, int count) {
QueueBinding binding = (QueueBinding)server.getPostOffice().getBinding(SimpleString.toSimpleString(queueName));
if (binding == null) {
return false;
}
if (binding.consumerCount() != count) {
return false;
}
return true;
}
@Test
public void testFederatedQueueLocalBrokerRestart() throws Exception {
String queueName = getName();
//Set queue up on both brokers
for (int i = 0; i < 2; i++) {
getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
}
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
@ -594,6 +572,7 @@ public class FederatedQueueTest extends FederatedTestBase {
getServer(0).start();
Wait.waitFor(() -> getServer(0).isActive());
createSimpleQueue(getServer(0), getName());
connection0 = getCF(0).createConnection();
connection0.start();
@ -608,7 +587,7 @@ public class FederatedQueueTest extends FederatedTestBase {
.getBinding(SimpleString.toSimpleString(queueName)))
.consumerCount() == 1);
assertNotNull(consumer0.receive(1000));
assertNotNull(consumer0.receive(5000));
}
private Message createTextMessage(Session session1, String group) throws JMSException {

View File

@ -20,9 +20,13 @@ import java.util.ArrayList;
import java.util.List;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
@ -47,11 +51,27 @@ public class FederatedTestBase extends ActiveMQTestBase {
config.addConnectorConfiguration("server" + j, "vm://" + j);
}
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, mBeanServer, false));
servers.add(server);
server.start();
configureQueues(server);
}
}
protected void configureQueues(ActiveMQServer server) throws Exception {
}
protected void createSimpleQueue(ActiveMQServer server, String queueName) throws Exception {
SimpleString simpleStringQueueName = SimpleString.toSimpleString(queueName);
try {
server.addAddressInfo(new AddressInfo(simpleStringQueueName, RoutingType.ANYCAST));
server.createQueue(simpleStringQueueName, RoutingType.ANYCAST, simpleStringQueueName, null, true, false);
} catch (Exception ignored) {
}
}
protected int numberOfServers() {
return 3;
}