HBASE-26866 Shutdown WAL may abort region server (#4254)
Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
(cherry picked from commit b67c16a763
)
This commit is contained in:
parent
62fc8c81a3
commit
3b24b3b969
|
@ -48,6 +48,8 @@ import java.util.concurrent.ExecutionException;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -331,8 +333,12 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
|
||||
protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
|
||||
|
||||
private final ExecutorService logArchiveOrShutdownExecutor = Executors.newSingleThreadExecutor(
|
||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-Or-Shutdown-%d").build());
|
||||
// Run in caller if we get reject execution exception, to avoid aborting region server when we get
|
||||
// reject execution exception. Usually this should not happen but let's make it more robust.
|
||||
private final ExecutorService logArchiveExecutor =
|
||||
new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(),
|
||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-%d").build(),
|
||||
new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
|
||||
private final int archiveRetries;
|
||||
|
||||
|
@ -722,7 +728,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
|
||||
// make it async
|
||||
for (Pair<Path, Long> log : localLogsToArchive) {
|
||||
logArchiveOrShutdownExecutor.execute(() -> {
|
||||
logArchiveExecutor.execute(() -> {
|
||||
archive(log);
|
||||
});
|
||||
this.walFile2Props.remove(log.getFirst());
|
||||
|
@ -937,7 +943,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
}
|
||||
}
|
||||
|
||||
Future<Void> future = logArchiveOrShutdownExecutor.submit(new Callable<Void>() {
|
||||
ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor(
|
||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Shutdown-%d").build());
|
||||
|
||||
Future<Void> future = shutdownExecutor.submit(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.SECONDS)) {
|
||||
|
@ -955,7 +964,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
return null;
|
||||
}
|
||||
});
|
||||
logArchiveOrShutdownExecutor.shutdown();
|
||||
shutdownExecutor.shutdown();
|
||||
|
||||
try {
|
||||
future.get(walShutdownTimeout, TimeUnit.MILLISECONDS);
|
||||
|
@ -972,6 +981,12 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
} else {
|
||||
throw new IOException(e.getCause());
|
||||
}
|
||||
} finally {
|
||||
// in shutdown we may call cleanOldLogs so shutdown this executor in the end.
|
||||
// In sync replication implementation, we may shutdown a WAL without shutting down the whole
|
||||
// region server, if we shutdown this executor earlier we may get reject execution exception
|
||||
// and abort the region server
|
||||
logArchiveExecutor.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue