diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index de8a6af6a80..5416e3a2d66 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -48,6 +48,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -345,8 +347,12 @@ public abstract class AbstractFSWAL implements WAL { protected final AtomicBoolean rollRequested = new AtomicBoolean(false); - private final ExecutorService logArchiveOrShutdownExecutor = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-Or-Shutdown-%d").build()); + // Run in caller if we get reject execution exception, to avoid aborting region server when we get + // reject execution exception. Usually this should not happen but let's make it more robust. + private final ExecutorService logArchiveExecutor = + new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue(), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-%d").build(), + new ThreadPoolExecutor.CallerRunsPolicy()); private final int archiveRetries; @@ -770,7 +776,7 @@ public abstract class AbstractFSWAL implements WAL { final List> localLogsToArchive = logsToArchive; // make it async for (Pair log : localLogsToArchive) { - logArchiveOrShutdownExecutor.execute(() -> { + logArchiveExecutor.execute(() -> { archive(log); }); this.walFile2Props.remove(log.getFirst()); @@ -985,7 +991,10 @@ public abstract class AbstractFSWAL implements WAL { } } - Future future = logArchiveOrShutdownExecutor.submit(new Callable() { + ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Shutdown-%d").build()); + + Future future = shutdownExecutor.submit(new Callable() { @Override public Void call() throws Exception { if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.SECONDS)) { @@ -1003,7 +1012,7 @@ public abstract class AbstractFSWAL implements WAL { return null; } }); - logArchiveOrShutdownExecutor.shutdown(); + shutdownExecutor.shutdown(); try { future.get(walShutdownTimeout, TimeUnit.MILLISECONDS); @@ -1020,6 +1029,12 @@ public abstract class AbstractFSWAL implements WAL { } else { throw new IOException(e.getCause()); } + } finally { + // in shutdown we may call cleanOldLogs so shutdown this executor in the end. + // In sync replication implementation, we may shutdown a WAL without shutting down the whole + // region server, if we shutdown this executor earlier we may get reject execution exception + // and abort the region server + logArchiveExecutor.shutdown(); } }