From e70308804fc21a0b2bafb5667ee23c860e0e93b8 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Wed, 16 Jun 2010 09:38:46 +0000 Subject: [PATCH] clear down async tasks on stop git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@955168 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/activemq/store/kahadb/KahaDBStore.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 3a8fe1bc34..f30bb986bc 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -172,6 +172,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{ public void setMaxAsyncJobs(int maxAsyncJobs) { this.maxAsyncJobs = maxAsyncJobs; } + + + @Override public void doStart() throws Exception { @@ -200,12 +203,17 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{ @Override public void doStop(ServiceStopper stopper) throws Exception { + //drain down async jobs + LOG.info("Stopping async queue tasks"); + this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); synchronized (this.asyncQueueMap) { for (StoreQueueTask task : this.asyncQueueMap.values()) { task.cancel(); } this.asyncQueueMap.clear(); } + LOG.info("Stopping async topic tasks"); + this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); synchronized (this.asyncTopicMap) { for (StoreTopicTask task : this.asyncTopicMap.values()) { task.cancel(); @@ -224,6 +232,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{ if (this.topicExecutor != null) { this.topicExecutor.shutdownNow(); } + LOG.info("Stopped KahaDB"); super.doStop(stopper); }