diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index b4f6c6ffec..758d0de26c 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.BaseDestination; @@ -1435,10 +1436,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, private class InnerFutureTask extends FutureTask implements ListenableFuture { - private Runnable listener; + private final AtomicReference listenerRef = new AtomicReference<>(); + public InnerFutureTask(Runnable runnable) { super(runnable, null); - } public void setException(final Exception e) { @@ -1456,13 +1457,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, @Override public void addListener(Runnable listener) { - this.listener = listener; + this.listenerRef.set(listener); if (isDone()) { fireListener(); } } private void fireListener() { + Runnable listener = listenerRef.getAndSet(null); if (listener != null) { try { listener.run();