diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index ee88c8eb68..132c349d58 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -25,7 +25,6 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageDispatchNotification; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index ae58c0f65a..afaa48bc66 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -16,6 +16,28 @@ */ package org.apache.activemq.broker.region; +import java.io.IOException; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; + import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; @@ -54,26 +76,6 @@ import org.apache.activemq.usage.UsageListener; import org.apache.activemq.util.BrokerSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.io.IOException; -import java.util.AbstractList; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; /** @@ -198,10 +200,12 @@ public class Queue extends BaseDestination implements Task, UsageListener { public boolean recoverMessage(Message message) { // Message could have expired while it was being // loaded.. - if (broker.isExpired(message)) { - messageExpired(createConnectionContext(), createMessageReference(message)); - // drop message will decrement so counter balance here - destinationStatistics.getMessages().increment(); + if (message.isExpired()) { + if (broker.isExpired(message)) { + messageExpired(createConnectionContext(), createMessageReference(message)); + // drop message will decrement so counter balance here + destinationStatistics.getMessages().increment(); + } return true; } if (hasSpace()) { @@ -439,6 +443,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { // While waiting for space to free up... the // message may have expired. if (message.isExpired()) { + LOG.error("expired waiting for space.."); broker.messageExpired(context, message); destinationStatistics.getExpired().increment(); } else { @@ -585,7 +590,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { return null; } }; - doBrowse(true, browsedMessages, this.getMaxExpirePageSize()); + doBrowse(browsedMessages, this.getMaxExpirePageSize()); } public void gc(){ @@ -749,14 +754,15 @@ public class Queue extends BaseDestination implements Task, UsageListener { public Message[] browse() { List l = new ArrayList(); - doBrowse(false, l, getMaxBrowsePageSize()); + doBrowse(l, getMaxBrowsePageSize()); return l.toArray(new Message[l.size()]); } - public void doBrowse(boolean forcePageIn, List l, int max) { + + public void doBrowse(List l, int max) { final ConnectionContext connectionContext = createConnectionContext(); try { - pageInMessages(forcePageIn); + pageInMessages(false); List toExpire = new ArrayList(); synchronized(dispatchMutex) { synchronized (pagedInPendingDispatch) { @@ -770,7 +776,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } toExpire.clear(); synchronized (pagedInMessages) { - addAll(pagedInMessages.values(), l, max, toExpire); + addAll(pagedInMessages.values(), l, max, toExpire); } for (MessageReference ref : toExpire) { if (broker.isExpired(ref)) { @@ -787,13 +793,16 @@ public class Queue extends BaseDestination implements Task, UsageListener { try { messages.reset(); while (messages.hasNext() && l.size() < max) { - MessageReference node = messages.next(); - messages.rollback(node.getMessageId()); - if (node != null) { + MessageReference node = messages.next(); + if (node.isExpired()) { if (broker.isExpired(node)) { messageExpired(connectionContext, createMessageReference(node.getMessage())); - } else if (l.contains(node.getMessage()) == false) { + } + messages.remove(); + } else { + messages.rollback(node.getMessageId()); + if (l.contains(node.getMessage()) == false) { l.add(node.getMessage()); } } @@ -806,7 +815,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } catch (Exception e) { LOG.error("Problem retrieving message for browse", e); - } + } } private void addAll(Collection refs, @@ -1278,7 +1287,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) { - if (LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("message expired: " + reference); } broker.messageExpired(context, reference); @@ -1371,12 +1380,14 @@ public class Queue extends BaseDestination implements Task, UsageListener { node.incrementReferenceCount(); messages.remove(); QueueMessageReference ref = createMessageReference(node.getMessage()); - if (!broker.isExpired(node)) { + if (ref.isExpired()) { + if (broker.isExpired(ref)) { + messageExpired(createConnectionContext(), ref); + } + } else { result.add(ref); count++; - } else { - messageExpired(createConnectionContext(), ref); - } + } } } finally { messages.release(); diff --git a/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java b/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java new file mode 100644 index 0000000000..9b03dba84f --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java @@ -0,0 +1,79 @@ +package org.apache.activemq.util; + +import java.util.HashMap; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Debugging tool to track entry points through code, useful to see runtime call paths + * To use, add to a method as follows: + * public void someMethod() { + * ThreadTracker.track("someMethod"); + * ... + * } + * and at some stage call result to get a LOG + * output of the callers with an associated call count + * + */ +public class ThreadTracker { + + static final Log LOG = LogFactory.getLog(ThreadTracker.class); + static HashMap trackers = new HashMap(); + + /** + * track the stack trace of callers + * @param name the method being tracked + */ + public static void track(String name) { + Tracker t; + synchronized(trackers) { + t = trackers.get(name); + if (t == null) { + t = new Tracker(); + trackers.put(name, t); + } + } + t.track(); + } + + /** + * output the result of stack trace capture to the log + */ + public static void result() { + for (Entry t: trackers.entrySet()) { + LOG.info("Tracker: " + t.getKey() + ", " + t.getValue().size() + " entry points..."); + for (Trace trace : t.getValue().values()) { + LOG.info("count: " + trace.count, trace); + } + LOG.info("Tracker: " + t.getKey() + ", done."); + } + } + +} + +@SuppressWarnings("serial") +class Trace extends Throwable { + public int count; + public final int size; + Trace() { + super(); + size = this.getStackTrace().length; + } +} + +@SuppressWarnings("serial") +class Tracker extends HashMap { + public void track() { + Trace current = new Trace(); + synchronized(this) { + Trace exist = get(current.size); + if (exist != null) { + exist.count++; + } else { + put(current.size, current); + } + } + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java index 78845b2f89..423ecddce3 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java @@ -16,11 +16,14 @@ */ package org.apache.activemq.broker.ft; +import java.io.File; +import java.io.IOException; import java.net.URI; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.store.amq.AMQPersistenceAdapter; public class QueueMasterSlaveSingleUrlTest extends QueueMasterSlaveTest { @@ -33,17 +36,25 @@ public class QueueMasterSlaveSingleUrlTest extends QueueMasterSlaveTest { protected void createMaster() throws Exception { master = new BrokerService(); - master.setBrokerName("shared"); + master.setBrokerName("shared-master"); + configureSharedPersistenceAdapter(master); master.addConnector(brokerUrl); master.start(); } + private void configureSharedPersistenceAdapter(BrokerService broker) throws Exception { + AMQPersistenceAdapter adapter = new AMQPersistenceAdapter(); + adapter.setDirectory(new File("shared")); + broker.setPersistenceAdapter(adapter); + } + protected void createSlave() throws Exception { new Thread(new Runnable() { public void run() { try { BrokerService broker = new BrokerService(); - broker.setBrokerName("shared"); + broker.setBrokerName("shared-slave"); + configureSharedPersistenceAdapter(broker); // add transport as a service so that it is bound on start, after store started final TransportConnector tConnector = new TransportConnector(); tConnector.setUri(new URI(brokerUrl)); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java index 2fed7a3ef1..92309ea41d 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java @@ -98,6 +98,23 @@ public class QueuePurgeTest extends TestCase { proxy.getQueueSize()); } + public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception { + applyBrokerSpoolingPolicy(); + final int exprityPeriod = 1000; + applyExpiryDuration(exprityPeriod); + createProducerAndSendMessages(90000); + QueueViewMBean proxy = getProxyToQueueViewMBean(); + LOG.info("waiting for expiry to kick in a bunch of times to verify it does not blow mem"); + Thread.sleep(10000); + assertEquals("Queue size is has not changed " + proxy.getQueueSize(), 90000, + proxy.getQueueSize()); + } + + + private void applyExpiryDuration(int i) { + broker.getDestinationPolicy().getDefaultEntry().setExpireMessagesPeriod(i); + } + private void applyBrokerSpoolingPolicy() { PolicyMap policyMap = new PolicyMap(); PolicyEntry defaultEntry = new PolicyEntry(); diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java index 7ca0b37949..78fda29191 100755 --- a/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java @@ -98,7 +98,8 @@ public class DiscoveryNetworkReconnectTest { context.checking(new Expectations(){{ allowing (managementContext).getJmxDomainName(); will (returnValue("Test")); allowing (managementContext).start(); - allowing (managementContext).stop(); + allowing (managementContext).stop(); + allowing (managementContext).isConnectorStarted(); // expected MBeans allowing (managementContext).registerMBean(with(any(Object.class)), with(equal( diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java index 90144f9186..ffdec58615 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java @@ -152,6 +152,8 @@ public class ExpiredMessagesTest extends CombinationTestSupport { assertTrue("all messages expired - queue size gone to zero " + view.getQueueSize(), Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { + LOG.info("Stats: received: " + received.get() + ", size= " + view.getQueueSize() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() + + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); return view.getQueueSize() == 0; } })); @@ -282,7 +284,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport { broker.waitUntilStarted(); return broker; } - + protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { String domain = "org.apache.activemq"; ObjectName name; diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java index 8f08ef4f5f..032998c260 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java @@ -16,6 +16,19 @@ */ package org.apache.activemq.usecases; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.ObjectName; + +import junit.framework.Test; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; @@ -27,16 +40,6 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.util.Wait; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.management.ObjectName; -import junit.framework.Test; public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { @@ -140,12 +143,16 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { final DestinationViewMBean view = createView(destination); Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { + LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + + ", size= " + view.getQueueSize()); return sendCount == view.getExpiredCount(); } }); LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + ", size= " + view.getQueueSize()); + assertEquals("All sent have expired", sendCount, view.getExpiredCount()); }