merge -c 834543 - resolve https://issues.apache.org/activemq/browse/AMQ-2481 - no need to force a page in but sync between expiry from browse and from pageIn needed some tweaks, expired messages need to be removed from the cursor in the event of expiry from browse. Also resolve unit test failures from https://issues.apache.org/activemq/browse/AMQ-2481

git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-5.3@834557 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-11-10 17:26:38 +00:00
parent a1dca0e242
commit e150757421
8 changed files with 181 additions and 54 deletions

View File

@ -25,7 +25,6 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageDispatchNotification;

View File

@ -16,6 +16,28 @@
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@ -54,26 +76,6 @@ import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
/**
@ -198,10 +200,12 @@ public class Queue extends BaseDestination implements Task, UsageListener {
public boolean recoverMessage(Message message) {
// Message could have expired while it was being
// loaded..
if (broker.isExpired(message)) {
messageExpired(createConnectionContext(), createMessageReference(message));
// drop message will decrement so counter balance here
destinationStatistics.getMessages().increment();
if (message.isExpired()) {
if (broker.isExpired(message)) {
messageExpired(createConnectionContext(), createMessageReference(message));
// drop message will decrement so counter balance here
destinationStatistics.getMessages().increment();
}
return true;
}
if (hasSpace()) {
@ -439,6 +443,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// While waiting for space to free up... the
// message may have expired.
if (message.isExpired()) {
LOG.error("expired waiting for space..");
broker.messageExpired(context, message);
destinationStatistics.getExpired().increment();
} else {
@ -585,7 +590,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
return null;
}
};
doBrowse(true, browsedMessages, this.getMaxExpirePageSize());
doBrowse(browsedMessages, this.getMaxExpirePageSize());
}
public void gc(){
@ -749,14 +754,15 @@ public class Queue extends BaseDestination implements Task, UsageListener {
public Message[] browse() {
List<Message> l = new ArrayList<Message>();
doBrowse(false, l, getMaxBrowsePageSize());
doBrowse(l, getMaxBrowsePageSize());
return l.toArray(new Message[l.size()]);
}
public void doBrowse(boolean forcePageIn, List<Message> l, int max) {
public void doBrowse(List<Message> l, int max) {
final ConnectionContext connectionContext = createConnectionContext();
try {
pageInMessages(forcePageIn);
pageInMessages(false);
List<MessageReference> toExpire = new ArrayList<MessageReference>();
synchronized(dispatchMutex) {
synchronized (pagedInPendingDispatch) {
@ -770,7 +776,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
toExpire.clear();
synchronized (pagedInMessages) {
addAll(pagedInMessages.values(), l, max, toExpire);
addAll(pagedInMessages.values(), l, max, toExpire);
}
for (MessageReference ref : toExpire) {
if (broker.isExpired(ref)) {
@ -787,13 +793,16 @@ public class Queue extends BaseDestination implements Task, UsageListener {
try {
messages.reset();
while (messages.hasNext() && l.size() < max) {
MessageReference node = messages.next();
messages.rollback(node.getMessageId());
if (node != null) {
MessageReference node = messages.next();
if (node.isExpired()) {
if (broker.isExpired(node)) {
messageExpired(connectionContext,
createMessageReference(node.getMessage()));
} else if (l.contains(node.getMessage()) == false) {
}
messages.remove();
} else {
messages.rollback(node.getMessageId());
if (l.contains(node.getMessage()) == false) {
l.add(node.getMessage());
}
}
@ -806,7 +815,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
} catch (Exception e) {
LOG.error("Problem retrieving message for browse", e);
}
}
}
private void addAll(Collection<QueueMessageReference> refs,
@ -1278,7 +1287,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) {
if (LOG.isDebugEnabled()) {
if (LOG.isDebugEnabled()) {
LOG.debug("message expired: " + reference);
}
broker.messageExpired(context, reference);
@ -1371,12 +1380,14 @@ public class Queue extends BaseDestination implements Task, UsageListener {
node.incrementReferenceCount();
messages.remove();
QueueMessageReference ref = createMessageReference(node.getMessage());
if (!broker.isExpired(node)) {
if (ref.isExpired()) {
if (broker.isExpired(ref)) {
messageExpired(createConnectionContext(), ref);
}
} else {
result.add(ref);
count++;
} else {
messageExpired(createConnectionContext(), ref);
}
}
}
} finally {
messages.release();

View File

@ -0,0 +1,79 @@
package org.apache.activemq.util;
import java.util.HashMap;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Debugging tool to track entry points through code, useful to see runtime call paths
* To use, add to a method as follows:<code>
* public void someMethod() {
* ThreadTracker.track("someMethod");
* ...
* }</code>
* and at some stage call <code>result</code> to get a LOG
* output of the callers with an associated call count
*
*/
public class ThreadTracker {
static final Log LOG = LogFactory.getLog(ThreadTracker.class);
static HashMap<String, Tracker> trackers = new HashMap<String, Tracker>();
/**
* track the stack trace of callers
* @param name the method being tracked
*/
public static void track(String name) {
Tracker t;
synchronized(trackers) {
t = trackers.get(name);
if (t == null) {
t = new Tracker();
trackers.put(name, t);
}
}
t.track();
}
/**
* output the result of stack trace capture to the log
*/
public static void result() {
for (Entry<String, Tracker> t: trackers.entrySet()) {
LOG.info("Tracker: " + t.getKey() + ", " + t.getValue().size() + " entry points...");
for (Trace trace : t.getValue().values()) {
LOG.info("count: " + trace.count, trace);
}
LOG.info("Tracker: " + t.getKey() + ", done.");
}
}
}
@SuppressWarnings("serial")
class Trace extends Throwable {
public int count;
public final int size;
Trace() {
super();
size = this.getStackTrace().length;
}
}
@SuppressWarnings("serial")
class Tracker extends HashMap<Integer, Trace> {
public void track() {
Trace current = new Trace();
synchronized(this) {
Trace exist = get(current.size);
if (exist != null) {
exist.count++;
} else {
put(current.size, current);
}
}
}
}

