NO-JIRA Fixing intermittent failure
This commit is contained in:
parent
c699b5ed1d
commit
108ee5d24b
|
@ -58,6 +58,10 @@ public class LiveOnlyActivation extends Activation {
|
|||
this.liveOnlyPolicy = liveOnlyPolicy;
|
||||
}
|
||||
|
||||
public LiveOnlyPolicy getLiveOnlyPolicy() {
|
||||
return liveOnlyPolicy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
|
|
|
@ -124,6 +124,7 @@ import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
|
|||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.Activation;
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||
import org.apache.activemq.artemis.core.server.impl.LiveOnlyActivation;
|
||||
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
|
@ -293,6 +294,16 @@ public abstract class ActiveMQTestBase extends Assert {
|
|||
if (server == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// disable scaledown on tearDown, otherwise it takes a lot of time
|
||||
try {
|
||||
((LiveOnlyActivation) server.getActivation()).getLiveOnlyPolicy().getScaleDownPolicy().setEnabled(false);
|
||||
} catch (Throwable ignored) {
|
||||
// don't care about anything here
|
||||
// if can't find activation, livePolicy or no LiveONlyActivation... don't care!!!
|
||||
// all I care is f you have scaleDownPolicy, it should be set to false at this point
|
||||
}
|
||||
|
||||
try {
|
||||
final ClusterManager clusterManager = server.getClusterManager();
|
||||
if (clusterManager != null) {
|
||||
|
|
|
@ -114,6 +114,7 @@ public class FederatedQueueTest extends FederatedTestBase {
|
|||
MessageConsumer consumer0 = session0.createConsumer(queue0);
|
||||
MessageConsumer consumer1 = session1.createConsumer(queue1);
|
||||
|
||||
Wait.assertTrue(() -> getServer(1).getPostOffice().getBinding(SimpleString.toSimpleString(queueName)) != null);
|
||||
//Wait for local and federated consumer to be established on Server 1
|
||||
assertTrue(Wait.waitFor(() -> getServer(1).locateQueue(SimpleString.toSimpleString(queueName)).getConsumerCount() == 2,
|
||||
5000, 100));
|
||||
|
@ -123,7 +124,7 @@ public class FederatedQueueTest extends FederatedTestBase {
|
|||
|
||||
//Consumer 0 should receive the message over consumer because of adjusted priority
|
||||
//to favor the federated broker
|
||||
assertNull(consumer1.receive(500));
|
||||
assertNull(consumer1.receiveNoWait());
|
||||
assertNotNull(consumer0.receive(1000));
|
||||
|
||||
consumer0.close();
|
||||
|
@ -215,7 +216,7 @@ public class FederatedQueueTest extends FederatedTestBase {
|
|||
producer.send(session1.createTextMessage("hello"));
|
||||
|
||||
assertNotNull(consumer1.receive(1000));
|
||||
assertNull(consumer0.receive(10));
|
||||
assertNull(consumer0.receiveNoWait());
|
||||
consumer1.close();
|
||||
|
||||
//Groups
|
||||
|
@ -228,7 +229,7 @@ public class FederatedQueueTest extends FederatedTestBase {
|
|||
consumer1 = session1.createConsumer(queue1);
|
||||
|
||||
producer.send(createTextMessage(session1, "groupA"));
|
||||
assertNull(consumer1.receive(10));
|
||||
assertNull(consumer1.receiveNoWait());
|
||||
assertNotNull(consumer0.receive(1000));
|
||||
}
|
||||
|
||||
|
@ -253,7 +254,7 @@ public class FederatedQueueTest extends FederatedTestBase {
|
|||
Queue queue0 = session0.createQueue(queueName);
|
||||
MessageConsumer consumer0 = session0.createConsumer(queue0);
|
||||
|
||||
assertNull(consumer0.receive(100));
|
||||
assertNull(consumer0.receiveNoWait());
|
||||
|
||||
FederationConfiguration federationConfiguration = createUpstreamFederationConfiguration("server1", queueName);
|
||||
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
|
||||
|
@ -415,6 +416,8 @@ public class FederatedQueueTest extends FederatedTestBase {
|
|||
producer1.send(session1.createTextMessage("hello"));
|
||||
assertNotNull(consumer0.receive(1000));
|
||||
|
||||
Wait.assertTrue(() -> getServer(0).getPostOffice().getBinding(SimpleString.toSimpleString(queueName)) != null);
|
||||
Wait.assertTrue(() -> getServer(1).getPostOffice().getBinding(SimpleString.toSimpleString(queueName)) != null);
|
||||
//Wait to see if extra consumers are created - this tests to make sure there is no loop and tests the FederatedQueue metaDataFilterString
|
||||
//is working properly - should only be 1 consumer on each (1 for the local consumer on broker0 and 1 for the federated consumer on broker1)
|
||||
assertFalse(Wait.waitFor(() -> getServer(0).locateQueue(SimpleString.toSimpleString(queueName)).getConsumerCount() > 1, 500, 100));
|
||||
|
@ -536,7 +539,7 @@ public class FederatedQueueTest extends FederatedTestBase {
|
|||
connection1.close();
|
||||
getServer(1).stop();
|
||||
|
||||
assertNull(consumer0.receive(100));
|
||||
assertNull(consumer0.receiveNoWait());
|
||||
|
||||
getServer(1).start();
|
||||
|
||||
|
@ -547,6 +550,8 @@ public class FederatedQueueTest extends FederatedTestBase {
|
|||
producer = session1.createProducer(queue1);
|
||||
producer.send(session1.createTextMessage("hello"));
|
||||
|
||||
Wait.assertTrue(() -> getServer(1).getPostOffice().getBinding(SimpleString.toSimpleString(queueName)) != null);
|
||||
|
||||
Wait.waitFor(() -> ((QueueBinding) getServer(1).getPostOffice().getBinding(SimpleString.toSimpleString(queueName))).consumerCount() == 1);
|
||||
|
||||
assertNotNull(consumer0.receive(1000));
|
||||
|
@ -601,6 +606,7 @@ public class FederatedQueueTest extends FederatedTestBase {
|
|||
consumer0 = session0.createConsumer(queue0);
|
||||
producer.send(session1.createTextMessage("hello"));
|
||||
|
||||
Wait.assertTrue(() -> getServer(1).getPostOffice().getBinding(SimpleString.toSimpleString(queueName)) != null);
|
||||
Wait.waitFor(() -> ((QueueBinding) getServer(1)
|
||||
.getPostOffice()
|
||||
.getBinding(SimpleString.toSimpleString(queueName)))
|
||||
|
|
|
@ -86,7 +86,7 @@ public class ScaleDownRemoveSFTest extends ClusterTestBase {
|
|||
|
||||
// consume a message from queue 1
|
||||
addConsumer(1, 0, queueName1, null, false);
|
||||
ClientMessage clientMessage = consumers[1].getConsumer().receive(250);
|
||||
ClientMessage clientMessage = consumers[1].getConsumer().receive(5_000);
|
||||
Assert.assertNotNull(clientMessage);
|
||||
clientMessage.acknowledge();
|
||||
consumers[1].getSession().commit();
|
||||
|
@ -105,12 +105,12 @@ public class ScaleDownRemoveSFTest extends ClusterTestBase {
|
|||
servers[0].stop();
|
||||
|
||||
addConsumer(0, 1, queueName1, null);
|
||||
clientMessage = consumers[0].getConsumer().receive(250);
|
||||
clientMessage = consumers[0].getConsumer().receive(10_000);
|
||||
Assert.assertNotNull(clientMessage);
|
||||
clientMessage.acknowledge();
|
||||
|
||||
// ensure there are no more messages on queue 1
|
||||
clientMessage = consumers[0].getConsumer().receive(250);
|
||||
clientMessage = consumers[0].getConsumer().receiveImmediate();
|
||||
Assert.assertNull(clientMessage);
|
||||
removeConsumer(0);
|
||||
|
||||
|
|
Loading…
Reference in New Issue