resolve AMQ-2005, Scheduler is now referenced by its users such that it cannot be gc'ed during normal operation

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@718931 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2008-11-19 11:08:54 +00:00
parent 2ffb1a6782
commit 2b2b35e757
8 changed files with 39 additions and 25 deletions

View File

@ -93,7 +93,7 @@ import org.apache.commons.logging.LogFactory;
public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher { public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
private static final Log LOG = LogFactory.getLog(ActiveMQMessageConsumer.class); private static final Log LOG = LogFactory.getLog(ActiveMQMessageConsumer.class);
protected static final Scheduler scheduler = Scheduler.getInstance();
protected final ActiveMQSession session; protected final ActiveMQSession session;
protected final ConsumerInfo info; protected final ConsumerInfo info;
@ -969,7 +969,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (redeliveryDelay > 0) { if (redeliveryDelay > 0) {
// Start up the delivery again a little later. // Start up the delivery again a little later.
Scheduler.executeAfterDelay(new Runnable() { scheduler.executeAfterDelay(new Runnable() {
public void run() { public void run() {
try { try {
if (started.get()) { if (started.get()) {

View File

@ -148,6 +148,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
} }
private static final Log LOG = LogFactory.getLog(ActiveMQSession.class); private static final Log LOG = LogFactory.getLog(ActiveMQSession.class);
protected static final Scheduler scheduler = Scheduler.getInstance();
protected int acknowledgementMode; protected int acknowledgementMode;
protected final ActiveMQConnection connection; protected final ActiveMQConnection connection;
@ -779,7 +780,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
for (int i = 0; i < redeliveryCounter; i++) { for (int i = 0; i < redeliveryCounter; i++) {
redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
} }
Scheduler.executeAfterDelay(new Runnable() { scheduler.executeAfterDelay(new Runnable() {
public void run() { public void run() {
((ActiveMQDispatcher)md.getConsumer()).dispatch(md); ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);

View File

@ -53,6 +53,8 @@ import org.apache.commons.logging.LogFactory;
public abstract class PrefetchSubscription extends AbstractSubscription { public abstract class PrefetchSubscription extends AbstractSubscription {
private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class); private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
protected static final Scheduler scheduler = Scheduler.getInstance();
protected PendingMessageCursor pending; protected PendingMessageCursor pending;
protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>(); protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
protected int prefetchExtension; protected int prefetchExtension;
@ -109,7 +111,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
dispatchPending(); dispatchPending();
} }
if (pull.getTimeout() > 0) { if (pull.getTimeout() > 0) {
Scheduler.executeAfterDelay(new Runnable() { scheduler.executeAfterDelay(new Runnable() {
public void run() { public void run() {
pullTimeout(dispatchCounterBeforePull); pullTimeout(dispatchCounterBeforePull);

View File

@ -42,6 +42,7 @@ import org.apache.activemq.thread.Scheduler;
public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy { public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
private static final int GC_INTERVAL = 1000; private static final int GC_INTERVAL = 1000;
protected static final Scheduler scheduler = Scheduler.getInstance();
// TODO: need to get a better synchronized linked list that has little // TODO: need to get a better synchronized linked list that has little
// contention between enqueuing and dequeuing // contention between enqueuing and dequeuing
@ -90,11 +91,11 @@ public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPoli
} }
public void start() throws Exception { public void start() throws Exception {
Scheduler.executePeriodically(gcTask, GC_INTERVAL); scheduler.executePeriodically(gcTask, GC_INTERVAL);
} }
public void stop() throws Exception { public void stop() throws Exception {
Scheduler.cancel(gcTask); scheduler.cancel(gcTask);
} }
public void gc() { public void gc() {

View File

@ -75,6 +75,7 @@ public class AsyncDataManager {
public static final int PREFERED_DIFF = 1024 * 512; public static final int PREFERED_DIFF = 1024 * 512;
private static final Log LOG = LogFactory.getLog(AsyncDataManager.class); private static final Log LOG = LogFactory.getLog(AsyncDataManager.class);
protected static Scheduler scheduler = Scheduler.getInstance();
protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>(); protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
@ -191,7 +192,7 @@ public class AsyncDataManager {
cleanup(); cleanup();
} }
}; };
Scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL); scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
} }
public void lock() throws IOException { public void lock() throws IOException {
@ -326,7 +327,7 @@ public class AsyncDataManager {
if (!started) { if (!started) {
return; return;
} }
Scheduler.cancel(cleanupTask); scheduler.cancel(cleanupTask);
accessorPool.close(); accessorPool.close();
storeState(false); storeState(false);
appender.close(); appender.close();

View File

@ -84,6 +84,7 @@ import org.apache.commons.logging.LogFactory;
public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware { public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
private static final Log LOG = LogFactory.getLog(AMQPersistenceAdapter.class); private static final Log LOG = LogFactory.getLog(AMQPersistenceAdapter.class);
private static final Scheduler scheduler = Scheduler.getInstance();
private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>(); private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>(); private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>();
private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq"; private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
@ -271,14 +272,14 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
checkpoint(false); checkpoint(false);
} }
}; };
Scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval()); scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval());
periodicCleanupTask = new Runnable() { periodicCleanupTask = new Runnable() {
public void run() { public void run() {
cleanup(); cleanup();
} }
}; };
Scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval()); scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval());
if (lockAquired && lockLogged) { if (lockAquired && lockLogged) {
LOG.info("Aquired lock for AMQ Store" + getDirectory()); LOG.info("Aquired lock for AMQ Store" + getDirectory());
@ -301,8 +302,8 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
} }
this.usageManager.getMemoryUsage().removeUsageListener(this); this.usageManager.getMemoryUsage().removeUsageListener(this);
synchronized (this) { synchronized (this) {
Scheduler.cancel(periodicCheckpointTask); scheduler.cancel(periodicCheckpointTask);
Scheduler.cancel(periodicCleanupTask); scheduler.cancel(periodicCleanupTask);
} }
Iterator<AMQMessageStore> queueIterator = queues.values().iterator(); Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
while (queueIterator.hasNext()) { while (queueIterator.hasNext()) {

View File

@ -83,6 +83,7 @@ import org.apache.commons.logging.LogFactory;
*/ */
public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware { public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
protected static final Scheduler scheduler = Scheduler.getInstance();
private static final Log LOG = LogFactory.getLog(JournalPersistenceAdapter.class); private static final Log LOG = LogFactory.getLog(JournalPersistenceAdapter.class);
private final Journal journal; private final Journal journal;
@ -230,7 +231,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
recover(); recover();
// Do a checkpoint periodically. // Do a checkpoint periodically.
Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10); scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
} }
@ -241,7 +242,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
return; return;
} }
Scheduler.cancel(periodicCheckpointTask); scheduler.cancel(periodicCheckpointTask);
// Take one final checkpoint and stop checkpoint processing. // Take one final checkpoint and stop checkpoint processing.
checkpoint(true, true); checkpoint(true, true);

View File

@ -21,25 +21,33 @@ import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
/** /**
* Singelton, references maintained by users
* @version $Revision$ * @version $Revision$
*/ */
public final class Scheduler { public final class Scheduler {
private final Timer CLOCK_DAEMON = new Timer("ActiveMQ Scheduler", true);
private final HashMap<Runnable, TimerTask> TIMER_TASKS = new HashMap<Runnable, TimerTask>();
private static Scheduler instance;
static {
public static final Timer CLOCK_DAEMON = new Timer("ActiveMQ Scheduler", true); instance = new Scheduler();
private static final HashMap<Runnable, TimerTask> TIMER_TASKS = new HashMap<Runnable, TimerTask>(); }
private Scheduler() { private Scheduler() {
} }
public static synchronized void executePeriodically(final Runnable task, long period) { public static Scheduler getInstance() {
return instance;
}
public synchronized void executePeriodically(final Runnable task, long period) {
TimerTask timerTask = new SchedulerTimerTask(task); TimerTask timerTask = new SchedulerTimerTask(task);
CLOCK_DAEMON.scheduleAtFixedRate(timerTask, period, period); CLOCK_DAEMON.scheduleAtFixedRate(timerTask, period, period);
TIMER_TASKS.put(task, timerTask); TIMER_TASKS.put(task, timerTask);
} }
public static synchronized void cancel(Runnable task) { public synchronized void cancel(Runnable task) {
TimerTask ticket = TIMER_TASKS.remove(task); TimerTask ticket = TIMER_TASKS.remove(task);
if (ticket != null) { if (ticket != null) {
ticket.cancel(); ticket.cancel();
@ -47,13 +55,12 @@ public final class Scheduler {
} }
} }
public static void executeAfterDelay(final Runnable task, long redeliveryDelay) { public void executeAfterDelay(final Runnable task, long redeliveryDelay) {
TimerTask timerTask = new SchedulerTimerTask(task); TimerTask timerTask = new SchedulerTimerTask(task);
CLOCK_DAEMON.schedule(timerTask, redeliveryDelay); CLOCK_DAEMON.schedule(timerTask, redeliveryDelay);
} }
public static void shutdown() { public void shutdown() {
CLOCK_DAEMON.cancel(); CLOCK_DAEMON.cancel();
} }
} }