This closes #1632
This commit is contained in:
commit
f18b50a7ca
|
@ -18,11 +18,14 @@ package org.apache.activemq.artemis.junit;
|
|||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
/**
|
||||
* Utility adapted from: org.apache.activemq.util.Wait
|
||||
*/
|
||||
public class Wait {
|
||||
|
||||
|
||||
public static final long MAX_WAIT_MILLIS = 30 * 1000;
|
||||
public static final int SLEEP_MILLIS = 1000;
|
||||
|
||||
|
@ -31,19 +34,57 @@ public class Wait {
|
|||
boolean isSatisfied() throws Exception;
|
||||
}
|
||||
|
||||
public interface LongCondition {
|
||||
long getCount() throws Exception;
|
||||
}
|
||||
|
||||
public interface IntCondition {
|
||||
int getCount() throws Exception;
|
||||
}
|
||||
|
||||
public static boolean waitFor(Condition condition) throws Exception {
|
||||
return waitFor(condition, MAX_WAIT_MILLIS);
|
||||
}
|
||||
|
||||
public static void assertEquals(long size, LongCondition condition) throws Exception {
|
||||
boolean result = waitFor(() -> condition.getCount() == size);
|
||||
|
||||
if (!result) {
|
||||
Assert.fail(size + " != " + condition.getCount());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static void assertEquals(int size, IntCondition condition) throws Exception {
|
||||
boolean result = waitFor(() -> condition.getCount() == size);
|
||||
|
||||
if (!result) {
|
||||
Assert.fail(size + " != " + condition.getCount());
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertTrue(Condition condition) throws Exception {
|
||||
assertTrue("Condition wasn't met", condition);
|
||||
}
|
||||
|
||||
|
||||
public static void assertTrue(String failureMessage, Condition condition) throws Exception {
|
||||
boolean result = waitFor(condition);
|
||||
|
||||
if (!result) {
|
||||
Assert.fail(failureMessage);
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean waitFor(final Condition condition, final long duration) throws Exception {
|
||||
return waitFor(condition, duration, SLEEP_MILLIS);
|
||||
}
|
||||
|
||||
public static boolean waitFor(final Condition condition,
|
||||
final long duration,
|
||||
final long durationMillis,
|
||||
final long sleepMillis) throws Exception {
|
||||
|
||||
final long expiry = System.currentTimeMillis() + duration;
|
||||
final long expiry = System.currentTimeMillis() + durationMillis;
|
||||
boolean conditionSatisified = condition.isSatisfied();
|
||||
while (!conditionSatisified && System.currentTimeMillis() < expiry) {
|
||||
TimeUnit.MILLISECONDS.sleep(sleepMillis);
|
||||
|
@ -52,4 +93,5 @@ public class Wait {
|
|||
return conditionSatisified;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -576,22 +576,27 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
}
|
||||
|
||||
private void initializeCriticalAnalyzer() throws Exception {
|
||||
|
||||
// Some tests will play crazy frequenceistop/start
|
||||
CriticalAnalyzer analyzer = this.getCriticalAnalyzer();
|
||||
if (analyzer == null) {
|
||||
if (configuration.isCriticalAnalyzer()) {
|
||||
// this will have its own ScheduledPool
|
||||
this.analyzer = new CriticalAnalyzerImpl();
|
||||
analyzer = new CriticalAnalyzerImpl();
|
||||
} else {
|
||||
this.analyzer = EmptyCriticalAnalyzer.getInstance();
|
||||
analyzer = EmptyCriticalAnalyzer.getInstance();
|
||||
}
|
||||
|
||||
this.analyzer = analyzer;
|
||||
}
|
||||
|
||||
/* Calling this for cases where the server was stopped and now is being restarted... failback, etc...*/
|
||||
this.analyzer.clear();
|
||||
analyzer.clear();
|
||||
|
||||
this.getCriticalAnalyzer().setCheckTime(configuration.getCriticalAnalyzerCheckPeriod(), TimeUnit.MILLISECONDS).setTimeout(configuration.getCriticalAnalyzerTimeout(), TimeUnit.MILLISECONDS);
|
||||
analyzer.setCheckTime(configuration.getCriticalAnalyzerCheckPeriod(), TimeUnit.MILLISECONDS).setTimeout(configuration.getCriticalAnalyzerTimeout(), TimeUnit.MILLISECONDS);
|
||||
|
||||
if (configuration.isCriticalAnalyzer()) {
|
||||
this.getCriticalAnalyzer().start();
|
||||
analyzer.start();
|
||||
}
|
||||
|
||||
CriticalAction criticalAction = null;
|
||||
|
@ -645,7 +650,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
break;
|
||||
}
|
||||
|
||||
this.getCriticalAnalyzer().addAction(criticalAction);
|
||||
analyzer.addAction(criticalAction);
|
||||
}
|
||||
|
||||
private void sendCriticalNotification(final CriticalComponent criticalComponent) {
|
||||
|
|
|
@ -65,6 +65,12 @@
|
|||
<scope>test</scope>
|
||||
<version>${byteman.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-junit</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jboss.byteman</groupId>
|
||||
<artifactId>byteman-bmunit</artifactId>
|
||||
|
|
|
@ -146,7 +146,7 @@ public class PagingOMETest extends ActiveMQTestBase {
|
|||
|
||||
session.start();
|
||||
|
||||
Assert.assertTrue(Wait.waitFor(() -> numberOfMessages == queue.getMessageCount()));
|
||||
Wait.assertTrue(() -> numberOfMessages == queue.getMessageCount());
|
||||
|
||||
// The consumer has to be created after the queue.getMessageCount assertion
|
||||
// otherwise delivery could alter the messagecount and give us a false failure
|
||||
|
|
|
@ -181,6 +181,12 @@
|
|||
<scope>provided</scope>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-junit</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jboss.logging</groupId>
|
||||
<artifactId>jboss-logging</artifactId>
|
||||
|
|
|
@ -240,8 +240,8 @@ public class AddressingTest extends ActiveMQTestBase {
|
|||
ClientConsumer consumer = session.createConsumer(queueName);
|
||||
// there is a consumer now so the message should be routed
|
||||
producer.send(session.createMessage(true));
|
||||
Wait.waitFor(() -> queue.getMessageCount() == 1);
|
||||
assertEquals(1, queue.getMessageCount());
|
||||
Wait.assertEquals(1, queue::getMessageCount);
|
||||
|
||||
|
||||
consumer.close();
|
||||
// the last consumer was closed so the queue should exist but be purged
|
||||
|
|
|
@ -110,7 +110,7 @@ public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
|
|||
|
||||
connection.close();
|
||||
|
||||
assertTrue("Connection should get cleaned up.", Wait.waitFor(() -> server.getConnectionCount() == 0));
|
||||
Wait.assertEquals(0, server::getConnectionCount);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
|
@ -139,6 +139,6 @@ public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
|
|||
|
||||
connection.close();
|
||||
|
||||
assertTrue("Connection should get cleaned up.", Wait.waitFor(() -> server.getConnectionCount() == 0));
|
||||
Wait.assertEquals(0, server::getConnectionCount);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ public class AmqpDescribedTypePayloadTest extends JMSClientTestSupport {
|
|||
sender.close();
|
||||
|
||||
Queue queue = getProxyToQueue(getQueueName());
|
||||
assertTrue("Should be one message on Queue.", Wait.waitFor(() -> queue.getMessageCount() == 1));
|
||||
Wait.assertEquals(1, queue::getMessageCount);
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(getQueueName());
|
||||
receiver.flow(1);
|
||||
|
@ -86,7 +86,7 @@ public class AmqpDescribedTypePayloadTest extends JMSClientTestSupport {
|
|||
connection.close();
|
||||
|
||||
Queue queue = getProxyToQueue(getQueueName());
|
||||
assertTrue("Should be one message on Queue.", Wait.waitFor(() -> queue.getMessageCount() == 1));
|
||||
Wait.assertEquals(1, queue::getMessageCount);
|
||||
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(getBrokerOpenWireConnectionURI());
|
||||
Connection jmsConnection = factory.createConnection();
|
||||
|
@ -119,7 +119,7 @@ public class AmqpDescribedTypePayloadTest extends JMSClientTestSupport {
|
|||
sender.close();
|
||||
|
||||
Queue queue = getProxyToQueue(getQueueName());
|
||||
assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
|
||||
Wait.assertEquals(1, queue::getMessageCount);
|
||||
|
||||
// Receive and resend with Qpid JMS client
|
||||
JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
|
||||
|
|
|
@ -58,7 +58,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
|
|||
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
|
||||
assertNull(received);
|
||||
|
||||
assertTrue("Message should have expired", Wait.waitFor(() -> queueView.getMessagesExpired() == 1));
|
||||
Wait.assertEquals(1, queueView::getMessagesExpired);
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
@ -122,7 +122,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
|
|||
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
|
||||
assertNull(received);
|
||||
|
||||
assertTrue("Message should have expired", Wait.waitFor(() -> queueView.getMessagesExpired() == 1));
|
||||
Wait.assertEquals(1, queueView::getMessagesExpired);
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
@ -157,7 +157,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
|
|||
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
|
||||
assertNull(received);
|
||||
|
||||
assertTrue("Message should have expired", Wait.waitFor(() -> queueView.getMessagesExpired() == 1));
|
||||
Wait.assertEquals(1, queueView::getMessagesExpired);
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
@ -256,7 +256,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
|
|||
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
|
||||
assertNull(received);
|
||||
|
||||
assertTrue("Message should have expired", Wait.waitFor(() -> queueView.getMessagesExpired() == 1));
|
||||
Wait.assertEquals(1, queueView::getMessagesExpired);
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
|
|
@ -235,11 +235,11 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
|
|||
assertNotNull(consumer3.receive(2000));
|
||||
|
||||
Queue queue1 = getProxyToQueue(anycastQ1.toString());
|
||||
assertTrue("Message not consumed on Q1", Wait.waitFor(() -> queue1.getMessageCount() == 0));
|
||||
Wait.assertEquals(0, queue1::getMessageCount);
|
||||
Queue queue2 = getProxyToQueue(anycastQ2.toString());
|
||||
assertTrue("Message not consumed on Q2", Wait.waitFor(() -> queue2.getMessageCount() == 0));
|
||||
Wait.assertEquals(0, queue2::getMessageCount);
|
||||
Queue queue3 = getProxyToQueue(anycastQ3.toString());
|
||||
assertTrue("Message not consumed on Q3", Wait.waitFor(() -> queue3.getMessageCount() == 0));
|
||||
Wait.assertEquals(0, queue3::getMessageCount);
|
||||
|
||||
connection.close();
|
||||
//queues are empty now
|
||||
|
|
|
@ -65,9 +65,7 @@ public class AmqpInboundConnectionTest extends AmqpClientTestSupport {
|
|||
connection.disconnect(true);
|
||||
}
|
||||
|
||||
Wait.waitFor(amqpConnection::isClosed);
|
||||
|
||||
assertTrue(amqpConnection.isClosed());
|
||||
Wait.assertTrue(amqpConnection::isClosed);
|
||||
assertEquals(AmqpSupport.CONNECTION_FORCED, amqpConnection.getConnection().getRemoteCondition().getCondition());
|
||||
} finally {
|
||||
amqpConnection.close();
|
||||
|
@ -267,10 +265,8 @@ public class AmqpInboundConnectionTest extends AmqpClientTestSupport {
|
|||
|
||||
connection2.getStateInspector().assertValid();
|
||||
connection2.close();
|
||||
|
||||
assertTrue(Wait.waitFor(() -> server.getConnectionCount() == 1));
|
||||
|
||||
Wait.assertEquals(1, server::getConnectionCount);
|
||||
connection1.close();
|
||||
assertEquals(0, server.getConnectionCount());
|
||||
Wait.assertEquals(0, server::getConnectionCount);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,12 +62,8 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
|
|||
} finally {
|
||||
securityEnabled = false;
|
||||
}
|
||||
try {
|
||||
Wait.waitFor(remote::isActive);
|
||||
} catch (Exception e) {
|
||||
remote.stop();
|
||||
throw e;
|
||||
}
|
||||
|
||||
Wait.assertTrue(remote::isActive);
|
||||
|
||||
final Map<String, Object> config = new LinkedHashMap<>(); config.put(TransportConstants.HOST_PROP_NAME, "localhost");
|
||||
config.put(TransportConstants.PORT_PROP_NAME, String.valueOf(AMQP_PORT + 1));
|
||||
|
@ -99,14 +95,11 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
|
|||
connector.createConnection();
|
||||
|
||||
try {
|
||||
Wait.waitFor(() -> remote.getConnectionCount() > 0);
|
||||
assertEquals(1, remote.getConnectionCount());
|
||||
Wait.waitFor(connectionOpened::get);
|
||||
assertTrue("Remote connection was not opened - authentication error?", connectionOpened.get());
|
||||
Wait.assertEquals(1, remote::getConnectionCount);
|
||||
Wait.assertTrue(connectionOpened::get);
|
||||
lifeCycleListener.stop();
|
||||
|
||||
Wait.waitFor(() -> remote.getConnectionCount() == 0);
|
||||
assertEquals(0, remote.getConnectionCount());
|
||||
Wait.assertEquals(0, remote::getConnectionCount);
|
||||
} finally {
|
||||
lifeCycleListener.stop();
|
||||
remote.stop();
|
||||
|
|
|
@ -193,7 +193,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
|
|||
message.setText("Test-Message");
|
||||
sender.send(message);
|
||||
|
||||
assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
|
||||
Wait.assertEquals(0, queue::getMessageCount);
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true);
|
||||
|
||||
|
@ -229,7 +229,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
|
|||
message.setText("Test-Message");
|
||||
sender.send(message);
|
||||
|
||||
assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
|
||||
Wait.assertEquals(1, queue::getMessageCount);
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true);
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
|
|||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||
|
@ -76,7 +77,7 @@ public class AmqpPurgeOnNoConsumersTest extends AmqpClientTestSupport {
|
|||
|
||||
t.join(5000);
|
||||
|
||||
assertEquals(0, queueView.getMessageCount());
|
||||
Wait.assertEquals(0, queueView::getMessageCount);
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
|
|
@ -63,8 +63,8 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
|
|||
|
||||
Queue queueView = getProxyToQueue(destinationName);
|
||||
|
||||
assertTrue("Messages did not get queued", Wait.waitFor(() -> queueView.getMessageCount() == MSG_COUNT));
|
||||
assertEquals(0, queueView.getDeliveringCount());
|
||||
Wait.assertEquals(MSG_COUNT, queueView::getMessageCount);
|
||||
Wait.assertEquals(0, queueView::getDeliveringCount);
|
||||
|
||||
receiver.drain(MSG_COUNT);
|
||||
for (int i = 0; i < MSG_COUNT; ++i) {
|
||||
|
|
|
@ -274,7 +274,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
|||
|
||||
server.destroyQueue(new SimpleString(getQueueName()), null, false, true);
|
||||
|
||||
assertTrue("Receiver should have closed", Wait.waitFor(receiver::isClosed));
|
||||
Wait.assertTrue("Receiver should have closed", receiver::isClosed);
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
|
|
|
@ -702,7 +702,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
message1.setMessageId("ID:Message:1");
|
||||
sender.send(message1);
|
||||
|
||||
assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
|
||||
Wait.assertEquals(1, queue::getMessageCount);
|
||||
receiver1.flow(1);
|
||||
message1 = receiver1.receive(50, TimeUnit.SECONDS);
|
||||
assertNotNull("Should have read a message", message1);
|
||||
|
@ -716,7 +716,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
message2.setMessageId("ID:Message:2");
|
||||
sender.send(message2);
|
||||
|
||||
assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
|
||||
Wait.assertEquals(1, queue::getMessageCount);
|
||||
receiver1.flow(1);
|
||||
message2 = receiver1.receive(50, TimeUnit.SECONDS);
|
||||
assertNotNull("Should have read a message", message2);
|
||||
|
@ -770,7 +770,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
sender.send(message);
|
||||
connection.close();
|
||||
|
||||
assertTrue("Message did not arrive", Wait.waitFor(() -> queueView1.getMessageCount() == 1));
|
||||
Wait.assertEquals(1, queueView1::getMessageCount);
|
||||
|
||||
// Restart the server and the Queue should be empty
|
||||
server.stop();
|
||||
|
@ -783,9 +783,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
|
||||
final Queue queueView2 = getProxyToQueue(getQueueName());
|
||||
if (durable) {
|
||||
assertTrue("Message should not have returned", Wait.waitFor(() -> queueView2.getMessageCount() == 1));
|
||||
Wait.assertTrue("Message should not have returned", () -> queueView2.getMessageCount() == 1);
|
||||
} else {
|
||||
assertTrue("Message should have been restored", Wait.waitFor(() -> queueView2.getMessageCount() == 0));
|
||||
Wait.assertTrue("Message should have been restored", () -> queueView2.getMessageCount() == 0);
|
||||
}
|
||||
|
||||
receiver.flow(1);
|
||||
|
@ -841,13 +841,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
pendingAck.accept();
|
||||
}
|
||||
|
||||
assertTrue("Should be no inflight messages: " + destinationView.getDeliveringCount(), Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisfied() throws Exception {
|
||||
return destinationView.getDeliveringCount() == 0;
|
||||
}
|
||||
}));
|
||||
Wait.assertEquals(0, destinationView::getDeliveringCount);
|
||||
|
||||
sender.close();
|
||||
receiver.close();
|
||||
|
@ -1114,7 +1108,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
|
||||
assertTrue("did not read all messages, waiting on: " + done.getCount(), done.await(10, TimeUnit.SECONDS));
|
||||
assertFalse("should not be any errors on receive", error.get());
|
||||
assertTrue("Should be no inflight messages.", Wait.waitFor(() -> queueView.getDeliveringCount() == 0));
|
||||
Wait.assertEquals(0, queueView::getDeliveringCount);
|
||||
|
||||
sender.close();
|
||||
receiver.close();
|
||||
|
|
|
@ -144,7 +144,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
|
|||
}
|
||||
|
||||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
assertTrue("All messages should arrive", Wait.waitFor(() -> queueView.getMessageCount() == MSG_COUNT));
|
||||
Wait.assertTrue("All messages should arrive", () -> queueView.getMessageCount() == MSG_COUNT);
|
||||
|
||||
sender.close();
|
||||
|
||||
|
@ -174,7 +174,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
|
|||
}
|
||||
|
||||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
assertTrue("All messages should arrive", Wait.waitFor(() -> queueView.getMessageCount() == MSG_COUNT));
|
||||
Wait.assertTrue("All messages should arrive", () -> queueView.getMessageCount() == MSG_COUNT);
|
||||
|
||||
sender.close();
|
||||
connection.close();
|
||||
|
|
|
@ -162,7 +162,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
|
||||
session.commit();
|
||||
|
||||
assertTrue("Message was not queued", Wait.waitFor(() -> queue.getMessageCount() == 1));
|
||||
Wait.assertEquals(1, queue::getMessageCount);
|
||||
|
||||
sender.close();
|
||||
connection.close();
|
||||
|
@ -206,7 +206,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
message.setText("Test-Message");
|
||||
sender.send(message);
|
||||
|
||||
assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
|
||||
Wait.assertEquals(1, queue::getMessageCount);
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(getQueueName());
|
||||
|
||||
|
@ -238,7 +238,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
message.setText("Test-Message");
|
||||
sender.send(message);
|
||||
|
||||
assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
|
||||
Wait.assertEquals(1, queue::getMessageCount);
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(getQueueName());
|
||||
|
||||
|
@ -282,7 +282,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
message.setText("Test-Message");
|
||||
sender.send(message);
|
||||
|
||||
assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
|
||||
Wait.assertEquals(1, queue::getMessageCount);
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(getQueueName());
|
||||
|
||||
|
|
|
@ -42,15 +42,15 @@ public class JMSConnectionTest extends JMSClientTestSupport {
|
|||
|
||||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
|
||||
assertTrue("Connection not counted", Wait.waitFor(() -> server.getConnectionCount() == 1));
|
||||
assertTrue("Consumer not counted", Wait.waitFor(() -> queueView.getConsumerCount() == 1));
|
||||
Wait.assertEquals(1, server::getConnectionCount);
|
||||
Wait.assertEquals(1, server::getTotalConsumerCount);
|
||||
|
||||
assertEquals(1, queueView.getConsumerCount());
|
||||
|
||||
connection.close();
|
||||
|
||||
assertTrue("Consumer not closed", Wait.waitFor(() -> queueView.getConsumerCount() == 0));
|
||||
assertTrue("Connection not released", Wait.waitFor(() -> server.getConnectionCount() == 0));
|
||||
Wait.assertEquals(0, server::getConnectionCount);
|
||||
Wait.assertEquals(0, server::getTotalConsumerCount);
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
|
|
|
@ -640,7 +640,7 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
|
|||
connection.close();
|
||||
|
||||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
assertTrue("Not all messages were enqueud", Wait.waitFor(() -> queueView.getMessageCount() == NUM_MESSAGES));
|
||||
Wait.assertEquals(NUM_MESSAGES, queueView::getMessageCount);
|
||||
|
||||
// Create a consumer and prefetch the messages
|
||||
connection = createConnection();
|
||||
|
@ -652,7 +652,7 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
|
|||
consumer.close();
|
||||
connection.close();
|
||||
|
||||
assertTrue("Not all messages were enqueud", Wait.waitFor(() -> queueView.getMessageCount() == NUM_MESSAGES));
|
||||
Wait.assertEquals(NUM_MESSAGES, queueView::getMessageCount);
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
|
@ -730,7 +730,7 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
|
|||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
|
||||
connection.close();
|
||||
assertTrue("Not all messages consumed", Wait.waitFor(() -> queueView.getMessageCount() == 0));
|
||||
Wait.assertEquals(0, queueView::getMessageCount);
|
||||
|
||||
long taken = (System.currentTimeMillis() - time);
|
||||
System.out.println("Microbenchamrk ran in " + taken + " milliseconds, sending/receiving " + numMessages);
|
||||
|
@ -762,7 +762,7 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
|
|||
connection.close();
|
||||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
|
||||
assertTrue("Not all messages enqueued", Wait.waitFor(() -> queueView.getMessageCount() == numMessages));
|
||||
Wait.assertEquals(numMessages, queueView::getMessageCount);
|
||||
|
||||
// Now create a new connection and receive and acknowledge
|
||||
connection = createConnection();
|
||||
|
@ -788,7 +788,7 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
|
|||
// Wait for Acks to be processed and message removed from queue.
|
||||
Thread.sleep(500);
|
||||
|
||||
assertTrue("Not all messages consumed", Wait.waitFor(() -> queueView.getMessageCount() == 0));
|
||||
Wait.assertEquals(0, queueView::getMessageCount);
|
||||
long taken = (System.currentTimeMillis() - time) / 1000;
|
||||
System.out.println("taken = " + taken);
|
||||
} finally {
|
||||
|
|
|
@ -63,9 +63,7 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
|
|||
Assert.assertEquals(1, addressControl.getQueueNames().length);
|
||||
addressControl.sendMessage(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE, Base64.encodeBytes("test".getBytes()), false, fullUser, fullPass);
|
||||
|
||||
Wait.waitFor(() -> addressControl.getMessageCount() == 1);
|
||||
|
||||
Assert.assertEquals(1, addressControl.getMessageCount());
|
||||
Wait.assertEquals(1, addressControl::getMessageCount);
|
||||
|
||||
Connection connection = createConnection("myClientId");
|
||||
try {
|
||||
|
@ -95,7 +93,7 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
|
|||
Assert.assertEquals(1, addressControl.getQueueNames().length);
|
||||
addressControl.sendMessage(null, org.apache.activemq.artemis.api.core.Message.TEXT_TYPE, "test", false, fullUser, fullPass);
|
||||
|
||||
Wait.waitFor(() -> addressControl.getMessageCount() == 1);
|
||||
Wait.assertEquals(1, addressControl::getMessageCount);
|
||||
|
||||
Assert.assertEquals(1, addressControl.getMessageCount());
|
||||
|
||||
|
|
|
@ -58,7 +58,7 @@ public class JMSQueueBrowserTest extends JMSClientTestSupport {
|
|||
sendMessages(name.getMethodName(), MSG_COUNT, false);
|
||||
|
||||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
assertTrue("Messages did not arrive", Wait.waitFor(() -> queueView.getMessageCount() == MSG_COUNT));
|
||||
Wait.assertEquals(MSG_COUNT, queueView::getMessageCount);
|
||||
|
||||
QueueBrowser browser = session.createBrowser(queue);
|
||||
assertNotNull(browser);
|
||||
|
@ -151,7 +151,7 @@ public class JMSQueueBrowserTest extends JMSClientTestSupport {
|
|||
sendMessages(name.getMethodName(), 5, false);
|
||||
|
||||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
assertTrue("Messages did not arrive", Wait.waitFor(() -> queueView.getMessageCount() == 5));
|
||||
Wait.assertEquals(5, queueView::getMessageCount);
|
||||
|
||||
QueueBrowser browser = session.createBrowser(queue);
|
||||
assertNotNull(browser);
|
||||
|
@ -179,7 +179,7 @@ public class JMSQueueBrowserTest extends JMSClientTestSupport {
|
|||
sendMessages(name.getMethodName(), 5, false);
|
||||
|
||||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
assertTrue("Messages did not arrive", Wait.waitFor(() -> queueView.getMessageCount() == 5));
|
||||
Wait.assertEquals(5, queueView::getMessageCount);
|
||||
|
||||
QueueBrowser browser = session.createBrowser(queue);
|
||||
assertNotNull(browser);
|
||||
|
@ -206,7 +206,7 @@ public class JMSQueueBrowserTest extends JMSClientTestSupport {
|
|||
sendMessages(name.getMethodName(), 5, false);
|
||||
|
||||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
assertTrue("Messages did not arrive", Wait.waitFor(() -> queueView.getMessageCount() == 5));
|
||||
Wait.assertEquals(5, queueView::getMessageCount);
|
||||
|
||||
QueueBrowser browser = session.createBrowser(queue);
|
||||
assertNotNull(browser);
|
||||
|
@ -233,7 +233,7 @@ public class JMSQueueBrowserTest extends JMSClientTestSupport {
|
|||
sendMessages(name.getMethodName(), 5, false);
|
||||
|
||||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
assertTrue("Messages did not arrive", Wait.waitFor(() -> queueView.getMessageCount() == 5));
|
||||
Wait.assertEquals(5, queueView::getMessageCount);
|
||||
|
||||
// Send some TX work but don't commit.
|
||||
MessageProducer txProducer = session.createProducer(queue);
|
||||
|
@ -278,7 +278,7 @@ public class JMSQueueBrowserTest extends JMSClientTestSupport {
|
|||
sendMessages(name.getMethodName(), MSG_COUNT, false);
|
||||
|
||||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
assertTrue("Messages did not arrive", Wait.waitFor(() -> queueView.getMessageCount() == MSG_COUNT));
|
||||
Wait.assertEquals(MSG_COUNT, queueView::getMessageCount);
|
||||
|
||||
QueueBrowser browser = session.createBrowser(queue);
|
||||
assertNotNull(browser);
|
||||
|
|
|
@ -53,7 +53,7 @@ public class JMSTransactionTest extends JMSClientTestSupport {
|
|||
|
||||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
|
||||
assertTrue("Message didn't arrive on queue", Wait.waitFor(() -> queueView.getMessageCount() == 10));
|
||||
Wait.assertEquals(10, queueView::getMessageCount);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
|
@ -74,7 +74,7 @@ public class JMSTransactionTest extends JMSClientTestSupport {
|
|||
session.close();
|
||||
|
||||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
assertTrue("Messages arrived on queue", Wait.waitFor(() -> queueView.getMessageCount() == 0));
|
||||
Wait.assertEquals(0, queueView::getMessageCount);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
|
@ -97,7 +97,7 @@ public class JMSTransactionTest extends JMSClientTestSupport {
|
|||
session.close();
|
||||
|
||||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
assertTrue("Messages arrived on queue", Wait.waitFor(() -> queueView.getMessageCount() == 0));
|
||||
Wait.assertEquals(0, queueView::getMessageCount);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
|
@ -128,7 +128,7 @@ public class JMSTransactionTest extends JMSClientTestSupport {
|
|||
session.close();
|
||||
|
||||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
assertTrue("Messages not consumed", Wait.waitFor(() -> queueView.getMessageCount() == 0));
|
||||
Wait.assertEquals(0, queueView::getMessageCount);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
|
@ -158,7 +158,7 @@ public class JMSTransactionTest extends JMSClientTestSupport {
|
|||
session.rollback();
|
||||
|
||||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
assertTrue("Messages consumed", Wait.waitFor(() -> queueView.getMessageCount() == 10));
|
||||
Wait.assertEquals(10, queueView::getMessageCount);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
|
@ -181,7 +181,7 @@ public class JMSTransactionTest extends JMSClientTestSupport {
|
|||
session.commit();
|
||||
|
||||
Queue queueView = getProxyToQueue(getQueueName());
|
||||
assertTrue("Messages not enqueued", Wait.waitFor(() -> queueView.getMessageCount() == MSG_COUNT));
|
||||
Wait.assertEquals(MSG_COUNT, queueView::getMessageCount);
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
||||
|
@ -193,9 +193,7 @@ public class JMSTransactionTest extends JMSClientTestSupport {
|
|||
|
||||
session.rollback();
|
||||
|
||||
Wait.waitFor(() -> MSG_COUNT == queueView.getConsumerCount());
|
||||
|
||||
assertEquals(MSG_COUNT, queueView.getMessageCount());
|
||||
Wait.assertEquals(MSG_COUNT, queueView::getMessageCount);
|
||||
|
||||
// Consume again..check we receive all the messages.
|
||||
Set<Integer> messageNumbers = new HashSet<>();
|
||||
|
|
|
@ -152,8 +152,8 @@ public class HangConsumerTest extends ActiveMQTestBase {
|
|||
// a flush to guarantee any pending task is finished on flushing out delivery and pending msgs
|
||||
queue.flushExecutor();
|
||||
Wait.waitFor(() -> getMessageCount(queue) == 2);
|
||||
Assert.assertEquals(2, getMessageCount(queue));
|
||||
Assert.assertEquals(2, getMessagesAdded(queue));
|
||||
Wait.assertEquals(2, queue::getMessageCount);
|
||||
Wait.assertEquals(2, queue::getMessagesAdded);
|
||||
|
||||
ClientMessage msg = consumer.receive(5000);
|
||||
Assert.assertNotNull(msg);
|
||||
|
|
|
@ -16,40 +16,9 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.util;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Utility adapted from: org.apache.activemq.util.Wait
|
||||
*/
|
||||
public class Wait {
|
||||
|
||||
public static final long MAX_WAIT_MILLIS = 30 * 1000;
|
||||
public static final int SLEEP_MILLIS = 1000;
|
||||
|
||||
public interface Condition {
|
||||
|
||||
boolean isSatisfied() throws Exception;
|
||||
}
|
||||
|
||||
public static boolean waitFor(Condition condition) throws Exception {
|
||||
return waitFor(condition, MAX_WAIT_MILLIS);
|
||||
}
|
||||
|
||||
public static boolean waitFor(final Condition condition, final long duration) throws Exception {
|
||||
return waitFor(condition, duration, SLEEP_MILLIS);
|
||||
}
|
||||
|
||||
public static boolean waitFor(final Condition condition,
|
||||
final long durationMillis,
|
||||
final long sleepMillis) throws Exception {
|
||||
|
||||
final long expiry = System.currentTimeMillis() + durationMillis;
|
||||
boolean conditionSatisified = condition.isSatisfied();
|
||||
while (!conditionSatisified && System.currentTimeMillis() < expiry) {
|
||||
TimeUnit.MILLISECONDS.sleep(sleepMillis);
|
||||
conditionSatisified = condition.isSatisfied();
|
||||
}
|
||||
return conditionSatisified;
|
||||
}
|
||||
public class Wait extends org.apache.activemq.artemis.junit.Wait {
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue