Tweaking for Queue performance and concurrency

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@607317 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-12-28 19:52:24 +00:00
parent c493b87f0a
commit 409902c394
10 changed files with 92 additions and 55 deletions

View File

@ -17,12 +17,10 @@
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -78,7 +76,7 @@ public class Queue extends BaseDestination implements Task {
private final Log log; private final Log log;
private final ActiveMQDestination destination; private final ActiveMQDestination destination;
private final List<Subscription> consumers = new CopyOnWriteArrayList<Subscription>(); private final List<Subscription> consumers = new ArrayList<Subscription>(50);
private final SystemUsage systemUsage; private final SystemUsage systemUsage;
private final MemoryUsage memoryUsage; private final MemoryUsage memoryUsage;
private final DestinationStatistics destinationStatistics = new DestinationStatistics(); private final DestinationStatistics destinationStatistics = new DestinationStatistics();

View File

@ -41,6 +41,7 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
protected ActiveMQMessageAudit audit; protected ActiveMQMessageAudit audit;
private boolean started=false; private boolean started=false;
public synchronized void start() throws Exception { public synchronized void start() throws Exception {
if (!started && enableAudit && audit==null) { if (!started && enableAudit && audit==null) {
audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
@ -261,5 +262,9 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
} }
} }
protected synchronized boolean isStarted() {
return started;
}
} }

View File

@ -43,6 +43,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message
private Destination regionDestination; private Destination regionDestination;
private int size; private int size;
private boolean fillBatchDuplicates; private boolean fillBatchDuplicates;
private boolean cacheEnabled;
/** /**
* @param topic * @param topic
@ -56,7 +57,13 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message
} }
public void start() throws Exception{ public synchronized void start() throws Exception{
if (!isStarted()) {
this.size = getStoreSize();
if (this.size==0) {
cacheEnabled=true;
}
}
super.start(); super.start();
store.resetBatching(); store.resetBatching();
} }
@ -78,16 +85,22 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message
} }
public synchronized int size() { public synchronized int size() {
try { if (isStarted()) {
size = store.getMessageCount();
} catch (IOException e) {
LOG.error("Failed to get message count", e);
throw new RuntimeException(e);
}
return size; return size;
} }
this.size = getStoreSize();
return size;
}
public synchronized void addMessageLast(MessageReference node) throws Exception { public synchronized void addMessageLast(MessageReference node) throws Exception {
if (cacheEnabled && !isFull()) {
//optimization - A persistent queue will add the message to
//to store then retrieve it again from the store.
recoverMessage(node.getMessage());
}else {
cacheEnabled=false;
}
size++; size++;
} }
@ -95,12 +108,16 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message
size++; size++;
} }
public void remove() { public synchronized void remove() {
size--; size--;
if (size==0 && isStarted()) {
cacheEnabled=true;
}
} }
public void remove(MessageReference node) { public void remove(MessageReference node) {
size--; size--;
cacheEnabled=false;
} }
public synchronized boolean hasNext() { public synchronized boolean hasNext() {
@ -157,10 +174,11 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message
} }
} }
public void gc() { public synchronized void gc() {
for (Message msg : batchList) { for (Message msg : batchList) {
msg.decrementReferenceCount(); msg.decrementReferenceCount();
} }
cacheEnabled=false;
batchList.clear(); batchList.clear();
} }
@ -174,6 +192,15 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message
fillBatchDuplicates=false; fillBatchDuplicates=false;
} }
protected synchronized int getStoreSize() {
try {
return store.getMessageCount();
} catch (IOException e) {
LOG.error("Failed to get message count", e);
throw new RuntimeException(e);
}
}
public String toString() { public String toString() {
return "QueueStorePrefetch" + System.identityHashCode(this); return "QueueStorePrefetch" + System.identityHashCode(this);
} }

View File

@ -43,11 +43,15 @@ public class RoundRobinDispatchPolicy implements DispatchPolicy {
* @see org.apache.activemq.broker.region.policy.DispatchPolicy#dispatch(org.apache.activemq.broker.region.MessageReference, * @see org.apache.activemq.broker.region.policy.DispatchPolicy#dispatch(org.apache.activemq.broker.region.MessageReference,
* org.apache.activemq.filter.MessageEvaluationContext, java.util.List) * org.apache.activemq.filter.MessageEvaluationContext, java.util.List)
*/ */
public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers) throws Exception { public boolean dispatch(MessageReference node,
MessageEvaluationContext msgContext, List<Subscription> consumers)
throws Exception {
int count = 0; int count = 0;
Subscription firstMatchingConsumer = null; Subscription firstMatchingConsumer = null;
for (Iterator<Subscription> iter = consumers.iterator(); iter.hasNext();) { synchronized (consumers) {
for (Iterator<Subscription> iter = consumers.iterator(); iter
.hasNext();) {
Subscription sub = iter.next(); Subscription sub = iter.next();
// Only dispatch to interested subscriptions // Only dispatch to interested subscriptions
@ -71,6 +75,7 @@ public class RoundRobinDispatchPolicy implements DispatchPolicy {
} catch (Throwable bestEffort) { } catch (Throwable bestEffort) {
} }
} }
}
return count > 0; return count > 0;
} }
} }

View File

