Fixing a thread safety issue with memoryUsage when using concurrentStoreAndDispatch
that was causing memory usage to get out of sync.

The InnerFutureTask class inside KahaDB was not thread safe which was
the root cause of the problem.

(cherry picked from commit 0f0bdb21ef)
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-12-02 12:19:53 -05:00
parent 4cdd188ef2
commit 3542657541
1 changed files with 5 additions and 3 deletions

View File

@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination; import org.apache.activemq.broker.region.BaseDestination;
@ -1435,10 +1436,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> { private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> {
private Runnable listener; private final AtomicReference<Runnable> listenerRef = new AtomicReference<>();
public InnerFutureTask(Runnable runnable) { public InnerFutureTask(Runnable runnable) {
super(runnable, null); super(runnable, null);
} }
public void setException(final Exception e) { public void setException(final Exception e) {
@ -1456,13 +1457,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
@Override @Override
public void addListener(Runnable listener) { public void addListener(Runnable listener) {
this.listener = listener; this.listenerRef.set(listener);
if (isDone()) { if (isDone()) {
fireListener(); fireListener();
} }
} }
private void fireListener() { private void fireListener() {
Runnable listener = listenerRef.getAndSet(null);
if (listener != null) { if (listener != null) {
try { try {
listener.run(); listener.run();