HBASE-24695 FSHLog - close the current WAL file in a background thread. (#2183)

Signed-off-by: Ramkrishna <ramkrishna@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Anoop Sam John 2020-08-01 22:46:32 +05:30 committed by GitHub
parent 271a3cbf13
commit 86fccba0d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 56 additions and 16 deletions

View File

@ -35,6 +35,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -62,6 +64,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* The default implementation of FSWAL.
@ -115,6 +118,9 @@ public class FSHLog extends AbstractFSWAL<Writer> {
private static final String MAX_BATCH_COUNT = "hbase.regionserver.wal.sync.batch.count";
private static final int DEFAULT_MAX_BATCH_COUNT = 200;
private static final String FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS = "hbase.wal.fshlog.wait.on.shutdown.seconds";
private static final int DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;
/**
* The nexus at which all incoming handlers meet. Does appends and sync with an ordering. Appends
* and syncs are each put on the ring which means handlers need to smash up against the ring twice
@ -160,6 +166,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
private final AtomicInteger closeErrorCount = new AtomicInteger();
private final int waitOnShutdownInSeconds;
private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
/**
* Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs
* using our logger instead of java native logger.
@ -224,7 +234,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
CommonFSUtils.getDefaultReplication(fs, this.walDir));
this.lowReplicationRollLimit = conf.getInt(LOW_REPLICATION_ROLL_LIMIT, DEFAULT_LOW_REPLICATION_ROLL_LIMIT);
this.closeErrorsTolerated = conf.getInt(ROLL_ERRORS_TOLERATED, DEFAULT_ROLL_ERRORS_TOLERATED);
this.waitOnShutdownInSeconds = conf.getInt(FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS,
DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS);
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
// put on the ring buffer.
String hostingThreadName = Thread.currentThread().getName();
@ -355,23 +366,22 @@ public class FSHLog extends AbstractFSWAL<Writer> {
}
long oldFileLen = 0L;
// It is at the safe point. Swap out writer from under the blocked writer thread.
// TODO: This is close is inline with critical section. Should happen in background?
if (this.writer != null) {
oldFileLen = this.writer.getLength();
try {
TraceUtil.addTimelineAnnotation("closing writer");
this.writer.close();
TraceUtil.addTimelineAnnotation("writer closed");
this.closeErrorCount.set(0);
} catch (IOException ioe) {
int errors = closeErrorCount.incrementAndGet();
if (!isUnflushedEntries() && (errors <= this.closeErrorsTolerated)) {
LOG.warn("Riding over failed WAL close of " + oldPath + ", cause=\"" + ioe.getMessage()
+ "\", errors=" + errors
+ "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK");
} else {
throw ioe;
}
// In case of having unflushed entries or we already reached the
// closeErrorsTolerated count, call the closeWriter inline rather than in async
// way so that in case of an IOE we will throw it back and abort RS.
if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated) {
closeWriter(this.writer, oldPath, true);
} else {
Writer localWriter = this.writer;
closeExecutor.execute(() -> {
try {
closeWriter(localWriter, oldPath, false);
} catch (IOException e) {
// We will never reach here.
}
});
}
}
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
@ -413,6 +423,24 @@ public class FSHLog extends AbstractFSWAL<Writer> {
}
}
private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws IOException {
try {
TraceUtil.addTimelineAnnotation("closing writer");
writer.close();
TraceUtil.addTimelineAnnotation("writer closed");
} catch (IOException ioe) {
int errors = closeErrorCount.incrementAndGet();
boolean hasUnflushedEntries = isUnflushedEntries();
if (syncCloseCall && (hasUnflushedEntries || (errors > this.closeErrorsTolerated))) {
LOG.error("Close of WAL " + path + " failed. Cause=\"" + ioe.getMessage() + "\", errors="
+ errors + ", hasUnflushedEntries=" + hasUnflushedEntries);
throw ioe;
}
LOG.warn("Riding over failed WAL close of " + path
+ "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe);
}
}
@Override
protected void doShutdown() throws IOException {
// Shutdown the disruptor. Will stop after all entries have been processed. Make sure we
@ -437,6 +465,18 @@ public class FSHLog extends AbstractFSWAL<Writer> {
this.writer.close();
this.writer = null;
}
closeExecutor.shutdown();
try {
if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
LOG.error("We have waited {} seconds but the close of writer(s) doesn't complete."
+ "Please check the status of underlying filesystem"
+ " or increase the wait time by the config \"{}\"", this.waitOnShutdownInSeconds,
FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS);
}
} catch (InterruptedException e) {
LOG.error("The wait for termination of FSHLog writer(s) is interrupted");
Thread.currentThread().interrupt();
}
}
@Override