View File

@ -16,11 +16,14 @@
*/
package org.apache.activemq.broker.ft;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
public class QueueMasterSlaveSingleUrlTest extends QueueMasterSlaveTest {
@ -33,17 +36,25 @@ public class QueueMasterSlaveSingleUrlTest extends QueueMasterSlaveTest {
protected void createMaster() throws Exception {
master = new BrokerService();
master.setBrokerName("shared");
master.setBrokerName("shared-master");
configureSharedPersistenceAdapter(master);
master.addConnector(brokerUrl);
master.start();
}
private void configureSharedPersistenceAdapter(BrokerService broker) throws Exception {
AMQPersistenceAdapter adapter = new AMQPersistenceAdapter();
adapter.setDirectory(new File("shared"));
broker.setPersistenceAdapter(adapter);
}
protected void createSlave() throws Exception {
new Thread(new Runnable() {
public void run() {
try {
BrokerService broker = new BrokerService();
broker.setBrokerName("shared");
broker.setBrokerName("shared-slave");
configureSharedPersistenceAdapter(broker);
// add transport as a service so that it is bound on start, after store started
final TransportConnector tConnector = new TransportConnector();
tConnector.setUri(new URI(brokerUrl));

View File

@ -98,6 +98,23 @@ public class QueuePurgeTest extends TestCase {
proxy.getQueueSize());
}
public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception {
applyBrokerSpoolingPolicy();
final int exprityPeriod = 1000;
applyExpiryDuration(exprityPeriod);
createProducerAndSendMessages(90000);
QueueViewMBean proxy = getProxyToQueueViewMBean();
LOG.info("waiting for expiry to kick in a bunch of times to verify it does not blow mem");
Thread.sleep(10000);
assertEquals("Queue size is has not changed " + proxy.getQueueSize(), 90000,
proxy.getQueueSize());
}
private void applyExpiryDuration(int i) {
broker.getDestinationPolicy().getDefaultEntry().setExpireMessagesPeriod(i);
}
private void applyBrokerSpoolingPolicy() {
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();

View File

@ -98,7 +98,8 @@ public class DiscoveryNetworkReconnectTest {
context.checking(new Expectations(){{
allowing (managementContext).getJmxDomainName(); will (returnValue("Test"));
allowing (managementContext).start();
allowing (managementContext).stop();
allowing (managementContext).stop();
allowing (managementContext).isConnectorStarted();
// expected MBeans
allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(

View File

@ -152,6 +152,8 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
assertTrue("all messages expired - queue size gone to zero " + view.getQueueSize(), Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
LOG.info("Stats: received: " + received.get() + ", size= " + view.getQueueSize() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount()
+ ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
return view.getQueueSize() == 0;
}
}));
@ -282,7 +284,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
broker.waitUntilStarted();
return broker;
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
String domain = "org.apache.activemq";
ObjectName name;

View File

@ -16,6 +16,19 @@
*/
package org.apache.activemq.usecases;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
@ -27,16 +40,6 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.ObjectName;
import junit.framework.Test;
public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
@ -140,12 +143,16 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
final DestinationViewMBean view = createView(destination);
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+ ", size= " + view.getQueueSize());
return sendCount == view.getExpiredCount();
}
});
LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+ ", size= " + view.getQueueSize());
assertEquals("All sent have expired", sendCount, view.getExpiredCount());
}