diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 3a8fe1bc34..f30bb986bc 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -172,6 +172,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{ public void setMaxAsyncJobs(int maxAsyncJobs) { this.maxAsyncJobs = maxAsyncJobs; } + + + @Override public void doStart() throws Exception { @@ -200,12 +203,17 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{ @Override public void doStop(ServiceStopper stopper) throws Exception { + //drain down async jobs + LOG.info("Stopping async queue tasks"); + this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); synchronized (this.asyncQueueMap) { for (StoreQueueTask task : this.asyncQueueMap.values()) { task.cancel(); } this.asyncQueueMap.clear(); } + LOG.info("Stopping async topic tasks"); + this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); synchronized (this.asyncTopicMap) { for (StoreTopicTask task : this.asyncTopicMap.values()) { task.cancel(); @@ -224,6 +232,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{ if (this.topicExecutor != null) { this.topicExecutor.shutdownNow(); } + LOG.info("Stopped KahaDB"); super.doStop(stopper); }