diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 3d7678c37c6..4a788fc075a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -46,7 +47,9 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -143,6 +146,9 @@ public abstract class AbstractFSWAL implements WAL { public static final String RING_BUFFER_SLOT_COUNT = "hbase.regionserver.wal.disruptor.event.count"; + public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS = "hbase.wal.shutdown.wait.timeout.ms"; + public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000; + /** * file system instance */ @@ -270,6 +276,9 @@ public abstract class AbstractFSWAL implements WAL { protected volatile boolean closed = false; protected final AtomicBoolean shutdown = new AtomicBoolean(false); + + protected final long walShutdownTimeout; + /** * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws * an IllegalArgumentException if used to compare paths from different wals. 
@@ -321,8 +330,8 @@ public abstract class AbstractFSWAL implements WAL {
 
   protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
 
-  private final ExecutorService logArchiveExecutor = Executors.newSingleThreadExecutor(
-    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Log-Archiver-%d").build());
+  private final ExecutorService logArchiveOrShutdownExecutor = Executors.newSingleThreadExecutor(
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-Or-Shutdown-%d").build());
 
   private final int archiveRetries;
 
@@ -479,7 +488,9 @@ public abstract class AbstractFSWAL implements WAL {
     this.syncFutureCache = new SyncFutureCache(conf);
     this.implClassName = getClass().getSimpleName();
     this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
-    archiveRetries = this.conf.getInt("hbase.regionserver.logroll.archive.retries", 0);
+    archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0);
+    this.walShutdownTimeout =
+      conf.getLong(WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS);
   }
 
   /**
@@ -710,7 +721,7 @@ public abstract class AbstractFSWAL implements WAL {
       final List> localLogsToArchive = logsToArchive;
       // make it async
       for (Pair log : localLogsToArchive) {
-        logArchiveExecutor.execute(() -> {
+        logArchiveOrShutdownExecutor.execute(() -> {
           archive(log);
         });
         this.walFile2Props.remove(log.getFirst());
@@ -924,17 +935,44 @@ public abstract class AbstractFSWAL implements WAL {
         i.logCloseRequested();
       }
     }
-    rollWriterLock.lock();
+
+    Future<Void> future = logArchiveOrShutdownExecutor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        // walShutdownTimeout is in milliseconds (WAL_SHUTDOWN_WAIT_TIMEOUT_MS), the same unit
+        // future.get() below uses, so this task gives up when the waiting caller does.
+        if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.MILLISECONDS)) {
+          try {
+            doShutdown();
+            if (syncFutureCache != null) {
+              syncFutureCache.clear();
+            }
+          } finally {
+            rollWriterLock.unlock();
+          }
+        } else {
+          throw new IOException("Waiting for rollWriterLock timeout");
+        }
+        return null;
+      }
+    });
+    logArchiveOrShutdownExecutor.shutdown();
+
     try {
-      doShutdown();
-      if (syncFutureCache != null) {
-        syncFutureCache.clear();
+      future.get(walShutdownTimeout, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException("Interrupted when waiting for shutdown WAL");
+    } catch (TimeoutException e) {
+      throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
+        + " the shutdown of WAL doesn't complete! Please check the status of underlying "
+        + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
+        + "\"", e);
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      } else {
+        throw new IOException(e.getCause());
       }
-      if (logArchiveExecutor != null) {
-        logArchiveExecutor.shutdownNow();
-      }
-    } finally {
-      rollWriterLock.unlock();
     }
   }
 