This closes #26 error prone errors and fixes on activemq5-tests

This commit is contained in:
Clebert Suconic 2015-06-12 10:38:24 -04:00
commit 29d59e970b
52 changed files with 234 additions and 235 deletions

View File

@ -20,7 +20,7 @@
<parent> <parent>
<groupId>org.apache.activemq.tests</groupId> <groupId>org.apache.activemq.tests</groupId>
<artifactId>artemis-tests-pom</artifactId> <artifactId>artemis-tests-pom</artifactId>
<version>1.0.0-SNAPSHOT</version> <version>1.0.1-SNAPSHOT</version>
</parent> </parent>
<artifactId>activemq5-unit-tests</artifactId> <artifactId>activemq5-unit-tests</artifactId>

View File

@ -38,7 +38,7 @@ import org.apache.activemq.broker.BrokerService;
public class ArtemisBrokerWrapper extends ArtemisBrokerBase public class ArtemisBrokerWrapper extends ArtemisBrokerBase
{ {
protected Map<String, SimpleString> testQueues = new HashMap<String, SimpleString>(); protected final Map<String, SimpleString> testQueues = new HashMap<String, SimpleString>();
public ArtemisBrokerWrapper(BrokerService brokerService) public ArtemisBrokerWrapper(BrokerService brokerService)
{ {

View File

@ -36,7 +36,7 @@ public class JmsCreateConsumerInOnMessageTest extends TestSupport implements Mes
private MessageConsumer testConsumer; private MessageConsumer testConsumer;
private MessageProducer producer; private MessageProducer producer;
private Topic topic; private Topic topic;
private Object lock = new Object(); private final Object lock = new Object();
/* /*
* @see junit.framework.TestCase#setUp() * @see junit.framework.TestCase#setUp()
@ -71,8 +71,8 @@ public class JmsCreateConsumerInOnMessageTest extends TestSupport implements Mes
public void testCreateConsumer() throws Exception { public void testCreateConsumer() throws Exception {
Message msg = super.createMessage(); Message msg = super.createMessage();
producer.send(msg); producer.send(msg);
if (testConsumer == null) { synchronized (lock) {
synchronized (lock) { while(testConsumer == null) {
lock.wait(3000); lock.wait(3000);
} }
} }

View File

@ -83,8 +83,6 @@ public class JmsMultipleClientsTestSupport {
protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>()); protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
protected MessageIdList allMessagesList = new MessageIdList(); protected MessageIdList allMessagesList = new MessageIdList();
private AtomicInteger producerLock;
protected void startProducers(Destination dest, int msgCount) throws Exception { protected void startProducers(Destination dest, int msgCount) throws Exception {
startProducers(createConnectionFactory(), dest, msgCount); startProducers(createConnectionFactory(), dest, msgCount);
} }
@ -92,7 +90,7 @@ public class JmsMultipleClientsTestSupport {
protected void startProducers(final ConnectionFactory factory, final Destination dest, final int msgCount) throws Exception { protected void startProducers(final ConnectionFactory factory, final Destination dest, final int msgCount) throws Exception {
// Use concurrent send // Use concurrent send
if (useConcurrentSend) { if (useConcurrentSend) {
producerLock = new AtomicInteger(producerCount); final AtomicInteger producerLock = new AtomicInteger(producerCount);
for (int i = 0; i < producerCount; i++) { for (int i = 0; i < producerCount; i++) {
Thread t = new Thread(new Runnable() { Thread t = new Thread(new Runnable() {

View File

@ -61,7 +61,7 @@ public class LargeMessageTestSupport extends ClientTestSupport implements Messag
protected int deliveryMode = DeliveryMode.PERSISTENT; protected int deliveryMode = DeliveryMode.PERSISTENT;
protected IdGenerator idGen = new IdGenerator(); protected IdGenerator idGen = new IdGenerator();
protected boolean validMessageConsumption = true; protected boolean validMessageConsumption = true;
protected AtomicInteger messageCount = new AtomicInteger(0); protected final AtomicInteger messageCount = new AtomicInteger(0);
protected int prefetchValue = 10000000; protected int prefetchValue = 10000000;
@ -182,9 +182,9 @@ public class LargeMessageTestSupport extends ClientTestSupport implements Messag
producer.send(msg); producer.send(msg);
} }
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
while (now + 60000 > System.currentTimeMillis() && messageCount.get() < MESSAGE_COUNT) { synchronized (messageCount) {
LOG.info("message count = " + messageCount); while (now + 60000 > System.currentTimeMillis() && messageCount.get() < MESSAGE_COUNT) {
synchronized (messageCount) { LOG.info("message count = " + messageCount);
messageCount.wait(1000); messageCount.wait(1000);
} }
} }

View File

@ -154,7 +154,7 @@ public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport {
} }
private class TestServerSession implements ServerSession { private class TestServerSession implements ServerSession {
TestServerSessionPool pool; final TestServerSessionPool pool;
Session session; Session session;
public TestServerSession(TestServerSessionPool pool) throws JMSException { public TestServerSession(TestServerSessionPool pool) throws JMSException {

View File

@ -43,7 +43,7 @@ public class DbRestartJDBCQueueMasterSlaveLeaseTest extends DbRestartJDBCQueueMa
LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler(); LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler();
ioExceptionHandler.setIgnoreSQLExceptions(false); ioExceptionHandler.setIgnoreSQLExceptions(false);
ioExceptionHandler.setStopStartConnectors(false); ioExceptionHandler.setStopStartConnectors(false);
ioExceptionHandler.setResumeCheckSleepPeriod(500l); ioExceptionHandler.setResumeCheckSleepPeriod(500L);
brokerService.setIoExceptionHandler(ioExceptionHandler); brokerService.setIoExceptionHandler(ioExceptionHandler);
} }

View File

@ -82,7 +82,7 @@ public class AbortSlowConsumer1Test extends AbortSlowConsumerBase {
consumer.close(); consumer.close();
TimeUnit.SECONDS.sleep(5); TimeUnit.SECONDS.sleep(5);
assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty()); assertTrue("no exceptions : " + exceptions, exceptions.isEmpty());
} }
@Test(timeout = 60 * 1000) @Test(timeout = 60 * 1000)
@ -99,6 +99,6 @@ public class AbortSlowConsumer1Test extends AbortSlowConsumerBase {
conn.close(); conn.close();
TimeUnit.SECONDS.sleep(5); TimeUnit.SECONDS.sleep(5);
assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty()); assertTrue("no exceptions : " + exceptions, exceptions.isEmpty());
} }
} }

View File

@ -67,7 +67,7 @@ public class QueuePurgeTest extends CombinationTestSupport {
broker.setDataDirectoryFile(testDataDir); broker.setDataDirectoryFile(testDataDir);
broker.setUseJmx(true); broker.setUseJmx(true);
broker.setDeleteAllMessagesOnStartup(true); broker.setDeleteAllMessagesOnStartup(true);
broker.getSystemUsage().getMemoryUsage().setLimit(1024l*1024*64); broker.getSystemUsage().getMemoryUsage().setLimit(1024L*1024*64);
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(new File(testDataDir, "kahadb")); persistenceAdapter.setDirectory(new File(testDataDir, "kahadb"));
broker.setPersistenceAdapter(persistenceAdapter); broker.setPersistenceAdapter(persistenceAdapter);

View File

@ -54,7 +54,7 @@ public class QueueResendDuringShutdownTest {
private Connection producerConnection; private Connection producerConnection;
private Queue queue; private Queue queue;
private Object messageReceiveSync = new Object(); private final Object messageReceiveSync = new Object();
private int receiveCount; private int receiveCount;
@Before @Before
@ -239,7 +239,7 @@ public class QueueResendDuringShutdownTest {
protected void waitForMessage (long delayMs) { protected void waitForMessage (long delayMs) {
try { try {
synchronized ( this.messageReceiveSync ) { synchronized ( this.messageReceiveSync ) {
if ( this.receiveCount == 0 ) { while ( this.receiveCount == 0 ) {
this.messageReceiveSync.wait(delayMs); this.messageReceiveSync.wait(delayMs);
} }
} }

View File

@ -106,7 +106,7 @@ public class StoreQueueCursorOrderTest {
@Override @Override
public void run() { public void run() {
} }
}, 2l) {}; }, 2L) {};
msg.getMessageId().setFutureOrSequenceLong(future); msg.getMessageId().setFutureOrSequenceLong(future);
underTest.addMessageLast(msg); underTest.addMessageLast(msg);
@ -116,12 +116,12 @@ public class StoreQueueCursorOrderTest {
msg = getMessage(1); msg = getMessage(1);
messages[0] = msg; messages[0] = msg;
msg.setMemoryUsage(systemUsage.getMemoryUsage()); msg.setMemoryUsage(systemUsage.getMemoryUsage());
msg.getMessageId().setFutureOrSequenceLong(1l); msg.getMessageId().setFutureOrSequenceLong(1L);
underTest.addMessageLast(msg); underTest.addMessageLast(msg);
assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
assertEquals("setBatch unset", 0l, queueMessageStore.batch.get()); assertEquals("setBatch unset", 0L, queueMessageStore.batch.get());
int dequeueCount = 0; int dequeueCount = 0;
@ -171,9 +171,9 @@ public class StoreQueueCursorOrderTest {
FutureTask<Long> future = new FutureTask<Long>(new Runnable() { FutureTask<Long> future = new FutureTask<Long>(new Runnable() {
@Override @Override
public void run() { public void run() {
msgRef.getMessageId().setFutureOrSequenceLong(1l); msgRef.getMessageId().setFutureOrSequenceLong(1L);
} }
}, 1l) {}; }, 1L) {};
msg.getMessageId().setFutureOrSequenceLong(future); msg.getMessageId().setFutureOrSequenceLong(future);
Executors.newSingleThreadExecutor().submit(future); Executors.newSingleThreadExecutor().submit(future);
underTest.addMessageLast(msg); underTest.addMessageLast(msg);
@ -184,12 +184,12 @@ public class StoreQueueCursorOrderTest {
msg = getMessage(1); msg = getMessage(1);
messages[0] = msg; messages[0] = msg;
msg.setMemoryUsage(systemUsage.getMemoryUsage()); msg.setMemoryUsage(systemUsage.getMemoryUsage());
msg.getMessageId().setFutureOrSequenceLong(1l); msg.getMessageId().setFutureOrSequenceLong(1L);
underTest.addMessageLast(msg); underTest.addMessageLast(msg);
assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
assertEquals("setBatch unset", 0l, queueMessageStore.batch.get()); assertEquals("setBatch unset", 0L, queueMessageStore.batch.get());
int dequeueCount = 0; int dequeueCount = 0;
@ -239,9 +239,9 @@ public class StoreQueueCursorOrderTest {
FutureTask<Long> future = new FutureTask<Long>(new Runnable() { FutureTask<Long> future = new FutureTask<Long>(new Runnable() {
@Override @Override
public void run() { public void run() {
msgRef.getMessageId().setFutureOrSequenceLong(0l); msgRef.getMessageId().setFutureOrSequenceLong(0L);
} }
}, 0l) {}; }, 0L) {};
msg.getMessageId().setFutureOrSequenceLong(future); msg.getMessageId().setFutureOrSequenceLong(future);
Executors.newSingleThreadExecutor().submit(future); Executors.newSingleThreadExecutor().submit(future);
underTest.addMessageLast(msg); underTest.addMessageLast(msg);
@ -257,16 +257,16 @@ public class StoreQueueCursorOrderTest {
FutureTask<Long> future2 = new FutureTask<Long>(new Runnable() { FutureTask<Long> future2 = new FutureTask<Long>(new Runnable() {
@Override @Override
public void run() { public void run() {
msgRe2f.getMessageId().setFutureOrSequenceLong(1l); msgRe2f.getMessageId().setFutureOrSequenceLong(1L);
} }
}, 1l) {}; }, 1L) {};
msg.getMessageId().setFutureOrSequenceLong(future2); msg.getMessageId().setFutureOrSequenceLong(future2);
Executors.newSingleThreadExecutor().submit(future2); Executors.newSingleThreadExecutor().submit(future2);
underTest.addMessageLast(msg); underTest.addMessageLast(msg);
assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
assertEquals("setBatch set", 1l, queueMessageStore.batch.get()); assertEquals("setBatch set", 1L, queueMessageStore.batch.get());
int dequeueCount = 0; int dequeueCount = 0;
@ -316,9 +316,9 @@ public class StoreQueueCursorOrderTest {
FutureTask<Long> future0 = new FutureTask<Long>(new Runnable() { FutureTask<Long> future0 = new FutureTask<Long>(new Runnable() {
@Override @Override
public void run() { public void run() {
msgRef.getMessageId().setFutureOrSequenceLong(0l); msgRef.getMessageId().setFutureOrSequenceLong(0L);
} }
}, 0l) {}; }, 0L) {};
msg.getMessageId().setFutureOrSequenceLong(future0); msg.getMessageId().setFutureOrSequenceLong(future0);
underTest.addMessageLast(msg); underTest.addMessageLast(msg);
Executors.newSingleThreadExecutor().submit(future0); Executors.newSingleThreadExecutor().submit(future0);
@ -332,9 +332,9 @@ public class StoreQueueCursorOrderTest {
FutureTask<Long> future1 = new FutureTask<Long>(new Runnable() { FutureTask<Long> future1 = new FutureTask<Long>(new Runnable() {
@Override @Override
public void run() { public void run() {
msgRef1.getMessageId().setFutureOrSequenceLong(3l); msgRef1.getMessageId().setFutureOrSequenceLong(3L);
} }
}, 3l) {}; }, 3L) {};
msg.getMessageId().setFutureOrSequenceLong(future1); msg.getMessageId().setFutureOrSequenceLong(future1);
underTest.addMessageLast(msg); underTest.addMessageLast(msg);
@ -342,7 +342,7 @@ public class StoreQueueCursorOrderTest {
msg = getMessage(2); msg = getMessage(2);
messages[1] = msg; messages[1] = msg;
msg.setMemoryUsage(systemUsage.getMemoryUsage()); msg.setMemoryUsage(systemUsage.getMemoryUsage());
msg.getMessageId().setFutureOrSequenceLong(1l); msg.getMessageId().setFutureOrSequenceLong(1L);
underTest.addMessageLast(msg); underTest.addMessageLast(msg);
assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
@ -354,12 +354,12 @@ public class StoreQueueCursorOrderTest {
msg = getMessage(3); msg = getMessage(3);
messages[2] = msg; messages[2] = msg;
msg.setMemoryUsage(systemUsage.getMemoryUsage()); msg.setMemoryUsage(systemUsage.getMemoryUsage());
msg.getMessageId().setFutureOrSequenceLong(3l); msg.getMessageId().setFutureOrSequenceLong(3L);
underTest.addMessageLast(msg); underTest.addMessageLast(msg);
assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
assertEquals("setBatch set", 2l, queueMessageStore.batch.get()); assertEquals("setBatch set", 2L, queueMessageStore.batch.get());
int dequeueCount = 0; int dequeueCount = 0;
@ -405,13 +405,13 @@ public class StoreQueueCursorOrderTest {
ActiveMQTextMessage msg = getMessage(0); ActiveMQTextMessage msg = getMessage(0);
messages[0] = msg; messages[0] = msg;
msg.setMemoryUsage(systemUsage.getMemoryUsage()); msg.setMemoryUsage(systemUsage.getMemoryUsage());
msg.getMessageId().setFutureOrSequenceLong(0l); msg.getMessageId().setFutureOrSequenceLong(0L);
underTest.addMessageLast(msg); underTest.addMessageLast(msg);
msg = getMessage(1); msg = getMessage(1);
messages[1] = msg; messages[1] = msg;
msg.setMemoryUsage(systemUsage.getMemoryUsage()); msg.setMemoryUsage(systemUsage.getMemoryUsage());
msg.getMessageId().setFutureOrSequenceLong(1l); msg.getMessageId().setFutureOrSequenceLong(1L);
underTest.addMessageLast(msg); underTest.addMessageLast(msg);
assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
@ -419,12 +419,12 @@ public class StoreQueueCursorOrderTest {
msg = getMessage(2); msg = getMessage(2);
messages[2] = msg; messages[2] = msg;
msg.setMemoryUsage(systemUsage.getMemoryUsage()); msg.setMemoryUsage(systemUsage.getMemoryUsage());
msg.getMessageId().setFutureOrSequenceLong(2l); msg.getMessageId().setFutureOrSequenceLong(2L);
underTest.addMessageLast(msg); underTest.addMessageLast(msg);
assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
assertEquals("setBatch set", 2l, queueMessageStore.batch.get()); assertEquals("setBatch set", 2L, queueMessageStore.batch.get());
int dequeueCount = 0; int dequeueCount = 0;

View File

@ -260,7 +260,7 @@ public class JmsSchedulerTest extends JobSchedulerTestSupport {
// wait for the producer to block, which should happen immediately, and also wait long // wait for the producer to block, which should happen immediately, and also wait long
// enough for the delay to elapse. We should see no deliveries as the send should block // enough for the delay to elapse. We should see no deliveries as the send should block
// on the first message. // on the first message.
Thread.sleep(10000l); Thread.sleep(10000L);
assertEquals(100, latch.getCount()); assertEquals(100, latch.getCount());
@ -268,7 +268,7 @@ public class JmsSchedulerTest extends JobSchedulerTestSupport {
broker.getSystemUsage().getJobSchedulerUsage().setLimit(1024 * 1024 * 33); broker.getSystemUsage().getJobSchedulerUsage().setLimit(1024 * 1024 * 33);
// Wait long enough that the messages are enqueued and the delivery delay has elapsed. // Wait long enough that the messages are enqueued and the delivery delay has elapsed.
Thread.sleep(10000l); Thread.sleep(10000L);
// Make sure we sent all the messages we expected to send // Make sure we sent all the messages we expected to send
Wait.waitFor(new Wait.Condition() { Wait.waitFor(new Wait.Condition() {
@ -276,12 +276,12 @@ public class JmsSchedulerTest extends JobSchedulerTestSupport {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return producer.getSentCount() == producer.getMessageCount(); return producer.getSentCount() == producer.getMessageCount();
} }
}, 20000l); }, 20000L);
assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount()); assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount());
// Make sure we got all the messages we expected to get // Make sure we got all the messages we expected to get
latch.await(20000l, TimeUnit.MILLISECONDS); latch.await(20000L, TimeUnit.MILLISECONDS);
assertEquals("Consumer did not receive all messages.", 0, latch.getCount()); assertEquals("Consumer did not receive all messages.", 0, latch.getCount());
} }

View File

@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.*; import javax.jms.*;
@ -65,22 +66,22 @@ public class AMQ2149Test {
private static final String BROKER_CONNECTOR = "tcp://localhost:61617"; private static final String BROKER_CONNECTOR = "tcp://localhost:61617";
private static final String DEFAULT_BROKER_URL = "failover:("+ BROKER_CONNECTOR private static final String DEFAULT_BROKER_URL = "failover:("+ BROKER_CONNECTOR
+")?maxReconnectDelay=1000&useExponentialBackOff=false"; +")?maxReconnectDelay=1000&useExponentialBackOff=false";
private final String SEQ_NUM_PROPERTY = "seqNum"; private final String SEQ_NUM_PROPERTY = "seqNum";
final int MESSAGE_LENGTH_BYTES = 75 * 1024; final int MESSAGE_LENGTH_BYTES = 75 * 1024;
final long SLEEP_BETWEEN_SEND_MS = 25; final long SLEEP_BETWEEN_SEND_MS = 25;
final int NUM_SENDERS_AND_RECEIVERS = 10; final int NUM_SENDERS_AND_RECEIVERS = 10;
final Object brokerLock = new Object(); final Object brokerLock = new Object();
private static final long DEFAULT_BROKER_STOP_PERIOD = 10 * 1000; private static final long DEFAULT_BROKER_STOP_PERIOD = 10 * 1000;
private static final long DEFAULT_NUM_TO_SEND = 1400; private static final long DEFAULT_NUM_TO_SEND = 1400;
long brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD; long brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD;
long numtoSend = DEFAULT_NUM_TO_SEND; long numtoSend = DEFAULT_NUM_TO_SEND;
long sleepBetweenSend = SLEEP_BETWEEN_SEND_MS; long sleepBetweenSend = SLEEP_BETWEEN_SEND_MS;
String brokerURL = DEFAULT_BROKER_URL; String brokerURL = DEFAULT_BROKER_URL;
int numBrokerRestarts = 0; int numBrokerRestarts = 0;
final static int MAX_BROKER_RESTARTS = 4; final static int MAX_BROKER_RESTARTS = 4;
BrokerService broker; BrokerService broker;
@ -88,15 +89,15 @@ public class AMQ2149Test {
protected File dataDirFile; protected File dataDirFile;
final LoggingBrokerPlugin[] plugins = new LoggingBrokerPlugin[]{new LoggingBrokerPlugin()}; final LoggingBrokerPlugin[] plugins = new LoggingBrokerPlugin[]{new LoggingBrokerPlugin()};
public void createBroker(Configurer configurer) throws Exception { public void createBroker(Configurer configurer) throws Exception {
broker = new BrokerService(); broker = new BrokerService();
configurePersistenceAdapter(broker); configurePersistenceAdapter(broker);
broker.getSystemUsage().getMemoryUsage().setLimit(MESSAGE_LENGTH_BYTES * 200 * NUM_SENDERS_AND_RECEIVERS); broker.getSystemUsage().getMemoryUsage().setLimit(MESSAGE_LENGTH_BYTES * 200 * NUM_SENDERS_AND_RECEIVERS);
broker.addConnector(BROKER_CONNECTOR); broker.addConnector(BROKER_CONNECTOR);
broker.setBrokerName(testName.getMethodName()); broker.setBrokerName(testName.getMethodName());
broker.setDataDirectoryFile(dataDirFile); broker.setDataDirectoryFile(dataDirFile);
if (configurer != null) { if (configurer != null) {
@ -104,7 +105,7 @@ public class AMQ2149Test {
} }
broker.start(); broker.start();
} }
protected void configurePersistenceAdapter(BrokerService brokerService) throws Exception { protected void configurePersistenceAdapter(BrokerService brokerService) throws Exception {
} }
@ -135,7 +136,7 @@ public class AMQ2149Test {
executor.shutdownNow(); executor.shutdownNow();
exceptions.clear(); exceptions.clear();
} }
private String buildLongString() { private String buildLongString() {
final StringBuilder stringBuilder = new StringBuilder( final StringBuilder stringBuilder = new StringBuilder(
MESSAGE_LENGTH_BYTES); MESSAGE_LENGTH_BYTES);
@ -156,8 +157,8 @@ public class AMQ2149Test {
private final MessageConsumer messageConsumer; private final MessageConsumer messageConsumer;
private volatile long nextExpectedSeqNum = 0; private AtomicLong nextExpectedSeqNum = new AtomicLong();
private final boolean transactional; private final boolean transactional;
private String lastId = null; private String lastId = null;
@ -182,11 +183,11 @@ public class AMQ2149Test {
public void close() throws JMSException { public void close() throws JMSException {
connection.close(); connection.close();
} }
public long getNextExpectedSeqNo() { public long getNextExpectedSeqNo() {
return nextExpectedSeqNum; return nextExpectedSeqNum.get();
} }
final int TRANSACITON_BATCH = 500; final int TRANSACITON_BATCH = 500;
boolean resumeOnNextOrPreviousIsOk = false; boolean resumeOnNextOrPreviousIsOk = false;
public void onMessage(Message message) { public void onMessage(Message message) {
@ -194,7 +195,7 @@ public class AMQ2149Test {
final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY); final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
if ((seqNum % TRANSACITON_BATCH) == 0) { if ((seqNum % TRANSACITON_BATCH) == 0) {
LOG.info(dest + " received " + seqNum); LOG.info(dest + " received " + seqNum);
if (transactional) { if (transactional) {
LOG.info("committing.."); LOG.info("committing..");
session.commit(); session.commit();
@ -202,25 +203,26 @@ public class AMQ2149Test {
} }
if (resumeOnNextOrPreviousIsOk) { if (resumeOnNextOrPreviousIsOk) {
// after an indoubt commit we need to accept what we get (within reason) // after an indoubt commit we need to accept what we get (within reason)
if (seqNum != nextExpectedSeqNum) { if (seqNum != nextExpectedSeqNum.get()) {
if (seqNum == nextExpectedSeqNum - (TRANSACITON_BATCH -1)) { final long l = nextExpectedSeqNum.get();
nextExpectedSeqNum -= (TRANSACITON_BATCH -1); if (seqNum == l - (TRANSACITON_BATCH -1)) {
nextExpectedSeqNum.compareAndSet(l, l - (TRANSACITON_BATCH -1) );
LOG.info("In doubt commit failed, getting replay at:" + nextExpectedSeqNum); LOG.info("In doubt commit failed, getting replay at:" + nextExpectedSeqNum);
} }
} }
resumeOnNextOrPreviousIsOk = false; resumeOnNextOrPreviousIsOk = false;
} }
if (seqNum != nextExpectedSeqNum) { if (seqNum != nextExpectedSeqNum.get()) {
LOG.warn(dest + " received " + seqNum LOG.warn(dest + " received " + seqNum
+ " in msg: " + message.getJMSMessageID() + " in msg: " + message.getJMSMessageID()
+ " expected " + " expected "
+ nextExpectedSeqNum + nextExpectedSeqNum
+ ", lastId: " + lastId + ", lastId: " + lastId
+ ", message:" + message); + ", message:" + message);
fail(dest + " received " + seqNum + " expected " fail(dest + " received " + seqNum + " expected "
+ nextExpectedSeqNum); + nextExpectedSeqNum);
} }
++nextExpectedSeqNum; nextExpectedSeqNum.incrementAndGet();
lastId = message.getJMSMessageID(); lastId = message.getJMSMessageID();
} catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery) { } catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery) {
LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery); LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery);
@ -228,12 +230,12 @@ public class AMQ2149Test {
// in doubt - either commit command or reply missing // in doubt - either commit command or reply missing
// don't know if we will get a replay // don't know if we will get a replay
resumeOnNextOrPreviousIsOk = true; resumeOnNextOrPreviousIsOk = true;
nextExpectedSeqNum++; nextExpectedSeqNum.incrementAndGet();
LOG.info("in doubt transaction completion: ok to get next or previous batch. next:" + nextExpectedSeqNum); LOG.info("in doubt transaction completion: ok to get next or previous batch. next:" + nextExpectedSeqNum);
} else { } else {
resumeOnNextOrPreviousIsOk = false; resumeOnNextOrPreviousIsOk = false;
// batch will be replayed // batch will be replayed
nextExpectedSeqNum -= (TRANSACITON_BATCH -1); nextExpectedSeqNum.addAndGet(-(TRANSACITON_BATCH - 1));
} }
} catch (Throwable e) { } catch (Throwable e) {
@ -255,6 +257,7 @@ public class AMQ2149Test {
private final MessageProducer messageProducer; private final MessageProducer messageProducer;
private volatile long nextSequenceNumber = 0; private volatile long nextSequenceNumber = 0;
private final Object guard = new Object();
public Sender(javax.jms.Destination dest) throws JMSException { public Sender(javax.jms.Destination dest) throws JMSException {
this.dest = dest; this.dest = dest;
@ -269,15 +272,24 @@ public class AMQ2149Test {
public void run() { public void run() {
final String longString = buildLongString(); final String longString = buildLongString();
long nextSequenceNumber = this.nextSequenceNumber;
while (nextSequenceNumber < numtoSend) { while (nextSequenceNumber < numtoSend) {
try { try {
final Message message = session final Message message = session
.createTextMessage(longString); .createTextMessage(longString);
message.setLongProperty(SEQ_NUM_PROPERTY, message.setLongProperty(SEQ_NUM_PROPERTY,
nextSequenceNumber); nextSequenceNumber);
++nextSequenceNumber; synchronized (guard)
messageProducer.send(message); {
if (nextSequenceNumber == this.nextSequenceNumber)
{
this.nextSequenceNumber = nextSequenceNumber + 1;
messageProducer.send(message);
} else {
continue;
}
}
if ((nextSequenceNumber % 500) == 0) { if ((nextSequenceNumber % 500) == 0) {
LOG.info(dest + " sent " + nextSequenceNumber); LOG.info(dest + " sent " + nextSequenceNumber);
} }
@ -353,13 +365,13 @@ public class AMQ2149Test {
// no need to run this unless there are some issues with the others // no need to run this unless there are some issues with the others
public void vanilaVerify_testOrder() throws Exception { public void vanilaVerify_testOrder() throws Exception {
createBroker(new Configurer() { createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception { public void configure(BrokerService broker) throws Exception {
broker.deleteAllMessages(); broker.deleteAllMessages();
} }
}); });
verifyOrderedMessageReceipt(); verifyOrderedMessageReceipt();
verifyStats(false); verifyStats(false);
} }
@ -368,22 +380,22 @@ public class AMQ2149Test {
public void testOrderWithRestart() throws Exception { public void testOrderWithRestart() throws Exception {
createBroker(new Configurer() { createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception { public void configure(BrokerService broker) throws Exception {
broker.deleteAllMessages(); broker.deleteAllMessages();
} }
}); });
final Timer timer = new Timer(); final Timer timer = new Timer();
schedualRestartTask(timer, new Configurer() { schedualRestartTask(timer, new Configurer() {
public void configure(BrokerService broker) throws Exception { public void configure(BrokerService broker) throws Exception {
} }
}); });
try { try {
verifyOrderedMessageReceipt(); verifyOrderedMessageReceipt();
} finally { } finally {
timer.cancel(); timer.cancel();
} }
verifyStats(true); verifyStats(true);
} }
@ -394,16 +406,16 @@ public class AMQ2149Test {
broker.deleteAllMessages(); broker.deleteAllMessages();
} }
}); });
final Timer timer = new Timer(); final Timer timer = new Timer();
schedualRestartTask(timer, null); schedualRestartTask(timer, null);
try { try {
verifyOrderedMessageReceipt(ActiveMQDestination.TOPIC_TYPE); verifyOrderedMessageReceipt(ActiveMQDestination.TOPIC_TYPE);
} finally { } finally {
timer.cancel(); timer.cancel();
} }
verifyStats(true); verifyStats(true);
} }
@ -416,33 +428,33 @@ public class AMQ2149Test {
public void testTopicTransactionalOrderWithRestart() throws Exception { public void testTopicTransactionalOrderWithRestart() throws Exception {
doTestTransactionalOrderWithRestart(ActiveMQDestination.TOPIC_TYPE); doTestTransactionalOrderWithRestart(ActiveMQDestination.TOPIC_TYPE);
} }
public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception { public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception {
numtoSend = 10000; numtoSend = 10000;
sleepBetweenSend = 3; sleepBetweenSend = 3;
brokerStopPeriod = 10 * 1000; brokerStopPeriod = 10 * 1000;
createBroker(new Configurer() { createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception { public void configure(BrokerService broker) throws Exception {
broker.deleteAllMessages(); broker.deleteAllMessages();
} }
}); });
final Timer timer = new Timer(); final Timer timer = new Timer();
schedualRestartTask(timer, null); schedualRestartTask(timer, null);
try { try {
verifyOrderedMessageReceipt(destinationType, 1, true); verifyOrderedMessageReceipt(destinationType, 1, true);
} finally { } finally {
timer.cancel(); timer.cancel();
} }
verifyStats(true); verifyStats(true);
} }
private void verifyStats(boolean brokerRestarts) throws Exception { private void verifyStats(boolean brokerRestarts) throws Exception {
RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
for (Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) { for (Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
DestinationStatistics stats = dest.getDestinationStatistics(); DestinationStatistics stats = dest.getDestinationStatistics();
if (brokerRestarts) { if (brokerRestarts) {
@ -453,7 +465,7 @@ public class AMQ2149Test {
+ " " + stats.getEnqueues().getCount() + " <= " +stats.getDequeues().getCount()); + " " + stats.getEnqueues().getCount() + " <= " +stats.getDequeues().getCount());
} else { } else {
assertEquals("qneue/dequeue match for: " + dest.getName(), assertEquals("qneue/dequeue match for: " + dest.getName(),
stats.getEnqueues().getCount(), stats.getDequeues().getCount()); stats.getEnqueues().getCount(), stats.getDequeues().getCount());
} }
} }
} }
@ -496,20 +508,20 @@ public class AMQ2149Test {
} }
return task; return task;
} }
private void verifyOrderedMessageReceipt(byte destinationType) throws Exception { private void verifyOrderedMessageReceipt(byte destinationType) throws Exception {
verifyOrderedMessageReceipt(destinationType, NUM_SENDERS_AND_RECEIVERS, false); verifyOrderedMessageReceipt(destinationType, NUM_SENDERS_AND_RECEIVERS, false);
} }
private void verifyOrderedMessageReceipt() throws Exception { private void verifyOrderedMessageReceipt() throws Exception {
verifyOrderedMessageReceipt(ActiveMQDestination.QUEUE_TYPE, NUM_SENDERS_AND_RECEIVERS, false); verifyOrderedMessageReceipt(ActiveMQDestination.QUEUE_TYPE, NUM_SENDERS_AND_RECEIVERS, false);
} }
private void verifyOrderedMessageReceipt(byte destinationType, int concurrentPairs, boolean transactional) throws Exception { private void verifyOrderedMessageReceipt(byte destinationType, int concurrentPairs, boolean transactional) throws Exception {
Vector<Thread> threads = new Vector<Thread>(); Vector<Thread> threads = new Vector<Thread>();
Vector<Receiver> receivers = new Vector<Receiver>(); Vector<Receiver> receivers = new Vector<Receiver>();
for (int i = 0; i < concurrentPairs; ++i) { for (int i = 0; i < concurrentPairs; ++i) {
final javax.jms.Destination destination = final javax.jms.Destination destination =
ActiveMQDestination.createDestination("test.dest." + i, destinationType); ActiveMQDestination.createDestination("test.dest." + i, destinationType);
@ -518,7 +530,7 @@ public class AMQ2149Test {
thread.start(); thread.start();
threads.add(thread); threads.add(thread);
} }
final long expiry = System.currentTimeMillis() + 1000 * 60 * 4; final long expiry = System.currentTimeMillis() + 1000 * 60 * 4;
while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) { while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
Thread sendThread = threads.firstElement(); Thread sendThread = threads.firstElement();
@ -563,7 +575,7 @@ public class AMQ2149Test {
} }
class TeardownTask implements Callable<Boolean> { class TeardownTask implements Callable<Boolean> {
private Object brokerLock; private final Object brokerLock;
private BrokerService broker; private BrokerService broker;
public TeardownTask(Object brokerLock, BrokerService broker) { public TeardownTask(Object brokerLock, BrokerService broker) {

View File

@ -157,7 +157,7 @@ public class AMQ2314Test extends CombinationTestSupport {
broker.setUseJmx(true); broker.setUseJmx(true);
broker.setAdvisorySupport(false); broker.setAdvisorySupport(false);
broker.setDeleteAllMessagesOnStartup(true); broker.setDeleteAllMessagesOnStartup(true);
broker.getSystemUsage().getMemoryUsage().setLimit(1024l*1024*64); broker.getSystemUsage().getMemoryUsage().setLimit(1024L*1024*64);
broker.addConnector("tcp://localhost:0").setName("Default"); broker.addConnector("tcp://localhost:0").setName("Default");
broker.start(); broker.start();

View File

@ -47,7 +47,7 @@ public class AMQ3779Test extends AutoFailTestSupport {
} }
} }
}; };
logger.getRootLogger().addAppender(appender); Logger.getRootLogger().addAppender(appender);
try { try {

View File

@ -101,14 +101,14 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
addNetworkConnector(broker); addNetworkConnector(broker);
} }
broker.setSchedulePeriodForDestinationPurge(0); broker.setSchedulePeriodForDestinationPurge(0);
broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024l); broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024L);
PolicyMap policyMap = new PolicyMap(); PolicyMap policyMap = new PolicyMap();
PolicyEntry policyEntry = new PolicyEntry(); PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setExpireMessagesPeriod(0); policyEntry.setExpireMessagesPeriod(0);
policyEntry.setQueuePrefetch(1000); policyEntry.setQueuePrefetch(1000);
policyEntry.setMemoryLimit(2 * 1024 * 1024l); policyEntry.setMemoryLimit(2 * 1024 * 1024L);
policyEntry.setProducerFlowControl(false); policyEntry.setProducerFlowControl(false);
policyEntry.setEnableAudit(true); policyEntry.setEnableAudit(true);
policyEntry.setUseCache(true); policyEntry.setUseCache(true);
@ -117,7 +117,7 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
PolicyEntry inPolicyEntry = new PolicyEntry(); PolicyEntry inPolicyEntry = new PolicyEntry();
inPolicyEntry.setExpireMessagesPeriod(0); inPolicyEntry.setExpireMessagesPeriod(0);
inPolicyEntry.setQueuePrefetch(1000); inPolicyEntry.setQueuePrefetch(1000);
inPolicyEntry.setMemoryLimit(5 * 1024 * 1024l); inPolicyEntry.setMemoryLimit(5 * 1024 * 1024L);
inPolicyEntry.setProducerFlowControl(true); inPolicyEntry.setProducerFlowControl(true);
inPolicyEntry.setEnableAudit(true); inPolicyEntry.setEnableAudit(true);
inPolicyEntry.setUseCache(true); inPolicyEntry.setUseCache(true);
@ -252,7 +252,7 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
} }
return true; return true;
} }
}, 1000 * 60 * 1000l, 20*1000)); }, 1000 * 60 * 1000L, 20*1000));
assertTrue("No exceptions:" + exceptions, exceptions.isEmpty()); assertTrue("No exceptions:" + exceptions, exceptions.isEmpty());

View File

@ -98,7 +98,7 @@ public class AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest extends Jms
PolicyEntry policyEntry = new PolicyEntry(); PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setExpireMessagesPeriod(0); policyEntry.setExpireMessagesPeriod(0);
policyEntry.setQueuePrefetch(1000); policyEntry.setQueuePrefetch(1000);
policyEntry.setMemoryLimit(1024 * 1024l); policyEntry.setMemoryLimit(1024 * 1024L);
policyEntry.setOptimizedDispatch(false); policyEntry.setOptimizedDispatch(false);
policyEntry.setProducerFlowControl(false); policyEntry.setProducerFlowControl(false);
policyEntry.setEnableAudit(true); policyEntry.setEnableAudit(true);
@ -171,7 +171,7 @@ public class AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest extends Jms
} }
return true; return true;
} }
}, 1000 * 60 * 1000l)); }, 1000 * 60 * 1000L));
assertTrue("No exceptions:" + exceptions, exceptions.isEmpty()); assertTrue("No exceptions:" + exceptions, exceptions.isEmpty());

View File

@ -49,7 +49,7 @@ public class AMQ4607Test extends JmsMultipleBrokersTestSupport implements Uncaug
public boolean duplex = true; public boolean duplex = true;
protected Map<String, MessageConsumer> consumerMap; protected Map<String, MessageConsumer> consumerMap;
Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>(); final Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>();
private void assertNoUnhandeledExceptions() { private void assertNoUnhandeledExceptions() {
for( Entry<Thread, Throwable> e: unhandeledExceptions.entrySet()) { for( Entry<Thread, Throwable> e: unhandeledExceptions.entrySet()) {

View File

@ -101,9 +101,9 @@ public class AMQ4636Test {
JDBCPersistenceAdapter jdbc = new TestJDBCPersistenceAdapter(); JDBCPersistenceAdapter jdbc = new TestJDBCPersistenceAdapter();
jdbc.setDataSource(embeddedDataSource); jdbc.setDataSource(embeddedDataSource);
jdbc.setLockKeepAlivePeriod(1000l); jdbc.setLockKeepAlivePeriod(1000L);
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
leaseDatabaseLocker.setLockAcquireSleepInterval(2000l); leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
jdbc.setLocker(leaseDatabaseLocker); jdbc.setLocker(leaseDatabaseLocker);
broker = new BrokerService(); broker = new BrokerService();

View File

@ -227,7 +227,7 @@ public class AMQ5266SingleDestTest {
} }
// verify empty dlq // verify empty dlq
assertEquals("No pending messages", 0l, ((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount()); assertEquals("No pending messages", 0L, ((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
} }
public class ExportQueuePublisher { public class ExportQueuePublisher {

View File

@ -25,6 +25,9 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport; import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CraigsBugTest extends EmbeddedBrokerTestSupport { public class CraigsBugTest extends EmbeddedBrokerTestSupport {
private String connectionUri; private String connectionUri;
@ -49,9 +52,7 @@ public class CraigsBugTest extends EmbeddedBrokerTestSupport {
conn.start(); conn.start();
try { try {
synchronized (this) { new CountDownLatch(1).await(3, TimeUnit.SECONDS);
wait(3000);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }

View File

@ -82,9 +82,9 @@ public class TrapMessageInJDBCStoreTest extends TestCase {
jdbc.setCleanupPeriod(0); jdbc.setCleanupPeriod(0);
testTransactionContext = new TestTransactionContext(jdbc); testTransactionContext = new TestTransactionContext(jdbc);
jdbc.setLockKeepAlivePeriod(1000l); jdbc.setLockKeepAlivePeriod(1000L);
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
leaseDatabaseLocker.setLockAcquireSleepInterval(2000l); leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
jdbc.setLocker(leaseDatabaseLocker); jdbc.setLocker(leaseDatabaseLocker);
broker.setPersistenceAdapter(jdbc); broker.setPersistenceAdapter(jdbc);

View File

@ -25,6 +25,7 @@ import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
import javax.jms.*; import javax.jms.*;
import java.io.File; import java.io.File;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.concurrent.CountDownLatch;
public class TryJmsClient public class TryJmsClient
{ {
@ -59,9 +60,7 @@ public class TryJmsClient
startMessageSend(); startMessageSend();
synchronized(this) { new CountDownLatch(1).await();
this.wait();
}
} }
private void startUsageMonitor(final BrokerService brokerService) { private void startUsageMonitor(final BrokerService brokerService) {

View File

@ -25,6 +25,7 @@ import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
import javax.jms.*; import javax.jms.*;
import java.io.File; import java.io.File;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.concurrent.CountDownLatch;
public class TryJmsManager { public class TryJmsManager {
@ -59,9 +60,7 @@ public class TryJmsManager {
startMessageConsumer(); startMessageConsumer();
synchronized(this) { new CountDownLatch(1).await();
this.wait();
}
} }
private void startUsageMonitor(final BrokerService brokerService) { private void startUsageMonitor(final BrokerService brokerService) {

View File

@ -979,7 +979,7 @@ public class ActiveMQStreamMessageTest extends TestCase {
message.writeObject(new Byte((byte) 2)); message.writeObject(new Byte((byte) 2));
message.writeObject(new Short((short) 2)); message.writeObject(new Short((short) 2));
message.writeObject(new Integer(2)); message.writeObject(new Integer(2));
message.writeObject(new Long(2l)); message.writeObject(new Long(2L));
message.writeObject(new Float(2.0f)); message.writeObject(new Float(2.0f));
message.writeObject(new Double(2.0d)); message.writeObject(new Double(2.0d));
}catch(Exception e) { }catch(Exception e) {

View File

@ -49,7 +49,7 @@ public class ActiveMQTextMessageTest extends TestCase {
String string = "str"; String string = "str";
msg.setText(string); msg.setText(string);
Message copy = msg.copy(); Message copy = msg.copy();
assertTrue(msg.getText() == ((ActiveMQTextMessage) copy).getText()); assertSame(msg.getText(), ((ActiveMQTextMessage) copy).getText());
} }
public void testSetText() { public void testSetText() {

View File

@ -73,14 +73,19 @@ public class ConsumerBean extends Assert implements MessageListener {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
try { synchronized(messages)
if (hasReceivedMessage()) { {
synchronized (messages) { try
{
while (hasReceivedMessage())
{
messages.wait(4000); messages.wait(4000);
} }
} }
} catch (InterruptedException e) { catch (InterruptedException e)
LOG.info("Caught: " + e); {
LOG.info("Caught: " + e);
}
} }
long end = System.currentTimeMillis() - start; long end = System.currentTimeMillis() - start;
@ -101,18 +106,18 @@ public class ConsumerBean extends Assert implements MessageListener {
LOG.info("Waiting for (" + maxRemainingMessageCount + ") message(s) to arrive"); LOG.info("Waiting for (" + maxRemainingMessageCount + ") message(s) to arrive");
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
long endTime = start + maxWaitTime; long endTime = start + maxWaitTime;
while (maxRemainingMessageCount > 0) { synchronized (messages) {
try { while (maxRemainingMessageCount > 0) {
synchronized (messages) { try {
messages.wait(1000); messages.wait(1000);
if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > endTime) {
break;
}
} catch (InterruptedException e) {
LOG.info("Caught: " + e);
} }
if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > endTime) { maxRemainingMessageCount = Math.max(0, messageCount - messages.size());
break;
}
} catch (InterruptedException e) {
LOG.info("Caught: " + e);
} }
maxRemainingMessageCount = Math.max(0, messageCount - messages.size());
} }
long end = System.currentTimeMillis() - start; long end = System.currentTimeMillis() - start;
LOG.info("End of wait for " + end + " millis"); LOG.info("End of wait for " + end + " millis");

View File

@ -43,13 +43,13 @@ public class SpringConsumer extends ConsumerBean implements MessageListener {
try { try {
ConnectionFactory factory = template.getConnectionFactory(); ConnectionFactory factory = template.getConnectionFactory();
connection = factory.createConnection(); final Connection c = connection = factory.createConnection();
// we might be a reusable connection in spring // we might be a reusable connection in spring
// so lets only set the client ID once if its not set // so lets only set the client ID once if its not set
synchronized (connection) { synchronized (c) {
if (connection.getClientID() == null) { if (c.getClientID() == null) {
connection.setClientID(myId); c.setClientID(myId);
} }
} }

View File

@ -71,17 +71,17 @@ public class JDBCIOExceptionHandlerTest extends TestCase {
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
jdbc.setDataSource(dataSource); jdbc.setDataSource(dataSource);
jdbc.setLockKeepAlivePeriod(1000l); jdbc.setLockKeepAlivePeriod(1000L);
if (leaseLocker) { if (leaseLocker) {
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
leaseDatabaseLocker.setHandleStartException(true); leaseDatabaseLocker.setHandleStartException(true);
leaseDatabaseLocker.setLockAcquireSleepInterval(2000l); leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
jdbc.setLocker(leaseDatabaseLocker); jdbc.setLocker(leaseDatabaseLocker);
} }
broker.setPersistenceAdapter(jdbc); broker.setPersistenceAdapter(jdbc);
LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler(); LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler();
ioExceptionHandler.setResumeCheckSleepPeriod(1000l); ioExceptionHandler.setResumeCheckSleepPeriod(1000L);
ioExceptionHandler.setStopStartConnectors(startStopConnectors); ioExceptionHandler.setStopStartConnectors(startStopConnectors);
broker.setIoExceptionHandler(ioExceptionHandler); broker.setIoExceptionHandler(ioExceptionHandler);
String connectionUri = broker.addConnector(TRANSPORT_URL).getPublishableConnectString(); String connectionUri = broker.addConnector(TRANSPORT_URL).getPublishableConnectString();
@ -129,18 +129,18 @@ public class JDBCIOExceptionHandlerTest extends TestCase {
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
jdbc.setDataSource(dataSource); jdbc.setDataSource(dataSource);
jdbc.setLockKeepAlivePeriod(1000l); jdbc.setLockKeepAlivePeriod(1000L);
if (lease) { if (lease) {
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
leaseDatabaseLocker.setHandleStartException(true); leaseDatabaseLocker.setHandleStartException(true);
leaseDatabaseLocker.setLockAcquireSleepInterval(2000l); leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
jdbc.setLocker(leaseDatabaseLocker); jdbc.setLocker(leaseDatabaseLocker);
} }
broker.setPersistenceAdapter(jdbc); broker.setPersistenceAdapter(jdbc);
LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler(); LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler();
ioExceptionHandler.setResumeCheckSleepPeriod(1000l); ioExceptionHandler.setResumeCheckSleepPeriod(1000L);
ioExceptionHandler.setStopStartConnectors(false); ioExceptionHandler.setStopStartConnectors(false);
broker.setIoExceptionHandler(ioExceptionHandler); broker.setIoExceptionHandler(ioExceptionHandler);
slave.set(broker); slave.set(broker);

View File

@ -150,7 +150,7 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
for (int priority = 0; priority < maxPriority; priority++) { for (int priority = 0; priority < maxPriority; priority++) {
producers.add(new ProducerThread(topic, MSG_NUM, priority)); producers.add(new ProducerThread(topic, MSG_NUM, priority));
messageCounts[priority] = new AtomicInteger(0); messageCounts[priority] = new AtomicInteger(0);
messageIds[priority] = 1l; messageIds[priority] = 1L;
} }
for (ProducerThread producer : producers) { for (ProducerThread producer : producers) {

View File

@ -188,7 +188,7 @@ public class LeaseDatabaseLockerTest {
statement = connection.prepareStatement(jdbc.getStatements().getLeaseUpdateStatement()); statement = connection.prepareStatement(jdbc.getStatements().getLeaseUpdateStatement());
statement.setString(1, null); statement.setString(1, null);
statement.setLong(2, 0l); statement.setLong(2, 0L);
statement.setString(3, fakeId); statement.setString(3, fakeId);
assertEquals("we released " + fakeId, 1, statement.executeUpdate()); assertEquals("we released " + fakeId, 1, statement.executeUpdate());
LOG.info("released " + fakeId); LOG.info("released " + fakeId);

View File

@ -189,7 +189,7 @@ public class KahaDBFastEnqueueTest {
MessageProducer producer = session.createProducer(destination); MessageProducer producer = session.createProducer(destination);
Long start = System.currentTimeMillis(); Long start = System.currentTimeMillis();
long i = 0l; long i = 0L;
while ( (i=count.getAndDecrement()) > 0) { while ( (i=count.getAndDecrement()) > 0) {
Message message = null; Message message = null;
if (useBytesMessage) { if (useBytesMessage) {
@ -238,8 +238,8 @@ public class KahaDBFastEnqueueTest {
public void testRollover() throws Exception { public void testRollover() throws Exception {
byte flip = 0x1; byte flip = 0x1;
for (long i=0; i<Short.MAX_VALUE; i++) { for (long i=0; i<Short.MAX_VALUE; i++) {
assertEquals("0 @:" + i, 0, flip ^= 1); assertEquals("0 @:" + i, 0, flip ^= (byte) 1);
assertEquals("1 @:" + i, 1, flip ^= 1); assertEquals("1 @:" + i, 1, flip ^= (byte) 1);
} }
} }
} }

View File

@ -22,11 +22,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.*;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -46,7 +42,7 @@ public class PListTest {
private PListStoreImpl store; private PListStoreImpl store;
private PListImpl plist; private PListImpl plist;
final ByteSequence payload = new ByteSequence(new byte[400]); final ByteSequence payload = new ByteSequence(new byte[400]);
final String idSeed = new String("Seed" + new byte[1024]); final String idSeed = new String("Seed" + Arrays.toString(new byte[1024]));
final Vector<Throwable> exceptions = new Vector<Throwable>(); final Vector<Throwable> exceptions = new Vector<Throwable>();
ExecutorService executor; ExecutorService executor;
@ -617,7 +613,7 @@ public class PListTest {
} }
} }
Map<PList, Object> locks = new HashMap<PList, Object>(); final Map<PList, Object> locks = new HashMap<PList, Object>();
private Object plistLocks(PList plist) { private Object plistLocks(PList plist) {
Object lock = null; Object lock = null;

View File

@ -250,7 +250,7 @@ public class JMSInputStreamTest extends JmsTestSupport {
} }
out.flush(); out.flush();
synchronized (complete) { synchronized (complete) {
if (!complete.get()) { while (!complete.get()) {
complete.wait(30000); complete.wait(30000);
} }
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport;
import java.io.IOException; import java.io.IOException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
@ -29,7 +30,7 @@ import org.apache.activemq.util.ServiceStopper;
public class StubTransport extends TransportSupport { public class StubTransport extends TransportSupport {
private Queue<Object> queue = new ConcurrentLinkedQueue<Object>(); private Queue<Object> queue = new ConcurrentLinkedQueue<Object>();
private volatile int receiveCounter; private AtomicInteger receiveCounter;
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
} }
@ -38,7 +39,7 @@ public class StubTransport extends TransportSupport {
} }
public void oneway(Object command) throws IOException { public void oneway(Object command) throws IOException {
receiveCounter++; receiveCounter.incrementAndGet();
queue.add(command); queue.add(command);
} }
@ -51,7 +52,7 @@ public class StubTransport extends TransportSupport {
} }
public int getReceiveCounter() { public int getReceiveCounter() {
return receiveCounter; return receiveCounter.get();
} }
} }

View File

@ -55,7 +55,7 @@ public class TopicClusterTest extends TestCase implements MessageListener {
protected Destination destination; protected Destination destination;
protected boolean topic = true; protected boolean topic = true;
protected AtomicInteger receivedMessageCount = new AtomicInteger(0); protected final AtomicInteger receivedMessageCount = new AtomicInteger(0);
protected int deliveryMode = DeliveryMode.NON_PERSISTENT; protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
protected MessageProducer[] producers; protected MessageProducer[] producers;
protected Connection[] connections; protected Connection[] connections;
@ -166,7 +166,7 @@ public class TopicClusterTest extends TestCase implements MessageListener {
} }
} }
synchronized (receivedMessageCount) { synchronized (receivedMessageCount) {
if (receivedMessageCount.get() < expectedReceiveCount()) { while (receivedMessageCount.get() < expectedReceiveCount()) {
receivedMessageCount.wait(20000); receivedMessageCount.wait(20000);
} }
} }

View File

@ -21,6 +21,7 @@ import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection; import javax.jms.Connection;
@ -73,14 +74,12 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
// The runnable is likely to interrupt during the session#commit, since // The runnable is likely to interrupt during the session#commit, since
// this takes the longest // this takes the longest
final Object starter = new Object(); final CountDownLatch starter = new CountDownLatch(1);
final AtomicBoolean restarted = new AtomicBoolean(); final AtomicBoolean restarted = new AtomicBoolean();
new Thread(new Runnable() { new Thread(new Runnable() {
public void run() { public void run() {
try { try {
synchronized (starter) { starter.await();
starter.wait();
}
// Simulate broker failure & restart // Simulate broker failure & restart
bs.stop(); bs.stop();
@ -97,9 +96,6 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
} }
}).start(); }).start();
synchronized (starter) {
starter.notifyAll();
}
for (int i = 0; i < MESSAGE_COUNT; i++) { for (int i = 0; i < MESSAGE_COUNT; i++) {
Message message = consumer.receive(500); Message message = consumer.receive(500);
assertNotNull("No Message " + i + " found", message); assertNotNull("No Message " + i + " found", message);
@ -108,9 +104,7 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
assertFalse("Timing problem, restarted too soon", restarted assertFalse("Timing problem, restarted too soon", restarted
.get()); .get());
if (i == 10) { if (i == 10) {
synchronized (starter) { starter.countDown();
starter.notifyAll();
}
} }
if (i > MESSAGE_COUNT - 100) { if (i > MESSAGE_COUNT - 100) {
assertTrue("Timing problem, restarted too late", restarted assertTrue("Timing problem, restarted too late", restarted
@ -143,14 +137,12 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
// The runnable is likely to interrupt during the session#commit, since // The runnable is likely to interrupt during the session#commit, since
// this takes the longest // this takes the longest
final Object starter = new Object(); final CountDownLatch starter = new CountDownLatch(1);
final AtomicBoolean restarted = new AtomicBoolean(); final AtomicBoolean restarted = new AtomicBoolean();
new Thread(new Runnable() { new Thread(new Runnable() {
public void run() { public void run() {
try { try {
synchronized (starter) { starter.await();
starter.wait();
}
// Simulate broker failure & restart // Simulate broker failure & restart
bs.stop(); bs.stop();
@ -167,9 +159,6 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
} }
}).start(); }).start();
synchronized (starter) {
starter.notifyAll();
}
Collection<Integer> results = new ArrayList<Integer>(MESSAGE_COUNT); Collection<Integer> results = new ArrayList<Integer>(MESSAGE_COUNT);
for (int i = 0; i < MESSAGE_COUNT; i++) { for (int i = 0; i < MESSAGE_COUNT; i++) {
Message message1 = consumer1.receive(20); Message message1 = consumer1.receive(20);
@ -191,9 +180,7 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
assertFalse("Timing problem, restarted too soon", restarted assertFalse("Timing problem, restarted too soon", restarted
.get()); .get());
if (i == 10) { if (i == 10) {
synchronized (starter) { starter.countDown();
starter.notifyAll();
}
} }
if (i > MESSAGE_COUNT - 50) { if (i > MESSAGE_COUNT - 50) {
assertTrue("Timing problem, restarted too late", restarted assertTrue("Timing problem, restarted too late", restarted

View File

@ -46,7 +46,7 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
protected Transport producer; protected Transport producer;
protected Transport consumer; protected Transport consumer;
protected Object lock = new Object(); protected final Object lock = new Object();
protected Command receivedCommand; protected Command receivedCommand;
protected TransportServer server; protected TransportServer server;
protected boolean large; protected boolean large;
@ -251,10 +251,10 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
Command answer = null; Command answer = null;
synchronized (lock) { synchronized (lock) {
answer = receivedCommand; answer = receivedCommand;
if (answer == null) { while (answer == null) {
lock.wait(waitForCommandTimeout); lock.wait(waitForCommandTimeout);
answer = receivedCommand;
} }
answer = receivedCommand;
} }
assertNotNull("Should have received a Command by now!", answer); assertNotNull("Should have received a Command by now!", answer);

View File

@ -69,7 +69,7 @@ public class BacklogNetworkCrossTalkTest extends JmsMultipleBrokersTestSupport {
MessageConsumer clientB = createConsumer("B", destA); MessageConsumer clientB = createConsumer("B", destA);
final long maxWait = 5 * 60 * 1000l; final long maxWait = 5 * 60 * 1000L;
MessageIdList listA = getConsumerMessages("A", clientA); MessageIdList listA = getConsumerMessages("A", clientA);
listA.setMaximumDuration(maxWait); listA.setMaximumDuration(maxWait);
listA.waitForMessagesToArrive(numMessages); listA.waitForMessagesToArrive(numMessages);

View File

@ -88,7 +88,7 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testNoStuckConnectionsWithTransportDisconnect() throws Exception { public void testNoStuckConnectionsWithTransportDisconnect() throws Exception {
inactiveDuration=60000l; inactiveDuration=60000L;
useDuplexNetworkBridge = true; useDuplexNetworkBridge = true;
bridgeBrokers(SPOKE, HUB); bridgeBrokers(SPOKE, HUB);

View File

@ -107,7 +107,7 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
// periodically start a durable sub that has a backlog // periodically start a durable sub that has a backlog
final int consumersToActivate = 5; final int consumersToActivate = 5;
final Object addConsumerSignal = new Object(); final CountDownLatch addConsumerSignal = new CountDownLatch(1);
Executors.newCachedThreadPool(new ThreadFactory() { Executors.newCachedThreadPool(new ThreadFactory() {
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
@ -120,9 +120,7 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
MessageConsumer consumer = null; MessageConsumer consumer = null;
for (int i = 0; i < consumersToActivate; i++) { for (int i = 0; i < consumersToActivate; i++) {
LOG.info("Waiting for add signal from producer..."); LOG.info("Waiting for add signal from producer...");
synchronized (addConsumerSignal) { addConsumerSignal.await(30, TimeUnit.MINUTES);
addConsumerSignal.wait(30 * 60 * 1000);
}
TimedMessageListener listener = new TimedMessageListener(); TimedMessageListener listener = new TimedMessageListener();
consumer = createDurableSubscriber(factory.createConnection(), destination, "consumer" + (i + 1)); consumer = createDurableSubscriber(factory.createConnection(), destination, "consumer" + (i + 1));
LOG.info("Created consumer " + consumer); LOG.info("Created consumer " + consumer);
@ -254,7 +252,7 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
final int numIterations, final int numIterations,
Session session, Session session,
MessageProducer producer, MessageProducer producer,
Object addConsumerSignal) throws Exception { CountDownLatch addConsumerSignal) throws Exception {
long start; long start;
long count = 0; long count = 0;
double batchMax = 0, max = 0, sum = 0; double batchMax = 0, max = 0, sum = 0;
@ -269,10 +267,8 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
max = Math.max(max, (System.currentTimeMillis() - singleSendstart)); max = Math.max(max, (System.currentTimeMillis() - singleSendstart));
if (++count % 500 == 0) { if (++count % 500 == 0) {
if (addConsumerSignal != null) { if (addConsumerSignal != null) {
synchronized (addConsumerSignal) { addConsumerSignal.countDown();
addConsumerSignal.notifyAll(); LOG.info("Signalled add consumer");
LOG.info("Signalled add consumer");
}
} }
}; };
if (count % 5000 == 0) { if (count % 5000 == 0) {
@ -405,7 +401,7 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
final int batchSize = 1000; final int batchSize = 1000;
CountDownLatch firstReceiptLatch = new CountDownLatch(1); CountDownLatch firstReceiptLatch = new CountDownLatch(1);
long mark = System.currentTimeMillis(); long mark = System.currentTimeMillis();
long firstReceipt = 0l; long firstReceipt = 0L;
long receiptAccumulator = 0; long receiptAccumulator = 0;
long batchReceiptAccumulator = 0; long batchReceiptAccumulator = 0;
long maxReceiptTime = 0; long maxReceiptTime = 0;

View File

@ -95,7 +95,7 @@ public class ConcurrentProducerQueueConsumerTest extends TestSupport
// periodically start a queue consumer // periodically start a queue consumer
final int consumersToActivate = 5; final int consumersToActivate = 5;
final Object addConsumerSignal = new Object(); final CountDownLatch addConsumerSignal = new CountDownLatch(1);
Executors.newCachedThreadPool(new ThreadFactory() { Executors.newCachedThreadPool(new ThreadFactory() {
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
@ -108,9 +108,7 @@ public class ConcurrentProducerQueueConsumerTest extends TestSupport
MessageConsumer consumer = null; MessageConsumer consumer = null;
for (int i = 0; i < consumersToActivate; i++) { for (int i = 0; i < consumersToActivate; i++) {
LOG.info("Waiting for add signal from producer..."); LOG.info("Waiting for add signal from producer...");
synchronized (addConsumerSignal) { addConsumerSignal.await(30, TimeUnit.MINUTES);
addConsumerSignal.wait(30 * 60 * 1000);
}
TimedMessageListener listener = new TimedMessageListener(); TimedMessageListener listener = new TimedMessageListener();
consumer = createConsumer(factory.createConnection(), destination); consumer = createConsumer(factory.createConnection(), destination);
LOG.info("Created consumer " + consumer); LOG.info("Created consumer " + consumer);
@ -241,7 +239,7 @@ public class ConcurrentProducerQueueConsumerTest extends TestSupport
final int numIterations, final int numIterations,
Session session, Session session,
MessageProducer producer, MessageProducer producer,
Object addConsumerSignal) throws Exception { CountDownLatch addConsumerSignal) throws Exception {
long start; long start;
long count = 0; long count = 0;
double batchMax = 0, max = 0, sum = 0; double batchMax = 0, max = 0, sum = 0;
@ -257,10 +255,8 @@ public class ConcurrentProducerQueueConsumerTest extends TestSupport
max = Math.max(max, (System.currentTimeMillis() - singleSendstart)); max = Math.max(max, (System.currentTimeMillis() - singleSendstart));
if (++count % 500 == 0) { if (++count % 500 == 0) {
if (addConsumerSignal != null) { if (addConsumerSignal != null) {
synchronized (addConsumerSignal) { addConsumerSignal.countDown();
addConsumerSignal.notifyAll(); LOG.info("Signalled add consumer");
LOG.info("Signalled add consumer");
}
} }
} }
; ;
@ -362,7 +358,7 @@ public class ConcurrentProducerQueueConsumerTest extends TestSupport
final CountDownLatch firstReceiptLatch = new CountDownLatch(1); final CountDownLatch firstReceiptLatch = new CountDownLatch(1);
long mark = System.currentTimeMillis(); long mark = System.currentTimeMillis();
long firstReceipt = 0l; long firstReceipt = 0L;
long receiptAccumulator = 0; long receiptAccumulator = 0;
long batchReceiptAccumulator = 0; long batchReceiptAccumulator = 0;
long maxReceiptTime = 0; long maxReceiptTime = 0;

View File

@ -30,6 +30,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
@ -80,7 +81,7 @@ public class DurableSubscriptionOffline4Test extends DurableSubscriptionOfflineT
MessageProducer producer = session.createProducer(null); MessageProducer producer = session.createProducer(null);
final int toSend = 500; final int toSend = 500;
final String payload = new byte[40*1024].toString(); final String payload = Arrays.toString(new byte[40 * 1024]);
int sent = 0; int sent = 0;
for (int i = sent; i < toSend; i++) { for (int i = sent; i < toSend; i++) {
Message message = session.createTextMessage(payload); Message message = session.createTextMessage(payload);

View File

@ -239,7 +239,7 @@ public class JdbcDurableSubDupTest {
ActiveMQConnectionFactory factory; ActiveMQConnectionFactory factory;
MessageProducer messageProducer; MessageProducer messageProducer;
long timeToLive = 0l; long timeToLive = 0L;
TextMessage message = null; TextMessage message = null;

View File

@ -168,7 +168,7 @@ public class MemoryLimitTest extends TestSupport {
final ProducerThread producer = new ProducerThread(sess, sess.createQueue("STORE.1")) { final ProducerThread producer = new ProducerThread(sess, sess.createQueue("STORE.1")) {
@Override @Override
protected Message createMessage(int i) throws Exception { protected Message createMessage(int i) throws Exception {
return sess.createTextMessage(payload + "::" + i); return sess.createTextMessage(Arrays.toString(payload) + "::" + i);
} }
}; };
producer.setMessageCount(1000); producer.setMessageCount(1000);
@ -176,7 +176,7 @@ public class MemoryLimitTest extends TestSupport {
final ProducerThread producer2 = new ProducerThread(sess, sess.createQueue("STORE.2")) { final ProducerThread producer2 = new ProducerThread(sess, sess.createQueue("STORE.2")) {
@Override @Override
protected Message createMessage(int i) throws Exception { protected Message createMessage(int i) throws Exception {
return sess.createTextMessage(payload + "::" + i); return sess.createTextMessage(Arrays.toString(payload) + "::" + i);
} }
}; };
producer2.setMessageCount(1000); producer2.setMessageCount(1000);

View File

@ -122,8 +122,8 @@ public class MessageGroupReconnectDistributionTest {
final ArrayList<AtomicLong> batchCounters = new ArrayList<AtomicLong>(numConsumers); final ArrayList<AtomicLong> batchCounters = new ArrayList<AtomicLong>(numConsumers);
for (int i = 0; i < numConsumers; i++) { for (int i = 0; i < numConsumers; i++) {
consumedCounters.add(new AtomicLong(0l)); consumedCounters.add(new AtomicLong(0L));
batchCounters.add(new AtomicLong(0l)); batchCounters.add(new AtomicLong(0L));
final int id = i; final int id = i;
executorService.submit(new Runnable() { executorService.submit(new Runnable() {

View File

@ -44,7 +44,7 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport
private static final Logger LOG = LoggerFactory.getLogger(MultiBrokersMultiClientsTest.class); private static final Logger LOG = LoggerFactory.getLogger(MultiBrokersMultiClientsTest.class);
protected Map<String, MessageConsumer> consumerMap; protected Map<String, MessageConsumer> consumerMap;
Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>(); final Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>();
public void testTopicAllConnected() throws Exception { public void testTopicAllConnected() throws Exception {
bridgeAllBrokers(); bridgeAllBrokers();

View File

@ -265,7 +265,7 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport {
private MessageConsumer consumer; private MessageConsumer consumer;
private final String durableID = "DURABLE_ID"; private final String durableID = "DURABLE_ID";
private List<String> receivedStrings = Collections.synchronizedList(new ArrayList<String>()); private final List<String> receivedStrings = Collections.synchronizedList(new ArrayList<String>());
private int numMessages = 10; private int numMessages = 10;
private CountDownLatch recievedLatch = new CountDownLatch(numMessages); private CountDownLatch recievedLatch = new CountDownLatch(numMessages);

View File

@ -47,8 +47,8 @@ public class ReliableReconnectTest extends org.apache.activemq.TestSupport {
protected int deliveryMode = DeliveryMode.PERSISTENT; protected int deliveryMode = DeliveryMode.PERSISTENT;
protected String consumerClientId; protected String consumerClientId;
protected Destination destination; protected Destination destination;
protected AtomicBoolean closeBroker = new AtomicBoolean(false); protected final AtomicBoolean closeBroker = new AtomicBoolean(false);
protected AtomicInteger messagesReceived = new AtomicInteger(0); protected final AtomicInteger messagesReceived = new AtomicInteger(0);
protected BrokerService broker; protected BrokerService broker;
protected int firstBatch = MESSAGE_COUNT / 10; protected int firstBatch = MESSAGE_COUNT / 10;
private IdGenerator idGen = new IdGenerator(); private IdGenerator idGen = new IdGenerator();
@ -159,7 +159,7 @@ public class ReliableReconnectTest extends org.apache.activemq.TestSupport {
connection.close(); connection.close();
spawnConsumer(); spawnConsumer();
synchronized (closeBroker) { synchronized (closeBroker) {
if (!closeBroker.get()) { while (!closeBroker.get()) {
closeBroker.wait(); closeBroker.wait();
} }
} }
@ -168,7 +168,7 @@ public class ReliableReconnectTest extends org.apache.activemq.TestSupport {
startBroker(false); startBroker(false);
// System.err.println("Started Broker again"); // System.err.println("Started Broker again");
synchronized (messagesReceived) { synchronized (messagesReceived) {
if (messagesReceived.get() < MESSAGE_COUNT) { while (messagesReceived.get() < MESSAGE_COUNT) {
messagesReceived.wait(60000); messagesReceived.wait(60000);
} }
} }

View File

@ -122,7 +122,7 @@ public class ThreeBrokerTempDestDemandSubscriptionCleanupTest extends JmsMultipl
threadService.submit(tester); threadService.submit(tester);
threadService.shutdown(); threadService.shutdown();
assertTrue("executor done on time", threadService.awaitTermination(30l, TimeUnit.SECONDS)); assertTrue("executor done on time", threadService.awaitTermination(30L, TimeUnit.SECONDS));
// for the real test... we should not have any subscriptions left on broker C for the temp dests // for the real test... we should not have any subscriptions left on broker C for the temp dests
BrokerItem brokerC = brokers.get(BROKER_C); BrokerItem brokerC = brokers.get(BROKER_C);

View File

@ -51,7 +51,7 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
public static final int TIMEOUT = 30000; public static final int TIMEOUT = 30000;
protected Map<String, MessageConsumer> consumerMap; protected Map<String, MessageConsumer> consumerMap;
Map<Thread, Throwable> unhandledExceptions = new HashMap<Thread, Throwable>(); final Map<Thread, Throwable> unhandledExceptions = new HashMap<Thread, Throwable>();
private void assertNoUnhandledExceptions() { private void assertNoUnhandledExceptions() {
for( Entry<Thread, Throwable> e: unhandledExceptions.entrySet()) { for( Entry<Thread, Throwable> e: unhandledExceptions.entrySet()) {

View File

@ -140,20 +140,28 @@ public class MessageIdList extends Assert implements MessageListener {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
for (int i = 0; i < messageCount; i++) { synchronized (semaphore)
try { {
if (hasReceivedMessages(messageCount)) { for (int i = 0; i < messageCount; i++)
break; {
} try
long duration = System.currentTimeMillis() - start; {
if (duration >= maximumDuration) { if (hasReceivedMessages(messageCount))
break; {
} break;
synchronized (semaphore) { }
long duration = System.currentTimeMillis() - start;
if (duration >= maximumDuration)
{
break;
}
semaphore.wait(maximumDuration - duration); semaphore.wait(maximumDuration - duration);
} }
} catch (InterruptedException e) { catch (InterruptedException e)
LOG.info("Caught: " + e); {
LOG.info("Caught: " + e);
}
} }
} }
long end = System.currentTimeMillis() - start; long end = System.currentTimeMillis() - start;