diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index f8355e0e870..a9c440dca07 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -148,6 +148,10 @@ public class AsyncFSWAL extends AbstractFSWAL { "hbase.wal.async.use-shared-event-loop"; public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false; + public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = + "hbase.wal.async.wait.on.shutdown.seconds"; + public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5; + private final EventLoopGroup eventLoopGroup; private final ExecutorService consumeExecutor; @@ -207,6 +211,8 @@ public class AsyncFSWAL extends AbstractFSWAL { private long highestProcessedAppendTxidAtLastSync; + private final int waitOnShutdownInSeconds; + public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, @@ -254,6 +260,8 @@ public class AsyncFSWAL extends AbstractFSWAL { batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE); createMaxRetries = conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES); + waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS, + DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS); rollWriter(); } @@ -699,30 +707,26 @@ public class AsyncFSWAL extends AbstractFSWAL { } finally { consumeLock.unlock(); } - long oldFileLen; - if (oldWriter != null) { - oldFileLen = oldWriter.getLength(); - closeExecutor.execute(() -> { - try { - oldWriter.close(); - } catch (IOException e) { - LOG.warn("close old writer failed", e); - } - }); - } else { - oldFileLen = 0L; - } - return oldFileLen; + return executeClose(closeExecutor, oldWriter); } @Override protected void doShutdown() throws IOException { waitForSafePoint(); - if (this.writer != null) { - this.writer.close(); - this.writer = null; - } + executeClose(closeExecutor, writer); closeExecutor.shutdown(); + try { + if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) { + LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" + + " the close of async writer doesn't complete." + + "Please check the status of underlying filesystem" + + " or increase the wait time by the config \"" + + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS + "\""); + } + } catch (InterruptedException e) { + LOG.error("The wait for close of async writer is interrupted"); + Thread.currentThread().interrupt(); + } IOException error = new IOException("WAL has been closed"); syncFutures.forEach(f -> f.done(f.getTxid(), error)); if (!(consumeExecutor instanceof EventLoop)) { @@ -730,6 +734,23 @@ public class AsyncFSWAL extends AbstractFSWAL { } } + private static long executeClose(ExecutorService closeExecutor, AsyncWriter writer) { + long fileLength; + if (writer != null) { + fileLength = writer.getLength(); + closeExecutor.execute(() -> { + try { + writer.close(); + } catch (IOException e) { + LOG.warn("close old writer failed", e); + } + }); + } else { + fileLength = 0L; + } + return fileLength; + } + @Override protected void doAppend(AsyncWriter writer, FSWALEntry entry) { writer.append(entry);