@ -483,29 +483,41 @@ public class AMQMessageStore implements MessageStore {
} }
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(
this, listener);
if (referenceStore.supportsExternalBatchControl()) {
synchronized (this) {
referenceStore.recoverNextMessages(maxReturned,
recoveryListener);
if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
int count = 0;
Iterator<Entry<MessageId, ReferenceData>> iterator = messages
.entrySet().iterator();
while (iterator.hasNext() && count < maxReturned
&& recoveryListener.hasSpace()) {
Entry<MessageId, ReferenceData> entry = iterator.next();
ReferenceData data = entry.getValue();
Message message = getMessage(data);
recoveryListener.recoverMessage(message);
count++;
}
referenceStore.setBatch(recoveryListener
.getLastRecoveredMessageId());
}
}
} else {
flush();
referenceStore.recoverNextMessages(maxReturned, recoveryListener);
}
/* /*
* RecoveryListenerAdapter recoveryListener=new
* RecoveryListenerAdapter(this,listener);
* if(referenceStore.supportsExternalBatchControl()){
* synchronized(this){
* referenceStore.recoverNextMessages(maxReturned,recoveryListener);
* if(recoveryListener.size()==0&&recoveryListener.hasSpace()){ // check
* for inflight messages int count=0; Iterator<Entry<MessageId,ReferenceData>>
* iterator=messages.entrySet().iterator();
* while(iterator.hasNext()&&count<maxReturned&&recoveryListener.hasSpace()){
* Entry<MessageId,ReferenceData> entry=iterator.next(); ReferenceData
* data=entry.getValue(); Message message=getMessage(data);
* recoveryListener.recoverMessage(message); count++; }
* referenceStore.setBatch(recoveryListener.getLastRecoveredMessageId()); } }
* }else{ flush();
* referenceStore.recoverNextMessages(maxReturned,recoveryListener); }
*/
RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener); RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
referenceStore.recoverNextMessages(maxReturned, recoveryListener); referenceStore.recoverNextMessages(maxReturned, recoveryListener);
if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) { if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
flush(); flush();
referenceStore.recoverNextMessages(maxReturned, recoveryListener); referenceStore.recoverNextMessages(maxReturned, recoveryListener);
} }
*/
} }
Message getMessage(ReferenceData data) throws IOException { Message getMessage(ReferenceData data) throws IOException {

View File

@ -24,7 +24,8 @@ public class BrokerRestartTestSupport extends BrokerTestSupport {
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
broker.setPersistent(false); //broker.setPersistent(false);
broker.setDeleteAllMessagesOnStartup(true);
persistenceAdapter = broker.getPersistenceAdapter(); persistenceAdapter = broker.getPersistenceAdapter();
return broker; return broker;
} }
@ -35,7 +36,7 @@ public class BrokerRestartTestSupport extends BrokerTestSupport {
*/ */
protected BrokerService createRestartedBroker() throws Exception { protected BrokerService createRestartedBroker() throws Exception {
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
broker.setPersistenceAdapter(persistenceAdapter); //broker.setPersistenceAdapter(persistenceAdapter);
return broker; return broker;
} }

View File

@ -30,9 +30,9 @@ public class SimpleQueueTest extends SimpleTopicTest {
} }
protected void setUp() throws Exception { protected void setUp() throws Exception {
numberOfConsumers = 50; numberOfConsumers = 10;
numberofProducers = 50; numberofProducers = 10;
this.consumerSleepDuration=10; this.consumerSleepDuration=20;
super.setUp(); super.setUp();
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.usecases;
import java.util.Date; import java.util.Date;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
@ -56,7 +57,7 @@ public final class QueueRepeaterTest extends TestCase {
public void testTransaction() throws Exception { public void testTransaction() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
connection = factory.createConnection(); connection = factory.createConnection();
queue = new ActiveMQQueue(getClass().getName() + "." + getName()); queue = new ActiveMQQueue(getClass().getName() + "." + getName());
@ -104,8 +105,8 @@ public final class QueueRepeaterTest extends TestCase {
} }
LOG.info("Waiting for latch"); LOG.info("Waiting for latch");
latch.await(); latch.await(2,TimeUnit.SECONDS);
assertNotNull(receivedText);
LOG.info("test completed, destination=" + receivedText); LOG.info("test completed, destination=" + receivedText);
} }

View File

@ -120,7 +120,7 @@ public class TestSupport extends TestCase {
} }
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory("vm://localhost"); return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
} }
/** /**

View File

@ -107,18 +107,6 @@ public final class TransactionTest extends TestCase {
LOG.info("Waiting for latch"); LOG.info("Waiting for latch");
latch.await(2,TimeUnit.SECONDS); latch.await(2,TimeUnit.SECONDS);
if (receivedText==null) {
/*
Map<Thread,StackTraceElement[]> map = Thread.getAllStackTraces();
for (Map.Entry<Thread,StackTraceElement[]> entry: map.entrySet()) {
System.out.println(entry.getKey());
for (StackTraceElement element :entry.getValue()) {
System.out.println(element);
}
}
*/
fail("No message received");
}
assertNotNull(receivedText); assertNotNull(receivedText);
LOG.info("test completed, destination=" + receivedText); LOG.info("test completed, destination=" + receivedText);
} }