ARTEMIS-127 Fix some concurrency idioms for ActimeMQ Tests

This commit is contained in:
Thiago Kronig 2015-06-10 20:37:58 -03:00
parent 64ecb9565d
commit ae6a2b87ea
24 changed files with 95 additions and 106 deletions

View File

@ -36,7 +36,7 @@ public class JmsCreateConsumerInOnMessageTest extends TestSupport implements Mes
private MessageConsumer testConsumer;
private MessageProducer producer;
private Topic topic;
private Object lock = new Object();
private final Object lock = new Object();
/*
* @see junit.framework.TestCase#setUp()
@ -71,8 +71,8 @@ public class JmsCreateConsumerInOnMessageTest extends TestSupport implements Mes
public void testCreateConsumer() throws Exception {
Message msg = super.createMessage();
producer.send(msg);
if (testConsumer == null) {
synchronized (lock) {
synchronized (lock) {
while(testConsumer == null) {
lock.wait(3000);
}
}

View File

@ -83,8 +83,6 @@ public class JmsMultipleClientsTestSupport {
protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
protected MessageIdList allMessagesList = new MessageIdList();
private AtomicInteger producerLock;
protected void startProducers(Destination dest, int msgCount) throws Exception {
startProducers(createConnectionFactory(), dest, msgCount);
}
@ -92,7 +90,7 @@ public class JmsMultipleClientsTestSupport {
protected void startProducers(final ConnectionFactory factory, final Destination dest, final int msgCount) throws Exception {
// Use concurrent send
if (useConcurrentSend) {
producerLock = new AtomicInteger(producerCount);
final AtomicInteger producerLock = new AtomicInteger(producerCount);
for (int i = 0; i < producerCount; i++) {
Thread t = new Thread(new Runnable() {

View File

@ -61,7 +61,7 @@ public class LargeMessageTestSupport extends ClientTestSupport implements Messag
protected int deliveryMode = DeliveryMode.PERSISTENT;
protected IdGenerator idGen = new IdGenerator();
protected boolean validMessageConsumption = true;
protected AtomicInteger messageCount = new AtomicInteger(0);
protected final AtomicInteger messageCount = new AtomicInteger(0);
protected int prefetchValue = 10000000;
@ -182,9 +182,9 @@ public class LargeMessageTestSupport extends ClientTestSupport implements Messag
producer.send(msg);
}
long now = System.currentTimeMillis();
while (now + 60000 > System.currentTimeMillis() && messageCount.get() < MESSAGE_COUNT) {
LOG.info("message count = " + messageCount);
synchronized (messageCount) {
synchronized (messageCount) {
while (now + 60000 > System.currentTimeMillis() && messageCount.get() < MESSAGE_COUNT) {
LOG.info("message count = " + messageCount);
messageCount.wait(1000);
}
}

View File

@ -154,7 +154,7 @@ public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport {
}
private class TestServerSession implements ServerSession {
TestServerSessionPool pool;
final TestServerSessionPool pool;
Session session;
public TestServerSession(TestServerSessionPool pool) throws JMSException {

View File

@ -54,7 +54,7 @@ public class QueueResendDuringShutdownTest {
private Connection producerConnection;
private Queue queue;
private Object messageReceiveSync = new Object();
private final Object messageReceiveSync = new Object();
private int receiveCount;
@Before
@ -239,7 +239,7 @@ public class QueueResendDuringShutdownTest {
protected void waitForMessage (long delayMs) {
try {
synchronized ( this.messageReceiveSync ) {
if ( this.receiveCount == 0 ) {
while ( this.receiveCount == 0 ) {
this.messageReceiveSync.wait(delayMs);
}
}

View File

@ -563,7 +563,7 @@ public class AMQ2149Test {
}
class TeardownTask implements Callable<Boolean> {
private Object brokerLock;
private final Object brokerLock;
private BrokerService broker;
public TeardownTask(Object brokerLock, BrokerService broker) {

View File

@ -49,7 +49,7 @@ public class AMQ4607Test extends JmsMultipleBrokersTestSupport implements Uncaug
public boolean duplex = true;
protected Map<String, MessageConsumer> consumerMap;
Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>();
final Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>();
private void assertNoUnhandeledExceptions() {
for( Entry<Thread, Throwable> e: unhandeledExceptions.entrySet()) {

View File

@ -25,6 +25,9 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.command.ActiveMQQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CraigsBugTest extends EmbeddedBrokerTestSupport {
private String connectionUri;
@ -49,9 +52,7 @@ public class CraigsBugTest extends EmbeddedBrokerTestSupport {
conn.start();
try {
synchronized (this) {
wait(3000);
}
new CountDownLatch(1).await(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}

View File

@ -25,6 +25,7 @@ import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
import javax.jms.*;
import java.io.File;
import java.net.URISyntaxException;
import java.util.concurrent.CountDownLatch;
public class TryJmsClient
{
@ -59,9 +60,7 @@ public class TryJmsClient
startMessageSend();
synchronized(this) {
this.wait();
}
new CountDownLatch(1).await();
}
private void startUsageMonitor(final BrokerService brokerService) {

View File

@ -25,6 +25,7 @@ import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
import javax.jms.*;
import java.io.File;
import java.net.URISyntaxException;
import java.util.concurrent.CountDownLatch;
public class TryJmsManager {
@ -59,9 +60,7 @@ public class TryJmsManager {
startMessageConsumer();
synchronized(this) {
this.wait();
}
new CountDownLatch(1).await();
}
private void startUsageMonitor(final BrokerService brokerService) {

View File

@ -73,14 +73,19 @@ public class ConsumerBean extends Assert implements MessageListener {
long start = System.currentTimeMillis();
try {
if (hasReceivedMessage()) {
synchronized (messages) {
synchronized(messages)
{
try
{
while (hasReceivedMessage())
{
messages.wait(4000);
}
}
} catch (InterruptedException e) {
LOG.info("Caught: " + e);
catch (InterruptedException e)
{
LOG.info("Caught: " + e);
}
}
long end = System.currentTimeMillis() - start;
@ -101,18 +106,18 @@ public class ConsumerBean extends Assert implements MessageListener {
LOG.info("Waiting for (" + maxRemainingMessageCount + ") message(s) to arrive");
long start = System.currentTimeMillis();
long endTime = start + maxWaitTime;
while (maxRemainingMessageCount > 0) {
try {
synchronized (messages) {
synchronized (messages) {
while (maxRemainingMessageCount > 0) {
try {
messages.wait(1000);
if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > endTime) {
break;
}
} catch (InterruptedException e) {
LOG.info("Caught: " + e);
}
if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > endTime) {
break;
}
} catch (InterruptedException e) {
LOG.info("Caught: " + e);
maxRemainingMessageCount = Math.max(0, messageCount - messages.size());
}
maxRemainingMessageCount = Math.max(0, messageCount - messages.size());
}
long end = System.currentTimeMillis() - start;
LOG.info("End of wait for " + end + " millis");

View File

@ -43,13 +43,13 @@ public class SpringConsumer extends ConsumerBean implements MessageListener {
try {
ConnectionFactory factory = template.getConnectionFactory();
connection = factory.createConnection();
final Connection c = connection = factory.createConnection();
// we might be a reusable connection in spring
// so lets only set the client ID once if its not set
synchronized (connection) {
if (connection.getClientID() == null) {
connection.setClientID(myId);
synchronized (c) {
if (c.getClientID() == null) {
c.setClientID(myId);
}
}

View File

@ -617,7 +617,7 @@ public class PListTest {
}
}
Map<PList, Object> locks = new HashMap<PList, Object>();
final Map<PList, Object> locks = new HashMap<PList, Object>();
private Object plistLocks(PList plist) {
Object lock = null;

View File

@ -250,7 +250,7 @@ public class JMSInputStreamTest extends JmsTestSupport {
}
out.flush();
synchronized (complete) {
if (!complete.get()) {
while (!complete.get()) {
complete.wait(30000);
}
}

View File

@ -55,7 +55,7 @@ public class TopicClusterTest extends TestCase implements MessageListener {
protected Destination destination;
protected boolean topic = true;
protected AtomicInteger receivedMessageCount = new AtomicInteger(0);
protected final AtomicInteger receivedMessageCount = new AtomicInteger(0);
protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
protected MessageProducer[] producers;
protected Connection[] connections;
@ -166,7 +166,7 @@ public class TopicClusterTest extends TestCase implements MessageListener {
}
}
synchronized (receivedMessageCount) {
if (receivedMessageCount.get() < expectedReceiveCount()) {
while (receivedMessageCount.get() < expectedReceiveCount()) {
receivedMessageCount.wait(20000);
}
}

View File

@ -21,6 +21,7 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
@ -73,14 +74,12 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
// The runnable is likely to interrupt during the session#commit, since
// this takes the longest
final Object starter = new Object();
final CountDownLatch starter = new CountDownLatch(1);
final AtomicBoolean restarted = new AtomicBoolean();
new Thread(new Runnable() {
public void run() {
try {
synchronized (starter) {
starter.wait();
}
starter.await();
// Simulate broker failure & restart
bs.stop();
@ -97,9 +96,6 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
}
}).start();
synchronized (starter) {
starter.notifyAll();
}
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message message = consumer.receive(500);
assertNotNull("No Message " + i + " found", message);
@ -108,9 +104,7 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
assertFalse("Timing problem, restarted too soon", restarted
.get());
if (i == 10) {
synchronized (starter) {
starter.notifyAll();
}
starter.countDown();
}
if (i > MESSAGE_COUNT - 100) {
assertTrue("Timing problem, restarted too late", restarted
@ -143,14 +137,12 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
// The runnable is likely to interrupt during the session#commit, since
// this takes the longest
final Object starter = new Object();
final CountDownLatch starter = new CountDownLatch(1);
final AtomicBoolean restarted = new AtomicBoolean();
new Thread(new Runnable() {
public void run() {
try {
synchronized (starter) {
starter.wait();
}
starter.await();
// Simulate broker failure & restart
bs.stop();
@ -167,9 +159,6 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
}
}).start();
synchronized (starter) {
starter.notifyAll();
}
Collection<Integer> results = new ArrayList<Integer>(MESSAGE_COUNT);
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message message1 = consumer1.receive(20);
@ -191,9 +180,7 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
assertFalse("Timing problem, restarted too soon", restarted
.get());
if (i == 10) {
synchronized (starter) {
starter.notifyAll();
}
starter.countDown();
}
if (i > MESSAGE_COUNT - 50) {
assertTrue("Timing problem, restarted too late", restarted

View File

@ -46,7 +46,7 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
protected Transport producer;
protected Transport consumer;
protected Object lock = new Object();
protected final Object lock = new Object();
protected Command receivedCommand;
protected TransportServer server;
protected boolean large;
@ -251,10 +251,10 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
Command answer = null;
synchronized (lock) {
answer = receivedCommand;
if (answer == null) {
while (answer == null) {
lock.wait(waitForCommandTimeout);
answer = receivedCommand;
}
answer = receivedCommand;
}
assertNotNull("Should have received a Command by now!", answer);

View File

@ -107,7 +107,7 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
// periodically start a durable sub that has a backlog
final int consumersToActivate = 5;
final Object addConsumerSignal = new Object();
final CountDownLatch addConsumerSignal = new CountDownLatch(1);
Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@ -120,9 +120,7 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
MessageConsumer consumer = null;
for (int i = 0; i < consumersToActivate; i++) {
LOG.info("Waiting for add signal from producer...");
synchronized (addConsumerSignal) {
addConsumerSignal.wait(30 * 60 * 1000);
}
addConsumerSignal.await(30, TimeUnit.MINUTES);
TimedMessageListener listener = new TimedMessageListener();
consumer = createDurableSubscriber(factory.createConnection(), destination, "consumer" + (i + 1));
LOG.info("Created consumer " + consumer);
@ -254,7 +252,7 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
final int numIterations,
Session session,
MessageProducer producer,
Object addConsumerSignal) throws Exception {
CountDownLatch addConsumerSignal) throws Exception {
long start;
long count = 0;
double batchMax = 0, max = 0, sum = 0;
@ -269,10 +267,8 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
max = Math.max(max, (System.currentTimeMillis() - singleSendstart));
if (++count % 500 == 0) {
if (addConsumerSignal != null) {
synchronized (addConsumerSignal) {
addConsumerSignal.notifyAll();
LOG.info("Signalled add consumer");
}
addConsumerSignal.countDown();
LOG.info("Signalled add consumer");
}
};
if (count % 5000 == 0) {

View File

@ -95,7 +95,7 @@ public class ConcurrentProducerQueueConsumerTest extends TestSupport
// periodically start a queue consumer
final int consumersToActivate = 5;
final Object addConsumerSignal = new Object();
final CountDownLatch addConsumerSignal = new CountDownLatch(1);
Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@ -108,9 +108,7 @@ public class ConcurrentProducerQueueConsumerTest extends TestSupport
MessageConsumer consumer = null;
for (int i = 0; i < consumersToActivate; i++) {
LOG.info("Waiting for add signal from producer...");
synchronized (addConsumerSignal) {
addConsumerSignal.wait(30 * 60 * 1000);
}
addConsumerSignal.await(30, TimeUnit.MINUTES);
TimedMessageListener listener = new TimedMessageListener();
consumer = createConsumer(factory.createConnection(), destination);
LOG.info("Created consumer " + consumer);
@ -241,7 +239,7 @@ public class ConcurrentProducerQueueConsumerTest extends TestSupport
final int numIterations,
Session session,
MessageProducer producer,
Object addConsumerSignal) throws Exception {
CountDownLatch addConsumerSignal) throws Exception {
long start;
long count = 0;
double batchMax = 0, max = 0, sum = 0;
@ -257,10 +255,8 @@ public class ConcurrentProducerQueueConsumerTest extends TestSupport
max = Math.max(max, (System.currentTimeMillis() - singleSendstart));
if (++count % 500 == 0) {
if (addConsumerSignal != null) {
synchronized (addConsumerSignal) {
addConsumerSignal.notifyAll();
LOG.info("Signalled add consumer");
}
addConsumerSignal.countDown();
LOG.info("Signalled add consumer");
}
}
;

View File

@ -44,7 +44,7 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport
private static final Logger LOG = LoggerFactory.getLogger(MultiBrokersMultiClientsTest.class);
protected Map<String, MessageConsumer> consumerMap;
Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>();
final Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>();
public void testTopicAllConnected() throws Exception {
bridgeAllBrokers();

View File

@ -265,7 +265,7 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport {
private MessageConsumer consumer;
private final String durableID = "DURABLE_ID";
private List<String> receivedStrings = Collections.synchronizedList(new ArrayList<String>());
private final List<String> receivedStrings = Collections.synchronizedList(new ArrayList<String>());
private int numMessages = 10;
private CountDownLatch recievedLatch = new CountDownLatch(numMessages);

View File

@ -47,8 +47,8 @@ public class ReliableReconnectTest extends org.apache.activemq.TestSupport {
protected int deliveryMode = DeliveryMode.PERSISTENT;
protected String consumerClientId;
protected Destination destination;
protected AtomicBoolean closeBroker = new AtomicBoolean(false);
protected AtomicInteger messagesReceived = new AtomicInteger(0);
protected final AtomicBoolean closeBroker = new AtomicBoolean(false);
protected final AtomicInteger messagesReceived = new AtomicInteger(0);
protected BrokerService broker;
protected int firstBatch = MESSAGE_COUNT / 10;
private IdGenerator idGen = new IdGenerator();
@ -159,7 +159,7 @@ public class ReliableReconnectTest extends org.apache.activemq.TestSupport {
connection.close();
spawnConsumer();
synchronized (closeBroker) {
if (!closeBroker.get()) {
while (!closeBroker.get()) {
closeBroker.wait();
}
}
@ -168,7 +168,7 @@ public class ReliableReconnectTest extends org.apache.activemq.TestSupport {
startBroker(false);
// System.err.println("Started Broker again");
synchronized (messagesReceived) {
if (messagesReceived.get() < MESSAGE_COUNT) {
while (messagesReceived.get() < MESSAGE_COUNT) {
messagesReceived.wait(60000);
}
}

View File

@ -51,7 +51,7 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
public static final int TIMEOUT = 30000;
protected Map<String, MessageConsumer> consumerMap;
Map<Thread, Throwable> unhandledExceptions = new HashMap<Thread, Throwable>();
final Map<Thread, Throwable> unhandledExceptions = new HashMap<Thread, Throwable>();
private void assertNoUnhandledExceptions() {
for( Entry<Thread, Throwable> e: unhandledExceptions.entrySet()) {

View File

@ -140,20 +140,28 @@ public class MessageIdList extends Assert implements MessageListener {
long start = System.currentTimeMillis();
for (int i = 0; i < messageCount; i++) {
try {
if (hasReceivedMessages(messageCount)) {
break;
}
long duration = System.currentTimeMillis() - start;
if (duration >= maximumDuration) {
break;
}
synchronized (semaphore) {
synchronized (semaphore)
{
for (int i = 0; i < messageCount; i++)
{
try
{
if (hasReceivedMessages(messageCount))
{
break;
}
long duration = System.currentTimeMillis() - start;
if (duration >= maximumDuration)
{
break;
}
semaphore.wait(maximumDuration - duration);
}
} catch (InterruptedException e) {
LOG.info("Caught: " + e);
catch (InterruptedException e)
{
LOG.info("Caught: " + e);
}
}
}
long end = System.currentTimeMillis() - start;