mirror of https://github.com/apache/activemq.git
Fixing a thread safety issue with memoryUsage when using concurrentStoreAndDispatch that was causing memory usage to get out of sync. The InnerFutureTask class inside KahaDB was not thread safe which was the root cause of the problem.
This commit is contained in:
parent
6a0c65828a
commit
0f0bdb21ef
|
@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.broker.region.BaseDestination;
|
import org.apache.activemq.broker.region.BaseDestination;
|
||||||
|
@ -1435,10 +1436,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
|
||||||
|
|
||||||
private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> {
|
private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> {
|
||||||
|
|
||||||
private Runnable listener;
|
private final AtomicReference<Runnable> listenerRef = new AtomicReference<>();
|
||||||
|
|
||||||
public InnerFutureTask(Runnable runnable) {
|
public InnerFutureTask(Runnable runnable) {
|
||||||
super(runnable, null);
|
super(runnable, null);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setException(final Exception e) {
|
public void setException(final Exception e) {
|
||||||
|
@ -1456,13 +1457,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addListener(Runnable listener) {
|
public void addListener(Runnable listener) {
|
||||||
this.listener = listener;
|
this.listenerRef.set(listener);
|
||||||
if (isDone()) {
|
if (isDone()) {
|
||||||
fireListener();
|
fireListener();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fireListener() {
|
private void fireListener() {
|
||||||
|
Runnable listener = listenerRef.getAndSet(null);
|
||||||
if (listener != null) {
|
if (listener != null) {
|
||||||
try {
|
try {
|
||||||
listener.run();
|
listener.run();
|
||||||
|
|
Loading…
Reference in New Issue