HBASE-21228 Memory leak since AbstractFSWAL caches Thread object and never clean later
This commit is contained in:
parent
04c732e9de
commit
0b6e2907bd
@ -189,12 +189,11 @@ public class FSHLog implements WAL {
|
||||
private final RingBufferEventHandler ringBufferEventHandler;
|
||||
|
||||
/**
|
||||
* Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures.
|
||||
* TODO: Reus FSWALEntry's rather than create them anew each time as we do SyncFutures here.
|
||||
* TODO: Add a FSWalEntry and SyncFuture as thread locals on handlers rather than have them
|
||||
* get them from this Map?
|
||||
* Map of {@link SyncFuture}s owned by Thread objects. Used so we reuse SyncFutures.
|
||||
* Thread local is used so JVM can GC the terminated thread for us. See HBASE-21228
|
||||
* <p>
|
||||
*/
|
||||
private final Map<Thread, SyncFuture> syncFuturesByHandler;
|
||||
private final ThreadLocal<SyncFuture> cachedSyncFutures;
|
||||
|
||||
/**
|
||||
* The highest known outstanding unsync'd WALEdit sequence number where sequence number is the
|
||||
@ -573,8 +572,12 @@ public class FSHLog implements WAL {
|
||||
maxHandlersCount);
|
||||
this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
|
||||
this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler});
|
||||
// Presize our map of SyncFutures by handler objects.
|
||||
this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(maxHandlersCount);
|
||||
this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
|
||||
@Override
|
||||
protected SyncFuture initialValue() {
|
||||
return new SyncFuture();
|
||||
}
|
||||
};
|
||||
// Starting up threads in constructor is a no no; Interface should have an init call.
|
||||
this.disruptor.start();
|
||||
}
|
||||
@ -1426,7 +1429,7 @@ public class FSHLog implements WAL {
|
||||
// SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
|
||||
// still refer to it, so if this thread use it next time may get a wrong
|
||||
// result.
|
||||
this.syncFuturesByHandler.remove(Thread.currentThread());
|
||||
this.cachedSyncFutures.remove();
|
||||
throw tioe;
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Interrupted", ie);
|
||||
@ -1444,12 +1447,7 @@ public class FSHLog implements WAL {
|
||||
}
|
||||
|
||||
private SyncFuture getSyncFuture(final long sequence, Span span) {
|
||||
SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread());
|
||||
if (syncFuture == null) {
|
||||
syncFuture = new SyncFuture();
|
||||
this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture);
|
||||
}
|
||||
return syncFuture.reset(sequence, span);
|
||||
return cachedSyncFutures.get().reset(sequence);
|
||||
}
|
||||
|
||||
private void postSync(final long timeInNanos, final int handlerSyncs) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user