diff --git a/tests/activemq5-unit-tests/pom.xml b/tests/activemq5-unit-tests/pom.xml
index d6d87e8e92..da44d3eb95 100644
--- a/tests/activemq5-unit-tests/pom.xml
+++ b/tests/activemq5-unit-tests/pom.xml
@@ -20,7 +20,7 @@
org.apache.activemq.tests
artemis-tests-pom
- 1.0.0-SNAPSHOT
+ 1.0.1-SNAPSHOT
activemq5-unit-tests
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
index 86580e1c75..b7397301d2 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
@@ -38,7 +38,7 @@ import org.apache.activemq.broker.BrokerService;
public class ArtemisBrokerWrapper extends ArtemisBrokerBase
{
- protected Map testQueues = new HashMap();
+ protected final Map testQueues = new HashMap();
public ArtemisBrokerWrapper(BrokerService brokerService)
{
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
index 7a219e2cc3..c0a4f5fd68 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
@@ -36,7 +36,7 @@ public class JmsCreateConsumerInOnMessageTest extends TestSupport implements Mes
private MessageConsumer testConsumer;
private MessageProducer producer;
private Topic topic;
- private Object lock = new Object();
+ private final Object lock = new Object();
/*
* @see junit.framework.TestCase#setUp()
@@ -71,8 +71,8 @@ public class JmsCreateConsumerInOnMessageTest extends TestSupport implements Mes
public void testCreateConsumer() throws Exception {
Message msg = super.createMessage();
producer.send(msg);
- if (testConsumer == null) {
- synchronized (lock) {
+ synchronized (lock) {
+ while(testConsumer == null) {
lock.wait(3000);
}
}
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
index 5eaab8dda7..5c73a6e4e0 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
@@ -83,8 +83,6 @@ public class JmsMultipleClientsTestSupport {
protected List connections = Collections.synchronizedList(new ArrayList());
protected MessageIdList allMessagesList = new MessageIdList();
- private AtomicInteger producerLock;
-
protected void startProducers(Destination dest, int msgCount) throws Exception {
startProducers(createConnectionFactory(), dest, msgCount);
}
@@ -92,7 +90,7 @@ public class JmsMultipleClientsTestSupport {
protected void startProducers(final ConnectionFactory factory, final Destination dest, final int msgCount) throws Exception {
// Use concurrent send
if (useConcurrentSend) {
- producerLock = new AtomicInteger(producerCount);
+ final AtomicInteger producerLock = new AtomicInteger(producerCount);
for (int i = 0; i < producerCount; i++) {
Thread t = new Thread(new Runnable() {
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java
index fc772185cf..d1ab8a5ba7 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java
@@ -61,7 +61,7 @@ public class LargeMessageTestSupport extends ClientTestSupport implements Messag
protected int deliveryMode = DeliveryMode.PERSISTENT;
protected IdGenerator idGen = new IdGenerator();
protected boolean validMessageConsumption = true;
- protected AtomicInteger messageCount = new AtomicInteger(0);
+ protected final AtomicInteger messageCount = new AtomicInteger(0);
protected int prefetchValue = 10000000;
@@ -182,9 +182,9 @@ public class LargeMessageTestSupport extends ClientTestSupport implements Messag
producer.send(msg);
}
long now = System.currentTimeMillis();
- while (now + 60000 > System.currentTimeMillis() && messageCount.get() < MESSAGE_COUNT) {
- LOG.info("message count = " + messageCount);
- synchronized (messageCount) {
+ synchronized (messageCount) {
+ while (now + 60000 > System.currentTimeMillis() && messageCount.get() < MESSAGE_COUNT) {
+ LOG.info("message count = " + messageCount);
messageCount.wait(1000);
}
}
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
index 085119847f..26c6bf102a 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
@@ -154,7 +154,7 @@ public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport {
}
private class TestServerSession implements ServerSession {
- TestServerSessionPool pool;
+ final TestServerSessionPool pool;
Session session;
public TestServerSession(TestServerSessionPool pool) throws JMSException {
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
index cf4929a551..422b5ffe44 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
@@ -43,7 +43,7 @@ public class DbRestartJDBCQueueMasterSlaveLeaseTest extends DbRestartJDBCQueueMa
LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler();
ioExceptionHandler.setIgnoreSQLExceptions(false);
ioExceptionHandler.setStopStartConnectors(false);
- ioExceptionHandler.setResumeCheckSleepPeriod(500l);
+ ioExceptionHandler.setResumeCheckSleepPeriod(500L);
brokerService.setIoExceptionHandler(ioExceptionHandler);
}
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java
index e17b362cf6..4368f7927a 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java
@@ -82,7 +82,7 @@ public class AbortSlowConsumer1Test extends AbortSlowConsumerBase {
consumer.close();
TimeUnit.SECONDS.sleep(5);
- assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
+ assertTrue("no exceptions : " + exceptions, exceptions.isEmpty());
}
@Test(timeout = 60 * 1000)
@@ -99,6 +99,6 @@ public class AbortSlowConsumer1Test extends AbortSlowConsumerBase {
conn.close();
TimeUnit.SECONDS.sleep(5);
- assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
+ assertTrue("no exceptions : " + exceptions, exceptions.isEmpty());
}
}
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
index 91209379ad..d02085aed6 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
@@ -67,7 +67,7 @@ public class QueuePurgeTest extends CombinationTestSupport {
broker.setDataDirectoryFile(testDataDir);
broker.setUseJmx(true);
broker.setDeleteAllMessagesOnStartup(true);
- broker.getSystemUsage().getMemoryUsage().setLimit(1024l*1024*64);
+ broker.getSystemUsage().getMemoryUsage().setLimit(1024L*1024*64);
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(new File(testDataDir, "kahadb"));
broker.setPersistenceAdapter(persistenceAdapter);
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java
index 0439fa847c..c7154a9768 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java
@@ -54,7 +54,7 @@ public class QueueResendDuringShutdownTest {
private Connection producerConnection;
private Queue queue;
- private Object messageReceiveSync = new Object();
+ private final Object messageReceiveSync = new Object();
private int receiveCount;
@Before
@@ -239,7 +239,7 @@ public class QueueResendDuringShutdownTest {
protected void waitForMessage (long delayMs) {
try {
synchronized ( this.messageReceiveSync ) {
- if ( this.receiveCount == 0 ) {
+ while ( this.receiveCount == 0 ) {
this.messageReceiveSync.wait(delayMs);
}
}
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
index f8fab10a13..e5431c065d 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
@@ -106,7 +106,7 @@ public class StoreQueueCursorOrderTest {
@Override
public void run() {
}
- }, 2l) {};
+ }, 2L) {};
msg.getMessageId().setFutureOrSequenceLong(future);
underTest.addMessageLast(msg);
@@ -116,12 +116,12 @@ public class StoreQueueCursorOrderTest {
msg = getMessage(1);
messages[0] = msg;
msg.setMemoryUsage(systemUsage.getMemoryUsage());
- msg.getMessageId().setFutureOrSequenceLong(1l);
+ msg.getMessageId().setFutureOrSequenceLong(1L);
underTest.addMessageLast(msg);
assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
- assertEquals("setBatch unset", 0l, queueMessageStore.batch.get());
+ assertEquals("setBatch unset", 0L, queueMessageStore.batch.get());
int dequeueCount = 0;
@@ -171,9 +171,9 @@ public class StoreQueueCursorOrderTest {
FutureTask future = new FutureTask(new Runnable() {
@Override
public void run() {
- msgRef.getMessageId().setFutureOrSequenceLong(1l);
+ msgRef.getMessageId().setFutureOrSequenceLong(1L);
}
- }, 1l) {};
+ }, 1L) {};
msg.getMessageId().setFutureOrSequenceLong(future);
Executors.newSingleThreadExecutor().submit(future);
underTest.addMessageLast(msg);
@@ -184,12 +184,12 @@ public class StoreQueueCursorOrderTest {
msg = getMessage(1);
messages[0] = msg;
msg.setMemoryUsage(systemUsage.getMemoryUsage());
- msg.getMessageId().setFutureOrSequenceLong(1l);
+ msg.getMessageId().setFutureOrSequenceLong(1L);
underTest.addMessageLast(msg);
assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
- assertEquals("setBatch unset", 0l, queueMessageStore.batch.get());
+ assertEquals("setBatch unset", 0L, queueMessageStore.batch.get());
int dequeueCount = 0;
@@ -239,9 +239,9 @@ public class StoreQueueCursorOrderTest {
FutureTask future = new FutureTask(new Runnable() {
@Override
public void run() {
- msgRef.getMessageId().setFutureOrSequenceLong(0l);
+ msgRef.getMessageId().setFutureOrSequenceLong(0L);
}
- }, 0l) {};
+ }, 0L) {};
msg.getMessageId().setFutureOrSequenceLong(future);
Executors.newSingleThreadExecutor().submit(future);
underTest.addMessageLast(msg);
@@ -257,16 +257,16 @@ public class StoreQueueCursorOrderTest {
FutureTask future2 = new FutureTask(new Runnable() {
@Override
public void run() {
- msgRe2f.getMessageId().setFutureOrSequenceLong(1l);
+ msgRe2f.getMessageId().setFutureOrSequenceLong(1L);
}
- }, 1l) {};
+ }, 1L) {};
msg.getMessageId().setFutureOrSequenceLong(future2);
Executors.newSingleThreadExecutor().submit(future2);
underTest.addMessageLast(msg);
assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
- assertEquals("setBatch set", 1l, queueMessageStore.batch.get());
+ assertEquals("setBatch set", 1L, queueMessageStore.batch.get());
int dequeueCount = 0;
@@ -316,9 +316,9 @@ public class StoreQueueCursorOrderTest {
FutureTask future0 = new FutureTask(new Runnable() {
@Override
public void run() {
- msgRef.getMessageId().setFutureOrSequenceLong(0l);
+ msgRef.getMessageId().setFutureOrSequenceLong(0L);
}
- }, 0l) {};
+ }, 0L) {};
msg.getMessageId().setFutureOrSequenceLong(future0);
underTest.addMessageLast(msg);
Executors.newSingleThreadExecutor().submit(future0);
@@ -332,9 +332,9 @@ public class StoreQueueCursorOrderTest {
FutureTask future1 = new FutureTask(new Runnable() {
@Override
public void run() {
- msgRef1.getMessageId().setFutureOrSequenceLong(3l);
+ msgRef1.getMessageId().setFutureOrSequenceLong(3L);
}
- }, 3l) {};
+ }, 3L) {};
msg.getMessageId().setFutureOrSequenceLong(future1);
underTest.addMessageLast(msg);
@@ -342,7 +342,7 @@ public class StoreQueueCursorOrderTest {
msg = getMessage(2);
messages[1] = msg;
msg.setMemoryUsage(systemUsage.getMemoryUsage());
- msg.getMessageId().setFutureOrSequenceLong(1l);
+ msg.getMessageId().setFutureOrSequenceLong(1L);
underTest.addMessageLast(msg);
assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
@@ -354,12 +354,12 @@ public class StoreQueueCursorOrderTest {
msg = getMessage(3);
messages[2] = msg;
msg.setMemoryUsage(systemUsage.getMemoryUsage());
- msg.getMessageId().setFutureOrSequenceLong(3l);
+ msg.getMessageId().setFutureOrSequenceLong(3L);
underTest.addMessageLast(msg);
assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
- assertEquals("setBatch set", 2l, queueMessageStore.batch.get());
+ assertEquals("setBatch set", 2L, queueMessageStore.batch.get());
int dequeueCount = 0;
@@ -405,13 +405,13 @@ public class StoreQueueCursorOrderTest {
ActiveMQTextMessage msg = getMessage(0);
messages[0] = msg;
msg.setMemoryUsage(systemUsage.getMemoryUsage());
- msg.getMessageId().setFutureOrSequenceLong(0l);
+ msg.getMessageId().setFutureOrSequenceLong(0L);
underTest.addMessageLast(msg);
msg = getMessage(1);
messages[1] = msg;
msg.setMemoryUsage(systemUsage.getMemoryUsage());
- msg.getMessageId().setFutureOrSequenceLong(1l);
+ msg.getMessageId().setFutureOrSequenceLong(1L);
underTest.addMessageLast(msg);
assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
@@ -419,12 +419,12 @@ public class StoreQueueCursorOrderTest {
msg = getMessage(2);
messages[2] = msg;
msg.setMemoryUsage(systemUsage.getMemoryUsage());
- msg.getMessageId().setFutureOrSequenceLong(2l);
+ msg.getMessageId().setFutureOrSequenceLong(2L);
underTest.addMessageLast(msg);
assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
- assertEquals("setBatch set", 2l, queueMessageStore.batch.get());
+ assertEquals("setBatch set", 2L, queueMessageStore.batch.get());
int dequeueCount = 0;
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
index 0ce584d1e5..41b0f0dc02 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
@@ -260,7 +260,7 @@ public class JmsSchedulerTest extends JobSchedulerTestSupport {
// wait for the producer to block, which should happen immediately, and also wait long
// enough for the delay to elapse. We should see no deliveries as the send should block
// on the first message.
- Thread.sleep(10000l);
+ Thread.sleep(10000L);
assertEquals(100, latch.getCount());
@@ -268,7 +268,7 @@ public class JmsSchedulerTest extends JobSchedulerTestSupport {
broker.getSystemUsage().getJobSchedulerUsage().setLimit(1024 * 1024 * 33);
// Wait long enough that the messages are enqueued and the delivery delay has elapsed.
- Thread.sleep(10000l);
+ Thread.sleep(10000L);
// Make sure we sent all the messages we expected to send
Wait.waitFor(new Wait.Condition() {
@@ -276,12 +276,12 @@ public class JmsSchedulerTest extends JobSchedulerTestSupport {
public boolean isSatisified() throws Exception {
return producer.getSentCount() == producer.getMessageCount();
}
- }, 20000l);
+ }, 20000L);
assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount());
// Make sure we got all the messages we expected to get
- latch.await(20000l, TimeUnit.MILLISECONDS);
+ latch.await(20000L, TimeUnit.MILLISECONDS);
assertEquals("Consumer did not receive all messages.", 0, latch.getCount());
}
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
index b2eba61366..f172a919f3 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.*;
@@ -65,22 +66,22 @@ public class AMQ2149Test {
private static final String BROKER_CONNECTOR = "tcp://localhost:61617";
private static final String DEFAULT_BROKER_URL = "failover:("+ BROKER_CONNECTOR
+")?maxReconnectDelay=1000&useExponentialBackOff=false";
-
+
private final String SEQ_NUM_PROPERTY = "seqNum";
final int MESSAGE_LENGTH_BYTES = 75 * 1024;
final long SLEEP_BETWEEN_SEND_MS = 25;
final int NUM_SENDERS_AND_RECEIVERS = 10;
final Object brokerLock = new Object();
-
+
private static final long DEFAULT_BROKER_STOP_PERIOD = 10 * 1000;
private static final long DEFAULT_NUM_TO_SEND = 1400;
-
+
long brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD;
long numtoSend = DEFAULT_NUM_TO_SEND;
long sleepBetweenSend = SLEEP_BETWEEN_SEND_MS;
String brokerURL = DEFAULT_BROKER_URL;
-
+
int numBrokerRestarts = 0;
final static int MAX_BROKER_RESTARTS = 4;
BrokerService broker;
@@ -88,15 +89,15 @@ public class AMQ2149Test {
protected File dataDirFile;
final LoggingBrokerPlugin[] plugins = new LoggingBrokerPlugin[]{new LoggingBrokerPlugin()};
-
-
+
+
public void createBroker(Configurer configurer) throws Exception {
broker = new BrokerService();
configurePersistenceAdapter(broker);
-
+
broker.getSystemUsage().getMemoryUsage().setLimit(MESSAGE_LENGTH_BYTES * 200 * NUM_SENDERS_AND_RECEIVERS);
- broker.addConnector(BROKER_CONNECTOR);
+ broker.addConnector(BROKER_CONNECTOR);
broker.setBrokerName(testName.getMethodName());
broker.setDataDirectoryFile(dataDirFile);
if (configurer != null) {
@@ -104,7 +105,7 @@ public class AMQ2149Test {
}
broker.start();
}
-
+
protected void configurePersistenceAdapter(BrokerService brokerService) throws Exception {
}
@@ -135,7 +136,7 @@ public class AMQ2149Test {
executor.shutdownNow();
exceptions.clear();
}
-
+
private String buildLongString() {
final StringBuilder stringBuilder = new StringBuilder(
MESSAGE_LENGTH_BYTES);
@@ -156,8 +157,8 @@ public class AMQ2149Test {
private final MessageConsumer messageConsumer;
- private volatile long nextExpectedSeqNum = 0;
-
+ private AtomicLong nextExpectedSeqNum = new AtomicLong();
+
private final boolean transactional;
private String lastId = null;
@@ -182,11 +183,11 @@ public class AMQ2149Test {
public void close() throws JMSException {
connection.close();
}
-
+
public long getNextExpectedSeqNo() {
- return nextExpectedSeqNum;
+ return nextExpectedSeqNum.get();
}
-
+
final int TRANSACITON_BATCH = 500;
boolean resumeOnNextOrPreviousIsOk = false;
public void onMessage(Message message) {
@@ -194,7 +195,7 @@ public class AMQ2149Test {
final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
if ((seqNum % TRANSACITON_BATCH) == 0) {
LOG.info(dest + " received " + seqNum);
-
+
if (transactional) {
LOG.info("committing..");
session.commit();
@@ -202,25 +203,26 @@ public class AMQ2149Test {
}
if (resumeOnNextOrPreviousIsOk) {
// after an indoubt commit we need to accept what we get (within reason)
- if (seqNum != nextExpectedSeqNum) {
- if (seqNum == nextExpectedSeqNum - (TRANSACITON_BATCH -1)) {
- nextExpectedSeqNum -= (TRANSACITON_BATCH -1);
+ if (seqNum != nextExpectedSeqNum.get()) {
+ final long l = nextExpectedSeqNum.get();
+ if (seqNum == l - (TRANSACITON_BATCH -1)) {
+ nextExpectedSeqNum.compareAndSet(l, l - (TRANSACITON_BATCH -1) );
LOG.info("In doubt commit failed, getting replay at:" + nextExpectedSeqNum);
}
}
resumeOnNextOrPreviousIsOk = false;
}
- if (seqNum != nextExpectedSeqNum) {
+ if (seqNum != nextExpectedSeqNum.get()) {
LOG.warn(dest + " received " + seqNum
+ " in msg: " + message.getJMSMessageID()
+ " expected "
+ nextExpectedSeqNum
- + ", lastId: " + lastId
+ + ", lastId: " + lastId
+ ", message:" + message);
fail(dest + " received " + seqNum + " expected "
+ nextExpectedSeqNum);
}
- ++nextExpectedSeqNum;
+ nextExpectedSeqNum.incrementAndGet();
lastId = message.getJMSMessageID();
} catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery) {
LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery);
@@ -228,12 +230,12 @@ public class AMQ2149Test {
// in doubt - either commit command or reply missing
// don't know if we will get a replay
resumeOnNextOrPreviousIsOk = true;
- nextExpectedSeqNum++;
+ nextExpectedSeqNum.incrementAndGet();
LOG.info("in doubt transaction completion: ok to get next or previous batch. next:" + nextExpectedSeqNum);
} else {
resumeOnNextOrPreviousIsOk = false;
// batch will be replayed
- nextExpectedSeqNum -= (TRANSACITON_BATCH -1);
+ nextExpectedSeqNum.addAndGet(-(TRANSACITON_BATCH - 1));
}
} catch (Throwable e) {
@@ -255,6 +257,7 @@ public class AMQ2149Test {
private final MessageProducer messageProducer;
private volatile long nextSequenceNumber = 0;
+ private final Object guard = new Object();
public Sender(javax.jms.Destination dest) throws JMSException {
this.dest = dest;
@@ -269,15 +272,24 @@ public class AMQ2149Test {
public void run() {
final String longString = buildLongString();
+ long nextSequenceNumber = this.nextSequenceNumber;
while (nextSequenceNumber < numtoSend) {
try {
final Message message = session
.createTextMessage(longString);
message.setLongProperty(SEQ_NUM_PROPERTY,
nextSequenceNumber);
- ++nextSequenceNumber;
- messageProducer.send(message);
-
+ synchronized (guard)
+ {
+ if (nextSequenceNumber == this.nextSequenceNumber)
+ {
+ this.nextSequenceNumber = nextSequenceNumber + 1;
+ messageProducer.send(message);
+ } else {
+ continue;
+ }
+ }
+
if ((nextSequenceNumber % 500) == 0) {
LOG.info(dest + " sent " + nextSequenceNumber);
}
@@ -353,13 +365,13 @@ public class AMQ2149Test {
// no need to run this unless there are some issues with the others
public void vanilaVerify_testOrder() throws Exception {
-
+
createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception {
- broker.deleteAllMessages();
+ broker.deleteAllMessages();
}
});
-
+
verifyOrderedMessageReceipt();
verifyStats(false);
}
@@ -368,22 +380,22 @@ public class AMQ2149Test {
public void testOrderWithRestart() throws Exception {
createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception {
- broker.deleteAllMessages();
+ broker.deleteAllMessages();
}
});
-
+
final Timer timer = new Timer();
schedualRestartTask(timer, new Configurer() {
- public void configure(BrokerService broker) throws Exception {
+ public void configure(BrokerService broker) throws Exception {
}
});
-
+
try {
verifyOrderedMessageReceipt();
} finally {
timer.cancel();
}
-
+
verifyStats(true);
}
@@ -394,16 +406,16 @@ public class AMQ2149Test {
broker.deleteAllMessages();
}
});
-
+
final Timer timer = new Timer();
schedualRestartTask(timer, null);
-
+
try {
verifyOrderedMessageReceipt(ActiveMQDestination.TOPIC_TYPE);
} finally {
timer.cancel();
}
-
+
verifyStats(true);
}
@@ -416,33 +428,33 @@ public class AMQ2149Test {
public void testTopicTransactionalOrderWithRestart() throws Exception {
doTestTransactionalOrderWithRestart(ActiveMQDestination.TOPIC_TYPE);
}
-
+
public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception {
numtoSend = 10000;
sleepBetweenSend = 3;
brokerStopPeriod = 10 * 1000;
-
+
createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception {
broker.deleteAllMessages();
}
});
-
+
final Timer timer = new Timer();
schedualRestartTask(timer, null);
-
+
try {
verifyOrderedMessageReceipt(destinationType, 1, true);
} finally {
timer.cancel();
}
-
+
verifyStats(true);
}
private void verifyStats(boolean brokerRestarts) throws Exception {
RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
-
+
for (Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
DestinationStatistics stats = dest.getDestinationStatistics();
if (brokerRestarts) {
@@ -453,7 +465,7 @@ public class AMQ2149Test {
+ " " + stats.getEnqueues().getCount() + " <= " +stats.getDequeues().getCount());
} else {
assertEquals("qneue/dequeue match for: " + dest.getName(),
- stats.getEnqueues().getCount(), stats.getDequeues().getCount());
+ stats.getEnqueues().getCount(), stats.getDequeues().getCount());
}
}
}
@@ -496,20 +508,20 @@ public class AMQ2149Test {
}
return task;
}
-
+
private void verifyOrderedMessageReceipt(byte destinationType) throws Exception {
verifyOrderedMessageReceipt(destinationType, NUM_SENDERS_AND_RECEIVERS, false);
}
-
+
private void verifyOrderedMessageReceipt() throws Exception {
verifyOrderedMessageReceipt(ActiveMQDestination.QUEUE_TYPE, NUM_SENDERS_AND_RECEIVERS, false);
}
-
+
private void verifyOrderedMessageReceipt(byte destinationType, int concurrentPairs, boolean transactional) throws Exception {
Vector threads = new Vector();
Vector receivers = new Vector();
-
+
for (int i = 0; i < concurrentPairs; ++i) {
final javax.jms.Destination destination =
ActiveMQDestination.createDestination("test.dest." + i, destinationType);
@@ -518,7 +530,7 @@ public class AMQ2149Test {
thread.start();
threads.add(thread);
}
-
+
final long expiry = System.currentTimeMillis() + 1000 * 60 * 4;
while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
Thread sendThread = threads.firstElement();
@@ -563,7 +575,7 @@ public class AMQ2149Test {
}
class TeardownTask implements Callable {
- private Object brokerLock;
+ private final Object brokerLock;
private BrokerService broker;
public TeardownTask(Object brokerLock, BrokerService broker) {
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java
index 369385c8b3..08be5db1e6 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java
@@ -157,7 +157,7 @@ public class AMQ2314Test extends CombinationTestSupport {
broker.setUseJmx(true);
broker.setAdvisorySupport(false);
broker.setDeleteAllMessagesOnStartup(true);
- broker.getSystemUsage().getMemoryUsage().setLimit(1024l*1024*64);
+ broker.getSystemUsage().getMemoryUsage().setLimit(1024L*1024*64);
broker.addConnector("tcp://localhost:0").setName("Default");
broker.start();
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java
index 5a410e87a4..c7a486f60f 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java
@@ -47,7 +47,7 @@ public class AMQ3779Test extends AutoFailTestSupport {
}
}
};
- logger.getRootLogger().addAppender(appender);
+ Logger.getRootLogger().addAppender(appender);
try {
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
index 21c389f390..7c549b4ba0 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
@@ -101,14 +101,14 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
addNetworkConnector(broker);
}
broker.setSchedulePeriodForDestinationPurge(0);
- broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024l);
+ broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024L);
PolicyMap policyMap = new PolicyMap();
PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setExpireMessagesPeriod(0);
policyEntry.setQueuePrefetch(1000);
- policyEntry.setMemoryLimit(2 * 1024 * 1024l);
+ policyEntry.setMemoryLimit(2 * 1024 * 1024L);
policyEntry.setProducerFlowControl(false);
policyEntry.setEnableAudit(true);
policyEntry.setUseCache(true);
@@ -117,7 +117,7 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
PolicyEntry inPolicyEntry = new PolicyEntry();
inPolicyEntry.setExpireMessagesPeriod(0);
inPolicyEntry.setQueuePrefetch(1000);
- inPolicyEntry.setMemoryLimit(5 * 1024 * 1024l);
+ inPolicyEntry.setMemoryLimit(5 * 1024 * 1024L);
inPolicyEntry.setProducerFlowControl(true);
inPolicyEntry.setEnableAudit(true);
inPolicyEntry.setUseCache(true);
@@ -252,7 +252,7 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
}
return true;
}
- }, 1000 * 60 * 1000l, 20*1000));
+ }, 1000 * 60 * 1000L, 20*1000));
assertTrue("No exceptions:" + exceptions, exceptions.isEmpty());
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java
index c2cf53ae2a..8e4e4b7a96 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java
@@ -98,7 +98,7 @@ public class AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest extends Jms
PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setExpireMessagesPeriod(0);
policyEntry.setQueuePrefetch(1000);
- policyEntry.setMemoryLimit(1024 * 1024l);
+ policyEntry.setMemoryLimit(1024 * 1024L);
policyEntry.setOptimizedDispatch(false);
policyEntry.setProducerFlowControl(false);
policyEntry.setEnableAudit(true);
@@ -171,7 +171,7 @@ public class AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest extends Jms
}
return true;
}
- }, 1000 * 60 * 1000l));
+ }, 1000 * 60 * 1000L));
assertTrue("No exceptions:" + exceptions, exceptions.isEmpty());
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
index 265b692e6f..b567c93c05 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
@@ -49,7 +49,7 @@ public class AMQ4607Test extends JmsMultipleBrokersTestSupport implements Uncaug
public boolean duplex = true;
protected Map consumerMap;
- Map unhandeledExceptions = new HashMap();
+ final Map unhandeledExceptions = new HashMap();
private void assertNoUnhandeledExceptions() {
for( Entry e: unhandeledExceptions.entrySet()) {
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
index b9246fbe00..1e0ccb9127 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
@@ -101,9 +101,9 @@ public class AMQ4636Test {
JDBCPersistenceAdapter jdbc = new TestJDBCPersistenceAdapter();
jdbc.setDataSource(embeddedDataSource);
- jdbc.setLockKeepAlivePeriod(1000l);
+ jdbc.setLockKeepAlivePeriod(1000L);
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
- leaseDatabaseLocker.setLockAcquireSleepInterval(2000l);
+ leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
jdbc.setLocker(leaseDatabaseLocker);
broker = new BrokerService();
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
index 0d7f44bfee..ba7ee4dd4d 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
@@ -227,7 +227,7 @@ public class AMQ5266SingleDestTest {
}
// verify empty dlq
- assertEquals("No pending messages", 0l, ((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+ assertEquals("No pending messages", 0L, ((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
}
public class ExportQueuePublisher {
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
index f956da6d75..d71a9e42a6 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
@@ -25,6 +25,9 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.command.ActiveMQQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
public class CraigsBugTest extends EmbeddedBrokerTestSupport {
private String connectionUri;
@@ -49,9 +52,7 @@ public class CraigsBugTest extends EmbeddedBrokerTestSupport {
conn.start();
try {
- synchronized (this) {
- wait(3000);
- }
+ new CountDownLatch(1).await(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
index 688d066d2a..ad6df7fc00 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
@@ -82,9 +82,9 @@ public class TrapMessageInJDBCStoreTest extends TestCase {
jdbc.setCleanupPeriod(0);
testTransactionContext = new TestTransactionContext(jdbc);
- jdbc.setLockKeepAlivePeriod(1000l);
+ jdbc.setLockKeepAlivePeriod(1000L);
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
- leaseDatabaseLocker.setLockAcquireSleepInterval(2000l);
+ leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
jdbc.setLocker(leaseDatabaseLocker);
broker.setPersistenceAdapter(jdbc);
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java
index c8b4503c4d..1f25109d5c 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java
@@ -25,6 +25,7 @@ import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
import javax.jms.*;
import java.io.File;
import java.net.URISyntaxException;
+import java.util.concurrent.CountDownLatch;
public class TryJmsClient
{
@@ -59,9 +60,7 @@ public class TryJmsClient
startMessageSend();
- synchronized(this) {
- this.wait();
- }
+ new CountDownLatch(1).await();
}
private void startUsageMonitor(final BrokerService brokerService) {
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java
index 3f5898719d..c8eb7b34d8 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java
@@ -25,6 +25,7 @@ import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
import javax.jms.*;
import java.io.File;
import java.net.URISyntaxException;
+import java.util.concurrent.CountDownLatch;
public class TryJmsManager {
@@ -59,9 +60,7 @@ public class TryJmsManager {
startMessageConsumer();
- synchronized(this) {
- this.wait();
- }
+ new CountDownLatch(1).await();
}
private void startUsageMonitor(final BrokerService brokerService) {
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
index 9e0f468416..e042217ced 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
@@ -979,7 +979,7 @@ public class ActiveMQStreamMessageTest extends TestCase {
message.writeObject(new Byte((byte) 2));
message.writeObject(new Short((short) 2));
message.writeObject(new Integer(2));
- message.writeObject(new Long(2l));
+ message.writeObject(new Long(2L));
message.writeObject(new Float(2.0f));
message.writeObject(new Double(2.0d));
}catch(Exception e) {
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
index 28fc307c5e..13c71847c5 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
@@ -49,7 +49,7 @@ public class ActiveMQTextMessageTest extends TestCase {
String string = "str";
msg.setText(string);
Message copy = msg.copy();
- assertTrue(msg.getText() == ((ActiveMQTextMessage) copy).getText());
+ assertSame(msg.getText(), ((ActiveMQTextMessage) copy).getText());
}
public void testSetText() {
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java
index 8f22c33d40..4e1ab59b32 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java
@@ -73,14 +73,19 @@ public class ConsumerBean extends Assert implements MessageListener {
long start = System.currentTimeMillis();
- try {
- if (hasReceivedMessage()) {
- synchronized (messages) {
+ synchronized(messages)
+ {
+ try
+ {
+ while (hasReceivedMessage())
+ {
messages.wait(4000);
}
}
- } catch (InterruptedException e) {
- LOG.info("Caught: " + e);
+ catch (InterruptedException e)
+ {
+ LOG.info("Caught: " + e);
+ }
}
long end = System.currentTimeMillis() - start;
@@ -101,18 +106,18 @@ public class ConsumerBean extends Assert implements MessageListener {
LOG.info("Waiting for (" + maxRemainingMessageCount + ") message(s) to arrive");
long start = System.currentTimeMillis();
long endTime = start + maxWaitTime;
- while (maxRemainingMessageCount > 0) {
- try {
- synchronized (messages) {
+ synchronized (messages) {
+ while (maxRemainingMessageCount > 0) {
+ try {
messages.wait(1000);
+ if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > endTime) {
+ break;
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Caught: " + e);
}
- if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > endTime) {
- break;
- }
- } catch (InterruptedException e) {
- LOG.info("Caught: " + e);
+ maxRemainingMessageCount = Math.max(0, messageCount - messages.size());
}
- maxRemainingMessageCount = Math.max(0, messageCount - messages.size());
}
long end = System.currentTimeMillis() - start;
LOG.info("End of wait for " + end + " millis");
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/SpringConsumer.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/SpringConsumer.java
index 118e0361c8..ed0a48af8b 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/SpringConsumer.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/SpringConsumer.java
@@ -43,13 +43,13 @@ public class SpringConsumer extends ConsumerBean implements MessageListener {
try {
ConnectionFactory factory = template.getConnectionFactory();
- connection = factory.createConnection();
+ final Connection c = connection = factory.createConnection();
// we might be a reusable connection in spring
// so lets only set the client ID once if its not set
- synchronized (connection) {
- if (connection.getClientID() == null) {
- connection.setClientID(myId);
+ synchronized (c) {
+ if (c.getClientID() == null) {
+ c.setClientID(myId);
}
}
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java
index df10d73e89..2502110a6e 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java
@@ -71,17 +71,17 @@ public class JDBCIOExceptionHandlerTest extends TestCase {
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
jdbc.setDataSource(dataSource);
- jdbc.setLockKeepAlivePeriod(1000l);
+ jdbc.setLockKeepAlivePeriod(1000L);
if (leaseLocker) {
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
leaseDatabaseLocker.setHandleStartException(true);
- leaseDatabaseLocker.setLockAcquireSleepInterval(2000l);
+ leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
jdbc.setLocker(leaseDatabaseLocker);
}
broker.setPersistenceAdapter(jdbc);
LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler();
- ioExceptionHandler.setResumeCheckSleepPeriod(1000l);
+ ioExceptionHandler.setResumeCheckSleepPeriod(1000L);
ioExceptionHandler.setStopStartConnectors(startStopConnectors);
broker.setIoExceptionHandler(ioExceptionHandler);
String connectionUri = broker.addConnector(TRANSPORT_URL).getPublishableConnectString();
@@ -129,18 +129,18 @@ public class JDBCIOExceptionHandlerTest extends TestCase {
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
jdbc.setDataSource(dataSource);
- jdbc.setLockKeepAlivePeriod(1000l);
+ jdbc.setLockKeepAlivePeriod(1000L);
if (lease) {
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
leaseDatabaseLocker.setHandleStartException(true);
- leaseDatabaseLocker.setLockAcquireSleepInterval(2000l);
+ leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
jdbc.setLocker(leaseDatabaseLocker);
}
broker.setPersistenceAdapter(jdbc);
LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler();
- ioExceptionHandler.setResumeCheckSleepPeriod(1000l);
+ ioExceptionHandler.setResumeCheckSleepPeriod(1000L);
ioExceptionHandler.setStopStartConnectors(false);
broker.setIoExceptionHandler(ioExceptionHandler);
slave.set(broker);
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
index 34796a4a3b..2399738ea9 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
@@ -150,7 +150,7 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
for (int priority = 0; priority < maxPriority; priority++) {
producers.add(new ProducerThread(topic, MSG_NUM, priority));
messageCounts[priority] = new AtomicInteger(0);
- messageIds[priority] = 1l;
+ messageIds[priority] = 1L;
}
for (ProducerThread producer : producers) {
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java
index 774d172bb1..6042ae6e53 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java
@@ -188,7 +188,7 @@ public class LeaseDatabaseLockerTest {
statement = connection.prepareStatement(jdbc.getStatements().getLeaseUpdateStatement());
statement.setString(1, null);
- statement.setLong(2, 0l);
+ statement.setLong(2, 0L);
statement.setString(3, fakeId);
assertEquals("we released " + fakeId, 1, statement.executeUpdate());
LOG.info("released " + fakeId);
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
index 15abe3d764..4ccb51efe6 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
@@ -189,7 +189,7 @@ public class KahaDBFastEnqueueTest {
MessageProducer producer = session.createProducer(destination);
Long start = System.currentTimeMillis();
- long i = 0l;
+ long i = 0L;
while ( (i=count.getAndDecrement()) > 0) {
Message message = null;
if (useBytesMessage) {
@@ -238,8 +238,8 @@ public class KahaDBFastEnqueueTest {
public void testRollover() throws Exception {
byte flip = 0x1;
for (long i=0; i exceptions = new Vector();
ExecutorService executor;
@@ -617,7 +613,7 @@ public class PListTest {
}
}
- Map locks = new HashMap();
+ final Map locks = new HashMap();
private Object plistLocks(PList plist) {
Object lock = null;
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
index f3926628f1..b07c8cce6b 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
@@ -250,7 +250,7 @@ public class JMSInputStreamTest extends JmsTestSupport {
}
out.flush();
synchronized (complete) {
- if (!complete.get()) {
+ while (!complete.get()) {
complete.wait(30000);
}
}
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java
index 8fb70ec966..a11d45a5c2 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.util.ServiceStopper;
@@ -29,7 +30,7 @@ import org.apache.activemq.util.ServiceStopper;
public class StubTransport extends TransportSupport {
private Queue