mirror of https://github.com/apache/activemq.git
partial fix for AMQ2149|http://issues.apache.org/activemq/browse/AMQ-2149 , contention when usage limit reached can lead to out or order mesage dispatch
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@755715 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a1cccb9ed9
commit
08aecbee6e
|
@ -70,6 +70,8 @@ import org.apache.activemq.thread.Task;
|
||||||
import org.apache.activemq.thread.TaskRunner;
|
import org.apache.activemq.thread.TaskRunner;
|
||||||
import org.apache.activemq.thread.TaskRunnerFactory;
|
import org.apache.activemq.thread.TaskRunnerFactory;
|
||||||
import org.apache.activemq.transaction.Synchronization;
|
import org.apache.activemq.transaction.Synchronization;
|
||||||
|
import org.apache.activemq.usage.Usage;
|
||||||
|
import org.apache.activemq.usage.UsageListener;
|
||||||
import org.apache.activemq.util.BrokerSupport;
|
import org.apache.activemq.util.BrokerSupport;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -81,7 +83,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
*
|
*
|
||||||
* @version $Revision: 1.28 $
|
* @version $Revision: 1.28 $
|
||||||
*/
|
*/
|
||||||
public class Queue extends BaseDestination implements Task {
|
public class Queue extends BaseDestination implements Task, UsageListener {
|
||||||
protected static final Log LOG = LogFactory.getLog(Queue.class);
|
protected static final Log LOG = LogFactory.getLog(Queue.class);
|
||||||
protected TaskRunnerFactory taskFactory;
|
protected TaskRunnerFactory taskFactory;
|
||||||
protected TaskRunner taskRunner;
|
protected TaskRunner taskRunner;
|
||||||
|
@ -99,7 +101,7 @@ public class Queue extends BaseDestination implements Task {
|
||||||
private final ReentrantLock dispatchLock = new ReentrantLock();
|
private final ReentrantLock dispatchLock = new ReentrantLock();
|
||||||
private boolean useConsumerPriority=true;
|
private boolean useConsumerPriority=true;
|
||||||
private boolean strictOrderDispatch=false;
|
private boolean strictOrderDispatch=false;
|
||||||
private QueueDispatchSelector dispatchSelector;
|
private QueueDispatchSelector dispatchSelector;
|
||||||
private boolean optimizedDispatch=false;
|
private boolean optimizedDispatch=false;
|
||||||
private boolean firstConsumer = false;
|
private boolean firstConsumer = false;
|
||||||
private int timeBeforeDispatchStarts = 0;
|
private int timeBeforeDispatchStarts = 0;
|
||||||
|
@ -133,6 +135,16 @@ public class Queue extends BaseDestination implements Task {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// make the queue easily visible in the debugger from its task runner threads
|
||||||
|
final class QueueThread extends Thread {
|
||||||
|
final Queue queue;
|
||||||
|
public QueueThread(Runnable runnable, String name,
|
||||||
|
Queue queue) {
|
||||||
|
super(runnable, name);
|
||||||
|
this.queue = queue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void initialize() throws Exception {
|
public void initialize() throws Exception {
|
||||||
if (this.messages == null) {
|
if (this.messages == null) {
|
||||||
if (destination.isTemporary() || broker == null || store == null) {
|
if (destination.isTemporary() || broker == null || store == null) {
|
||||||
|
@ -153,9 +165,10 @@ public class Queue extends BaseDestination implements Task {
|
||||||
if (isOptimizedDispatch()) {
|
if (isOptimizedDispatch()) {
|
||||||
this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue: " + destination.getPhysicalName());
|
this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue: " + destination.getPhysicalName());
|
||||||
}else {
|
}else {
|
||||||
|
final Queue queue = this;
|
||||||
this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
|
this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
|
||||||
public Thread newThread(Runnable runnable) {
|
public Thread newThread(Runnable runnable) {
|
||||||
Thread thread = new Thread(runnable, "QueueThread:"+destination);
|
Thread thread = new QueueThread(runnable, "QueueThread:"+destination, queue);
|
||||||
thread.setDaemon(true);
|
thread.setDaemon(true);
|
||||||
thread.setPriority(Thread.NORM_PRIORITY);
|
thread.setPriority(Thread.NORM_PRIORITY);
|
||||||
return thread;
|
return thread;
|
||||||
|
@ -565,6 +578,7 @@ public class Queue extends BaseDestination implements Task {
|
||||||
if (memoryUsage != null) {
|
if (memoryUsage != null) {
|
||||||
memoryUsage.start();
|
memoryUsage.start();
|
||||||
}
|
}
|
||||||
|
systemUsage.getMemoryUsage().addUsageListener(this);
|
||||||
messages.start();
|
messages.start();
|
||||||
doPageIn(false);
|
doPageIn(false);
|
||||||
}
|
}
|
||||||
|
@ -579,6 +593,8 @@ public class Queue extends BaseDestination implements Task {
|
||||||
if (messages != null) {
|
if (messages != null) {
|
||||||
messages.stop();
|
messages.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
systemUsage.getMemoryUsage().removeUsageListener(this);
|
||||||
if (memoryUsage != null) {
|
if (memoryUsage != null) {
|
||||||
memoryUsage.stop();
|
memoryUsage.stop();
|
||||||
}
|
}
|
||||||
|
@ -1000,6 +1016,15 @@ public class Queue extends BaseDestination implements Task {
|
||||||
public boolean iterate() {
|
public boolean iterate() {
|
||||||
boolean pageInMoreMessages = false;
|
boolean pageInMoreMessages = false;
|
||||||
synchronized(iteratingMutex) {
|
synchronized(iteratingMutex) {
|
||||||
|
|
||||||
|
// do early to allow dispatch of these waiting messages
|
||||||
|
synchronized(messagesWaitingForSpace) {
|
||||||
|
while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) {
|
||||||
|
Runnable op = messagesWaitingForSpace.removeFirst();
|
||||||
|
op.run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
BrowserDispatch rd;
|
BrowserDispatch rd;
|
||||||
while ((rd = getNextBrowserDispatch()) != null) {
|
while ((rd = getNextBrowserDispatch()) != null) {
|
||||||
pageInMoreMessages = true;
|
pageInMoreMessages = true;
|
||||||
|
@ -1078,13 +1103,7 @@ public class Queue extends BaseDestination implements Task {
|
||||||
LOG.error("Failed to page in more queue messages ", e);
|
LOG.error("Failed to page in more queue messages ", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
synchronized(messagesWaitingForSpace) {
|
return !messagesWaitingForSpace.isEmpty();
|
||||||
while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) {
|
|
||||||
Runnable op = messagesWaitingForSpace.removeFirst();
|
|
||||||
op.run();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1520,4 +1539,18 @@ public class Queue extends BaseDestination implements Task {
|
||||||
}
|
}
|
||||||
return sub;
|
return sub;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
|
||||||
|
if (oldPercentUsage > newPercentUsage) {
|
||||||
|
synchronized(messagesWaitingForSpace) {
|
||||||
|
if (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) {
|
||||||
|
try {
|
||||||
|
this.taskRunner.wakeup();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.warn(getName() + " failed to wakeup task runner on usageChange: " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,8 +25,6 @@ import org.apache.activemq.broker.region.MessageReference;
|
||||||
import org.apache.activemq.command.Message;
|
import org.apache.activemq.command.Message;
|
||||||
import org.apache.activemq.command.MessageId;
|
import org.apache.activemq.command.MessageId;
|
||||||
import org.apache.activemq.store.MessageRecoveryListener;
|
import org.apache.activemq.store.MessageRecoveryListener;
|
||||||
import org.apache.activemq.usage.Usage;
|
|
||||||
import org.apache.activemq.usage.UsageListener;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
@ -34,7 +32,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
* Store based cursor
|
* Store based cursor
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener {
|
public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener {
|
||||||
private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class);
|
private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class);
|
||||||
protected final Destination regionDestination;
|
protected final Destination regionDestination;
|
||||||
private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
|
private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
|
||||||
|
@ -60,11 +58,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
||||||
cacheEnabled=true;
|
cacheEnabled=true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
getSystemUsage().getMemoryUsage().addUsageListener(this);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public final synchronized void stop() throws Exception {
|
public final synchronized void stop() throws Exception {
|
||||||
getSystemUsage().getMemoryUsage().removeUsageListener(this);
|
|
||||||
resetBatch();
|
resetBatch();
|
||||||
super.stop();
|
super.stop();
|
||||||
gc();
|
gc();
|
||||||
|
@ -160,7 +156,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
||||||
if (cacheEnabled) {
|
if (cacheEnabled) {
|
||||||
cacheEnabled=false;
|
cacheEnabled=false;
|
||||||
// sync with store on disabling the cache
|
// sync with store on disabling the cache
|
||||||
setBatch(lastCachedId);
|
if (lastCachedId != null) {
|
||||||
|
setBatch(lastCachedId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
size++;
|
size++;
|
||||||
|
@ -190,20 +188,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
||||||
batchList.remove(node.getMessageId());
|
batchList.remove(node.getMessageId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public final synchronized void onUsageChanged(Usage usage, int oldPercentUsage,
|
|
||||||
int newPercentUsage) {
|
|
||||||
if (oldPercentUsage > newPercentUsage && oldPercentUsage >= memoryUsageHighWaterMark) {
|
|
||||||
storeHasMessages = true;
|
|
||||||
try {
|
|
||||||
fillBatch();
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Failed to fill batch ", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public final synchronized void clear() {
|
public final synchronized void clear() {
|
||||||
gc();
|
gc();
|
||||||
}
|
}
|
||||||
|
@ -229,7 +213,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
||||||
resetBatch();
|
resetBatch();
|
||||||
this.batchResetNeeded = false;
|
this.batchResetNeeded = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if( this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) {
|
if( this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) {
|
||||||
this.storeHasMessages = false;
|
this.storeHasMessages = false;
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -32,7 +32,8 @@ import org.apache.activemq.kaha.StoreEntry;
|
||||||
import org.apache.activemq.store.MessageRecoveryListener;
|
import org.apache.activemq.store.MessageRecoveryListener;
|
||||||
import org.apache.activemq.store.ReferenceStore;
|
import org.apache.activemq.store.ReferenceStore;
|
||||||
import org.apache.activemq.store.AbstractMessageStore;
|
import org.apache.activemq.store.AbstractMessageStore;
|
||||||
import org.apache.activemq.usage.MemoryUsage;
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author rajdavies
|
* @author rajdavies
|
||||||
|
@ -40,6 +41,7 @@ import org.apache.activemq.usage.MemoryUsage;
|
||||||
*/
|
*/
|
||||||
public class KahaReferenceStore extends AbstractMessageStore implements ReferenceStore {
|
public class KahaReferenceStore extends AbstractMessageStore implements ReferenceStore {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(KahaReferenceStore.class);
|
||||||
protected final MapContainer<MessageId, ReferenceRecord> messageContainer;
|
protected final MapContainer<MessageId, ReferenceRecord> messageContainer;
|
||||||
protected KahaReferenceStoreAdapter adapter;
|
protected KahaReferenceStoreAdapter adapter;
|
||||||
private StoreEntry batchEntry;
|
private StoreEntry batchEntry;
|
||||||
|
@ -120,6 +122,11 @@ public class KahaReferenceStore extends AbstractMessageStore implements Referenc
|
||||||
if ( recoverReference(listener, msg)) {
|
if ( recoverReference(listener, msg)) {
|
||||||
count++;
|
count++;
|
||||||
lastBatchId = msg.getMessageId();
|
lastBatchId = msg.getMessageId();
|
||||||
|
} else {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(destination.getQualifiedName() + " did not recover:" + msg.getMessageId());
|
||||||
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
lastBatchId = null;
|
lastBatchId = null;
|
||||||
|
|
|
@ -0,0 +1,199 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.bugs;
|
||||||
|
|
||||||
|
import java.util.Vector;
|
||||||
|
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageListener;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.usage.MemoryUsage;
|
||||||
|
import org.apache.activemq.usage.SystemUsage;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
public class AMQ2149Test extends TestCase {
|
||||||
|
|
||||||
|
private static final Log log = LogFactory.getLog(AMQ2149Test.class);
|
||||||
|
|
||||||
|
private String BROKER_URL;
|
||||||
|
private final String SEQ_NUM_PROPERTY = "seqNum";
|
||||||
|
|
||||||
|
final int MESSAGE_LENGTH_BYTES = 75000;
|
||||||
|
final int MAX_TO_SEND = 2000;
|
||||||
|
final long SLEEP_BETWEEN_SEND_MS = 5;
|
||||||
|
final int NUM_SENDERS_AND_RECEIVERS = 10;
|
||||||
|
|
||||||
|
BrokerService broker;
|
||||||
|
Vector<Throwable> exceptions = new Vector<Throwable>();
|
||||||
|
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
broker = new BrokerService();
|
||||||
|
broker.addConnector("tcp://localhost:0");
|
||||||
|
broker.deleteAllMessages();
|
||||||
|
|
||||||
|
SystemUsage usage = new SystemUsage();
|
||||||
|
MemoryUsage memoryUsage = new MemoryUsage();
|
||||||
|
memoryUsage.setLimit(2048 * 7 * NUM_SENDERS_AND_RECEIVERS);
|
||||||
|
usage.setMemoryUsage(memoryUsage);
|
||||||
|
broker.setSystemUsage(usage);
|
||||||
|
broker.start();
|
||||||
|
|
||||||
|
BROKER_URL = "failover:("
|
||||||
|
+ broker.getTransportConnectors().get(0).getUri()
|
||||||
|
+")?maxReconnectDelay=1000&useExponentialBackOff=false";
|
||||||
|
}
|
||||||
|
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
broker.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private String buildLongString() {
|
||||||
|
final StringBuilder stringBuilder = new StringBuilder(
|
||||||
|
MESSAGE_LENGTH_BYTES);
|
||||||
|
for (int i = 0; i < MESSAGE_LENGTH_BYTES; ++i) {
|
||||||
|
stringBuilder.append((int) (Math.random() * 10));
|
||||||
|
}
|
||||||
|
return stringBuilder.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private class Receiver implements MessageListener {
|
||||||
|
|
||||||
|
private final String queueName;
|
||||||
|
|
||||||
|
private final Connection connection;
|
||||||
|
|
||||||
|
private final Session session;
|
||||||
|
|
||||||
|
private final MessageConsumer messageConsumer;
|
||||||
|
|
||||||
|
private volatile long nextExpectedSeqNum = 0;
|
||||||
|
|
||||||
|
public Receiver(String queueName) throws JMSException {
|
||||||
|
this.queueName = queueName;
|
||||||
|
connection = new ActiveMQConnectionFactory(BROKER_URL)
|
||||||
|
.createConnection();
|
||||||
|
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
messageConsumer = session.createConsumer(new ActiveMQQueue(
|
||||||
|
queueName));
|
||||||
|
messageConsumer.setMessageListener(this);
|
||||||
|
connection.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onMessage(Message message) {
|
||||||
|
try {
|
||||||
|
final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
|
||||||
|
if ((seqNum % 100) == 0) {
|
||||||
|
log.info(queueName + " received " + seqNum);
|
||||||
|
}
|
||||||
|
if (seqNum != nextExpectedSeqNum) {
|
||||||
|
log.warn(queueName + " received " + seqNum + " expected "
|
||||||
|
+ nextExpectedSeqNum);
|
||||||
|
fail(queueName + " received " + seqNum + " expected "
|
||||||
|
+ nextExpectedSeqNum);
|
||||||
|
}
|
||||||
|
++nextExpectedSeqNum;
|
||||||
|
} catch (Throwable e) {
|
||||||
|
log.error(queueName + " onMessage error", e);
|
||||||
|
exceptions.add(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private class Sender implements Runnable {
|
||||||
|
|
||||||
|
private final String queueName;
|
||||||
|
|
||||||
|
private final Connection connection;
|
||||||
|
|
||||||
|
private final Session session;
|
||||||
|
|
||||||
|
private final MessageProducer messageProducer;
|
||||||
|
|
||||||
|
private volatile long nextSequenceNumber = 0;
|
||||||
|
|
||||||
|
public Sender(String queueName) throws JMSException {
|
||||||
|
this.queueName = queueName;
|
||||||
|
connection = new ActiveMQConnectionFactory(BROKER_URL)
|
||||||
|
.createConnection();
|
||||||
|
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
messageProducer = session.createProducer(new ActiveMQQueue(
|
||||||
|
queueName));
|
||||||
|
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
connection.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
final String longString = buildLongString();
|
||||||
|
while (nextSequenceNumber <= MAX_TO_SEND) {
|
||||||
|
try {
|
||||||
|
final Message message = session
|
||||||
|
.createTextMessage(longString);
|
||||||
|
message.setLongProperty(SEQ_NUM_PROPERTY,
|
||||||
|
nextSequenceNumber);
|
||||||
|
++nextSequenceNumber;
|
||||||
|
messageProducer.send(message);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error(queueName + " send error", e);
|
||||||
|
exceptions.add(e);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(SLEEP_BETWEEN_SEND_MS);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.warn(queueName + " sleep interrupted", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testOutOfOrderWithMemeUsageLimit() throws Exception {
|
||||||
|
Vector<Thread> threads = new Vector<Thread>();
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_SENDERS_AND_RECEIVERS; ++i) {
|
||||||
|
final String queueName = "test.queue." + i;
|
||||||
|
new Receiver(queueName);
|
||||||
|
Thread thread = new Thread(new Sender(queueName));
|
||||||
|
thread.start();
|
||||||
|
threads.add(thread);
|
||||||
|
}
|
||||||
|
|
||||||
|
final long expiry = System.currentTimeMillis() + 1000 * 60 * 5;
|
||||||
|
while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
|
||||||
|
Thread sendThread = threads.firstElement();
|
||||||
|
sendThread.join(1000*10);
|
||||||
|
if (!sendThread.isAlive()) {
|
||||||
|
threads.remove(sendThread);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertTrue("No timeout waiting for senders to complete", System.currentTimeMillis() < expiry);
|
||||||
|
assertTrue("No exceptions", exceptions.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -18,7 +18,7 @@
|
||||||
<beans>
|
<beans>
|
||||||
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
|
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
|
||||||
|
|
||||||
<broker brokerName="master" persistent="false" useJmx="false" deleteAllMessagesOnStartup="true" xmlns="http://activemq.apache.org/schema/core">
|
<broker brokerName="master" useJmx="false" deleteAllMessagesOnStartup="true" xmlns="http://activemq.apache.org/schema/core">
|
||||||
<transportConnectors>
|
<transportConnectors>
|
||||||
<transportConnector uri="tcp://localhost:62001"/>
|
<transportConnector uri="tcp://localhost:62001"/>
|
||||||
</transportConnectors>
|
</transportConnectors>
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
<beans>
|
<beans>
|
||||||
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
|
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
|
||||||
|
|
||||||
<broker brokerName="slave" useJmx="false" masterConnectorURI="tcp://localhost:62001" xmlns="http://activemq.apache.org/schema/core">
|
<broker brokerName="slave" deleteAllMessagesOnStartup="true" useJmx="false" masterConnectorURI="tcp://localhost:62001" xmlns="http://activemq.apache.org/schema/core">
|
||||||
<transportConnectors>
|
<transportConnectors>
|
||||||
<transportConnector uri="tcp://localhost:62002"/>
|
<transportConnector uri="tcp://localhost:62002"/>
|
||||||
</transportConnectors>
|
</transportConnectors>
|
||||||
|
|
Loading…
Reference in New Issue