diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 902d354f7f6..168cbede97f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -35,6 +35,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -62,6 +64,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * The default implementation of FSWAL.
@@ -115,6 +118,9 @@ public class FSHLog extends AbstractFSWAL {
   private static final String MAX_BATCH_COUNT = "hbase.regionserver.wal.sync.batch.count";
   private static final int DEFAULT_MAX_BATCH_COUNT = 200;
 
+  private static final String FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS = "hbase.wal.fshlog.wait.on.shutdown.seconds";
+  private static final int DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;
+
   /**
    * The nexus at which all incoming handlers meet. Does appends and sync with an ordering. Appends
    * and syncs are each put on the ring which means handlers need to smash up against the ring twice
@@ -160,6 +166,10 @@ public class FSHLog extends AbstractFSWAL {
 
   private final AtomicInteger closeErrorCount = new AtomicInteger();
 
+  private final int waitOnShutdownInSeconds;
+  private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
+
   /**
    * Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs
    * using our logger instead of java native logger.
@@ -224,7 +234,8 @@
       CommonFSUtils.getDefaultReplication(fs, this.walDir));
     this.lowReplicationRollLimit = conf.getInt(LOW_REPLICATION_ROLL_LIMIT, DEFAULT_LOW_REPLICATION_ROLL_LIMIT);
     this.closeErrorsTolerated = conf.getInt(ROLL_ERRORS_TOLERATED, DEFAULT_ROLL_ERRORS_TOLERATED);
-
+    this.waitOnShutdownInSeconds = conf.getInt(FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS,
+      DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS);
     // This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
     // put on the ring buffer.
     String hostingThreadName = Thread.currentThread().getName();
@@ -355,23 +366,22 @@
       }
       long oldFileLen = 0L;
       // It is at the safe point. Swap out writer from under the blocked writer thread.
-      // TODO: This is close is inline with critical section. Should happen in background?
       if (this.writer != null) {
         oldFileLen = this.writer.getLength();
-        try {
-          TraceUtil.addTimelineAnnotation("closing writer");
-          this.writer.close();
-          TraceUtil.addTimelineAnnotation("writer closed");
-          this.closeErrorCount.set(0);
-        } catch (IOException ioe) {
-          int errors = closeErrorCount.incrementAndGet();
-          if (!isUnflushedEntries() && (errors <= this.closeErrorsTolerated)) {
-            LOG.warn("Riding over failed WAL close of " + oldPath + ", cause=\"" + ioe.getMessage()
-              + "\", errors=" + errors
-              + "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK");
-          } else {
-            throw ioe;
-          }
+        // If there are unflushed entries or we have already hit the closeErrorsTolerated
+        // count, close the writer inline (synchronously) so that an IOException is thrown
+        // back to the caller and aborts the RegionServer.
+        if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated) {
+          closeWriter(this.writer, oldPath, true);
+        } else {
+          Writer localWriter = this.writer;
+          closeExecutor.execute(() -> {
+            try {
+              closeWriter(localWriter, oldPath, false);
+            } catch (IOException e) {
+              // We will never reach here.
+            }
+          });
         }
       }
       logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
@@ -413,6 +423,24 @@
     }
   }
 
+  private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws IOException {
+    try {
+      TraceUtil.addTimelineAnnotation("closing writer");
+      writer.close();
+      TraceUtil.addTimelineAnnotation("writer closed");
+    } catch (IOException ioe) {
+      int errors = closeErrorCount.incrementAndGet();
+      boolean hasUnflushedEntries = isUnflushedEntries();
+      if (syncCloseCall && (hasUnflushedEntries || (errors > this.closeErrorsTolerated))) {
+        LOG.error("Close of WAL " + path + " failed. Cause=\"" + ioe.getMessage() + "\", errors="
+          + errors + ", hasUnflushedEntries=" + hasUnflushedEntries);
+        throw ioe;
+      }
+      LOG.warn("Riding over failed WAL close of " + path
+        + "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe);
+    }
+  }
+
   @Override
   protected void doShutdown() throws IOException {
     // Shutdown the disruptor. Will stop after all entries have been processed. Make sure we
@@ -437,6 +465,18 @@
       this.writer.close();
       this.writer = null;
     }
+    closeExecutor.shutdown();
+    try {
+      if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
+        LOG.error("We have waited {} seconds but the close of writer(s) doesn't complete. "
+          + "Please check the status of the underlying filesystem"
+          + " or increase the wait time by the config \"{}\"", this.waitOnShutdownInSeconds,
+          FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS);
+      }
+    } catch (InterruptedException e) {
+      LOG.error("The wait for termination of FSHLog writer(s) was interrupted");
+      Thread.currentThread().interrupt();
+    }
   }
 
   @Override
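For readers who want the shape of the change without the surrounding FSHLog context, below is a minimal, self-contained sketch of the pattern the patch introduces: old writers are closed on a daemon cached thread pool off the roll's critical section, and shutdown waits a bounded number of seconds for those closes to finish. The class, method, and thread names in the sketch are illustrative only, not the actual FSHLog API.

// Illustrative sketch only -- not the FSHLog API. It mirrors the patch's pattern:
// close old writers on a background daemon pool, bound the wait at shutdown.
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncWriterCloseSketch {
  // Cached pool of daemon threads, analogous to the patch's "Close-WAL-Writer-%d" executor.
  private final ExecutorService closeExecutor = Executors.newCachedThreadPool(r -> {
    Thread t = new Thread(r, "close-old-writer");
    t.setDaemon(true);
    return t;
  });

  /** Close the previous writer off the roll's critical path; a failure is only noted, not thrown. */
  void closeAsync(Closeable oldWriter) {
    closeExecutor.execute(() -> {
      try {
        oldWriter.close();
      } catch (IOException e) {
        // Ride over the failure: all edits were synced before the roll reached this point.
      }
    });
  }

  /** At shutdown, give outstanding closes a bounded amount of time to finish. */
  void shutdown(long waitSeconds) throws InterruptedException {
    closeExecutor.shutdown();
    if (!closeExecutor.awaitTermination(waitSeconds, TimeUnit.SECONDS)) {
      // Still not done: the underlying filesystem may be unhealthy, or the wait is too short.
    }
  }
}

The subtlety the sketch glosses over is the synchronous fallback in the patch: when there are unflushed entries or closeErrorCount has reached closeErrorsTolerated, closeWriter is called inline so the IOException propagates to the roll and aborts the RegionServer, instead of being swallowed on the executor thread.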