mirror of https://github.com/apache/activemq.git
fix some synchonization issues
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@815788 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5f2abc13c8
commit
7d4b446e67
|
@ -28,32 +28,25 @@ import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
public class ConsumerBean extends Assert implements MessageListener {
|
public class ConsumerBean extends Assert implements MessageListener {
|
||||||
private static final Log LOG = LogFactory.getLog(ConsumerBean.class);
|
private static final Log LOG = LogFactory.getLog(ConsumerBean.class);
|
||||||
private List<Message> messages = new ArrayList<Message>();
|
private final List<Message> messages = new ArrayList<Message>();
|
||||||
private Object semaphore;
|
|
||||||
private boolean verbose;
|
private boolean verbose;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
*/
|
*/
|
||||||
public ConsumerBean() {
|
public ConsumerBean() {
|
||||||
this(new Object());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructor, initialized semaphore object.
|
|
||||||
*
|
|
||||||
* @param semaphore
|
|
||||||
*/
|
|
||||||
public ConsumerBean(Object semaphore) {
|
|
||||||
this.semaphore = semaphore;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return all the messages on the list so far, clearing the buffer
|
* @return all the messages on the list so far, clearing the buffer
|
||||||
*/
|
*/
|
||||||
public synchronized List<Message> flushMessages() {
|
public List<Message> flushMessages() {
|
||||||
List<Message> answer = new ArrayList<Message>(messages);
|
List<Message> answer = null;
|
||||||
|
synchronized(messages) {
|
||||||
|
answer = new ArrayList<Message>(messages);
|
||||||
messages.clear();
|
messages.clear();
|
||||||
|
}
|
||||||
return answer;
|
return answer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,13 +55,13 @@ public class ConsumerBean extends Assert implements MessageListener {
|
||||||
*
|
*
|
||||||
* @param message
|
* @param message
|
||||||
*/
|
*/
|
||||||
public synchronized void onMessage(Message message) {
|
public void onMessage(Message message) {
|
||||||
messages.add(message);
|
synchronized (messages) {
|
||||||
if (verbose) {
|
messages.add(message);
|
||||||
LOG.info("Received: " + message);
|
if (verbose) {
|
||||||
}
|
LOG.info("Received: " + message);
|
||||||
synchronized (semaphore) {
|
}
|
||||||
semaphore.notifyAll();
|
messages.notifyAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,8 +75,8 @@ public class ConsumerBean extends Assert implements MessageListener {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (hasReceivedMessage()) {
|
if (hasReceivedMessage()) {
|
||||||
synchronized (semaphore) {
|
synchronized (messages) {
|
||||||
semaphore.wait(4000);
|
messages.wait(4000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -100,29 +93,30 @@ public class ConsumerBean extends Assert implements MessageListener {
|
||||||
* @param messageCount
|
* @param messageCount
|
||||||
*/
|
*/
|
||||||
public void waitForMessagesToArrive(int messageCount) {
|
public void waitForMessagesToArrive(int messageCount) {
|
||||||
final long maxRemainingMessageCount = Math.max(0, messageCount - messages.size());
|
long maxRemainingMessageCount = Math.max(0, messageCount - messages.size());
|
||||||
LOG.info("Waiting for (" + maxRemainingMessageCount + ") message(s) to arrive");
|
LOG.info("Waiting for (" + maxRemainingMessageCount + ") message(s) to arrive");
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
for (int i = 0; i < maxRemainingMessageCount; i++) {
|
long maxWaitTime = start + 120 * 1000;
|
||||||
|
while (maxRemainingMessageCount > 0) {
|
||||||
try {
|
try {
|
||||||
synchronized (semaphore) {
|
synchronized (messages) {
|
||||||
semaphore.wait(1000);
|
messages.wait(1000);
|
||||||
}
|
}
|
||||||
if (hasReceivedMessages(messageCount)) {
|
if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > maxWaitTime) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.info("Caught: " + e);
|
LOG.info("Caught: " + e);
|
||||||
}
|
}
|
||||||
|
maxRemainingMessageCount = Math.max(0, messageCount - messages.size());
|
||||||
}
|
}
|
||||||
long end = System.currentTimeMillis() - start;
|
long end = System.currentTimeMillis() - start;
|
||||||
|
|
||||||
LOG.info("End of wait for " + end + " millis");
|
LOG.info("End of wait for " + end + " millis");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void assertMessagesArrived(int total) {
|
public void assertMessagesArrived(int total) {
|
||||||
waitForMessagesToArrive(total);
|
waitForMessagesToArrive(total);
|
||||||
synchronized (this) {
|
synchronized (messages) {
|
||||||
int count = messages.size();
|
int count = messages.size();
|
||||||
|
|
||||||
assertEquals("Messages received", total, count);
|
assertEquals("Messages received", total, count);
|
||||||
|
@ -152,7 +146,9 @@ public class ConsumerBean extends Assert implements MessageListener {
|
||||||
* @param messageCount
|
* @param messageCount
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
protected synchronized boolean hasReceivedMessages(int messageCount) {
|
protected boolean hasReceivedMessages(int messageCount) {
|
||||||
return messages.size() >= messageCount;
|
synchronized (messages) {
|
||||||
|
return messages.size() >= messageCount;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue