From 00fc40ab9b252f8d6af2858d2c988a36708539db Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Wed, 23 Mar 2022 14:53:58 +0800 Subject: [PATCH] HBASE-26866 Shutdown WAL may abort region server (#4254) Signed-off-by: Xiaolin Ha (cherry picked from commit b67c16a7636958970d37bfcd775fd55e8de98177) --- .../hbase/regionserver/wal/AbstractFSWAL.java | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 983a5987bb2..74696f91c5c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -46,6 +46,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -329,8 +331,12 @@ public abstract class AbstractFSWAL implements WAL { protected final AtomicBoolean rollRequested = new AtomicBoolean(false); - private final ExecutorService logArchiveOrShutdownExecutor = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-Or-Shutdown-%d").build()); + // Run in caller if we get reject execution exception, to avoid aborting region server when we get + // reject execution exception. Usually this should not happen but let's make it more robust. + private final ExecutorService logArchiveExecutor = + new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue(), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-%d").build(), + new ThreadPoolExecutor.CallerRunsPolicy()); private final int archiveRetries; @@ -696,7 +702,7 @@ public abstract class AbstractFSWAL implements WAL { final List> localLogsToArchive = logsToArchive; // make it async for (Pair log : localLogsToArchive) { - logArchiveOrShutdownExecutor.execute(() -> { + logArchiveExecutor.execute(() -> { archive(log); }); this.walFile2Props.remove(log.getFirst()); @@ -903,7 +909,10 @@ public abstract class AbstractFSWAL implements WAL { } } - Future future = logArchiveOrShutdownExecutor.submit(new Callable() { + ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Shutdown-%d").build()); + + Future future = shutdownExecutor.submit(new Callable() { @Override public Void call() throws Exception { if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.SECONDS)) { @@ -921,7 +930,7 @@ public abstract class AbstractFSWAL implements WAL { return null; } }); - logArchiveOrShutdownExecutor.shutdown(); + shutdownExecutor.shutdown(); try { future.get(walShutdownTimeout, TimeUnit.MILLISECONDS); @@ -938,6 +947,12 @@ public abstract class AbstractFSWAL implements WAL { } else { throw new IOException(e.getCause()); } + } finally { + // in shutdown we may call cleanOldLogs so shutdown this executor in the end. + // In sync replication implementation, we may shutdown a WAL without shutting down the whole + // region server, if we shutdown this executor earlier we may get reject execution exception + // and abort the region server + logArchiveExecutor.shutdown(); } }