HBASE-26526 Introduce a timeout to shutdown of WAL (#3297)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
(cherry picked from commit ca3ba494cbc322b0824d2d755bcf4191c3a525ed)
This commit is contained in:
Xiaolin Ha 2021-12-07 12:26:59 +08:00 committed by Duo Zhang
parent e1b58290d1
commit 17deed9533

View File

@ -39,6 +39,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.OptionalLong; import java.util.OptionalLong;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
@ -46,7 +47,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -143,6 +146,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
public static final String RING_BUFFER_SLOT_COUNT = public static final String RING_BUFFER_SLOT_COUNT =
"hbase.regionserver.wal.disruptor.event.count"; "hbase.regionserver.wal.disruptor.event.count";
public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS = "hbase.wal.shutdown.wait.timeout.ms";
public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000;
/** /**
* file system instance * file system instance
*/ */
@ -270,6 +276,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected volatile boolean closed = false; protected volatile boolean closed = false;
protected final AtomicBoolean shutdown = new AtomicBoolean(false); protected final AtomicBoolean shutdown = new AtomicBoolean(false);
protected final long walShutdownTimeout;
/** /**
* WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
* an IllegalArgumentException if used to compare paths from different wals. * an IllegalArgumentException if used to compare paths from different wals.
@ -321,8 +330,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected final AtomicBoolean rollRequested = new AtomicBoolean(false); protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
private final ExecutorService logArchiveExecutor = Executors.newSingleThreadExecutor( private final ExecutorService logArchiveOrShutdownExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Log-Archiver-%d").build()); new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-Or-Shutdown-%d").build());
private final int archiveRetries; private final int archiveRetries;
@ -479,7 +488,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
this.syncFutureCache = new SyncFutureCache(conf); this.syncFutureCache = new SyncFutureCache(conf);
this.implClassName = getClass().getSimpleName(); this.implClassName = getClass().getSimpleName();
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC); this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
archiveRetries = this.conf.getInt("hbase.regionserver.logroll.archive.retries", 0); archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0);
this.walShutdownTimeout =
conf.getLong(WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS);
} }
/** /**
@ -710,7 +721,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
final List<Pair<Path, Long>> localLogsToArchive = logsToArchive; final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
// make it async // make it async
for (Pair<Path, Long> log : localLogsToArchive) { for (Pair<Path, Long> log : localLogsToArchive) {
logArchiveExecutor.execute(() -> { logArchiveOrShutdownExecutor.execute(() -> {
archive(log); archive(log);
}); });
this.walFile2Props.remove(log.getFirst()); this.walFile2Props.remove(log.getFirst());
@ -924,17 +935,42 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
i.logCloseRequested(); i.logCloseRequested();
} }
} }
rollWriterLock.lock();
Future<Void> future = logArchiveOrShutdownExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.SECONDS)) {
try {
doShutdown();
if (syncFutureCache != null) {
syncFutureCache.clear();
}
} finally {
rollWriterLock.unlock();
}
} else {
throw new IOException("Waiting for rollWriterLock timeout");
}
return null;
}
});
logArchiveOrShutdownExecutor.shutdown();
try { try {
doShutdown(); future.get(walShutdownTimeout, TimeUnit.MILLISECONDS);
if (syncFutureCache != null) { } catch (InterruptedException e) {
syncFutureCache.clear(); throw new InterruptedIOException("Interrupted when waiting for shutdown WAL");
} catch (TimeoutException e) {
throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
+ " the shutdown of WAL doesn't complete! Please check the status of underlying "
+ "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
+ "\"", e);
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
} else {
throw new IOException(e.getCause());
} }
if (logArchiveExecutor != null) {
logArchiveExecutor.shutdownNow();
}
} finally {
rollWriterLock.unlock();
} }
} }