diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 925e0c6176..f322747c92 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -17,12 +17,10 @@ package org.apache.activemq.broker.region; import java.io.IOException; - import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; @@ -78,7 +76,7 @@ public class Queue extends BaseDestination implements Task { private final Log log; private final ActiveMQDestination destination; - private final List consumers = new CopyOnWriteArrayList(); + private final List consumers = new ArrayList(50); private final SystemUsage systemUsage; private final MemoryUsage memoryUsage; private final DestinationStatistics destinationStatistics = new DestinationStatistics(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java index 7395439507..a143319ed6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java @@ -40,6 +40,7 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor { protected boolean enableAudit=true; protected ActiveMQMessageAudit audit; private boolean started=false; + public synchronized void start() throws Exception { if (!started && enableAudit && audit==null) { @@ -260,6 +261,10 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor { audit.rollback(id); } } + + protected synchronized boolean isStarted() { + return started; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java index 56f597b15a..ab04be92ec 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java @@ -43,6 +43,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message private Destination regionDestination; private int size; private boolean fillBatchDuplicates; + private boolean cacheEnabled; /** * @param topic @@ -56,7 +57,13 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message } - public void start() throws Exception{ + public synchronized void start() throws Exception{ + if (!isStarted()) { + this.size = getStoreSize(); + if (this.size==0) { + cacheEnabled=true; + } + } super.start(); store.resetBatching(); } @@ -78,16 +85,22 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message } public synchronized int size() { - try { - size = store.getMessageCount(); - } catch (IOException e) { - LOG.error("Failed to get message count", e); - throw new RuntimeException(e); + if (isStarted()) { + return size; } + this.size = getStoreSize(); return size; + } public synchronized void addMessageLast(MessageReference node) throws Exception { + if (cacheEnabled && !isFull()) { + //optimization - A persistent queue will add the message to + //to store then retrieve it again from the store. + recoverMessage(node.getMessage()); + }else { + cacheEnabled=false; + } size++; } @@ -95,12 +108,16 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message size++; } - public void remove() { + public synchronized void remove() { size--; + if (size==0 && isStarted()) { + cacheEnabled=true; + } } public void remove(MessageReference node) { size--; + cacheEnabled=false; } public synchronized boolean hasNext() { @@ -157,10 +174,11 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message } } - public void gc() { + public synchronized void gc() { for (Message msg : batchList) { msg.decrementReferenceCount(); } + cacheEnabled=false; batchList.clear(); } @@ -173,6 +191,15 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message } fillBatchDuplicates=false; } + + protected synchronized int getStoreSize() { + try { + return store.getMessageCount(); + } catch (IOException e) { + LOG.error("Failed to get message count", e); + throw new RuntimeException(e); + } + } public String toString() { return "QueueStorePrefetch" + System.identityHashCode(this); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java index 9a05c09ab2..95090e5e93 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java @@ -43,11 +43,15 @@ public class RoundRobinDispatchPolicy implements DispatchPolicy { * @see org.apache.activemq.broker.region.policy.DispatchPolicy#dispatch(org.apache.activemq.broker.region.MessageReference, * org.apache.activemq.filter.MessageEvaluationContext, java.util.List) */ - public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception { - int count = 0; + public boolean dispatch(MessageReference node, + MessageEvaluationContext msgContext, List consumers) + throws Exception { + int count = 0; - Subscription firstMatchingConsumer = null; - for (Iterator iter = consumers.iterator(); iter.hasNext();) { + Subscription firstMatchingConsumer = null; + synchronized (consumers) { + for (Iterator iter = consumers.iterator(); iter + .hasNext();) { Subscription sub = iter.next(); // Only dispatch to interested subscriptions @@ -71,6 +75,7 @@ public class RoundRobinDispatchPolicy implements DispatchPolicy { } catch (Throwable bestEffort) { } } - return count > 0; + } + return count > 0; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java index 3acc2438f8..f8d0cabd26 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java @@ -432,7 +432,7 @@ public class AMQMessageStore implements MessageStore { */ public void recover(final MessageRecoveryListener listener) throws Exception { flush(); - referenceStore.recover(new RecoveryListenerAdapter(this, listener)); + referenceStore.recover(new RecoveryListenerAdapter(this, listener)); } public void start() throws Exception { @@ -483,29 +483,41 @@ public class AMQMessageStore implements MessageStore { } public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { - /* - * RecoveryListenerAdapter recoveryListener=new - * RecoveryListenerAdapter(this,listener); - * if(referenceStore.supportsExternalBatchControl()){ - * synchronized(this){ - * referenceStore.recoverNextMessages(maxReturned,recoveryListener); - * if(recoveryListener.size()==0&&recoveryListener.hasSpace()){ // check - * for inflight messages int count=0; Iterator> - * iterator=messages.entrySet().iterator(); - * while(iterator.hasNext()&&count entry=iterator.next(); ReferenceData - * data=entry.getValue(); Message message=getMessage(data); - * recoveryListener.recoverMessage(message); count++; } - * referenceStore.setBatch(recoveryListener.getLastRecoveredMessageId()); } } - * }else{ flush(); - * referenceStore.recoverNextMessages(maxReturned,recoveryListener); } - */ + + RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter( + this, listener); + if (referenceStore.supportsExternalBatchControl()) { + synchronized (this) { + referenceStore.recoverNextMessages(maxReturned, + recoveryListener); + if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) { + int count = 0; + Iterator> iterator = messages + .entrySet().iterator(); + while (iterator.hasNext() && count < maxReturned + && recoveryListener.hasSpace()) { + Entry entry = iterator.next(); + ReferenceData data = entry.getValue(); + Message message = getMessage(data); + recoveryListener.recoverMessage(message); + count++; + } + referenceStore.setBatch(recoveryListener + .getLastRecoveredMessageId()); + } + } + } else { + flush(); + referenceStore.recoverNextMessages(maxReturned, recoveryListener); + } + /* RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener); referenceStore.recoverNextMessages(maxReturned, recoveryListener); if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) { flush(); referenceStore.recoverNextMessages(maxReturned, recoveryListener); } + */ } Message getMessage(ReferenceData data) throws IOException { diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java index ed2e8eff68..77d3ef841c 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java @@ -24,7 +24,8 @@ public class BrokerRestartTestSupport extends BrokerTestSupport { protected BrokerService createBroker() throws Exception { BrokerService broker = new BrokerService(); - broker.setPersistent(false); + //broker.setPersistent(false); + broker.setDeleteAllMessagesOnStartup(true); persistenceAdapter = broker.getPersistenceAdapter(); return broker; } @@ -35,7 +36,7 @@ public class BrokerRestartTestSupport extends BrokerTestSupport { */ protected BrokerService createRestartedBroker() throws Exception { BrokerService broker = new BrokerService(); - broker.setPersistenceAdapter(persistenceAdapter); + //broker.setPersistenceAdapter(persistenceAdapter); return broker; } diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java index f503bf3527..cf98b4e0c2 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java @@ -30,9 +30,9 @@ public class SimpleQueueTest extends SimpleTopicTest { } protected void setUp() throws Exception { - numberOfConsumers = 50; - numberofProducers = 50; - this.consumerSleepDuration=10; + numberOfConsumers = 10; + numberofProducers = 10; + this.consumerSleepDuration=20; super.setUp(); } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/QueueRepeaterTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/QueueRepeaterTest.java index 1aada3f0e0..3445a01df1 100755 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/QueueRepeaterTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/QueueRepeaterTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.usecases; import java.util.Date; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.Destination; @@ -56,7 +57,7 @@ public final class QueueRepeaterTest extends TestCase { public void testTransaction() throws Exception { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); connection = factory.createConnection(); queue = new ActiveMQQueue(getClass().getName() + "." + getName()); @@ -104,8 +105,8 @@ public final class QueueRepeaterTest extends TestCase { } LOG.info("Waiting for latch"); - latch.await(); - + latch.await(2,TimeUnit.SECONDS); + assertNotNull(receivedText); LOG.info("test completed, destination=" + receivedText); } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/TestSupport.java b/activemq-core/src/test/java/org/apache/activemq/usecases/TestSupport.java index 3dcd5e548b..31e7a48b80 100755 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/TestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/TestSupport.java @@ -120,7 +120,7 @@ public class TestSupport extends TestCase { } protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory("vm://localhost"); + return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); } /** diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/TransactionTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/TransactionTest.java index 99778d03c4..e7495d7814 100755 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/TransactionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/TransactionTest.java @@ -107,18 +107,6 @@ public final class TransactionTest extends TestCase { LOG.info("Waiting for latch"); latch.await(2,TimeUnit.SECONDS); - if (receivedText==null) { - /* - Map map = Thread.getAllStackTraces(); - for (Map.Entry entry: map.entrySet()) { - System.out.println(entry.getKey()); - for (StackTraceElement element :entry.getValue()) { - System.out.println(element); - } - } - */ - fail("No message received"); - } assertNotNull(receivedText); LOG.info("test completed, destination=" + receivedText); }