HBASE-26526 Introduce a timeout to shutdown of WAL (#3297)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
(cherry picked from commit ca3ba494cb
)
This commit is contained in:
parent
dd3cdf13cf
commit
62fc8c81a3
|
@ -39,6 +39,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentNavigableMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
|
@ -46,7 +47,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
|||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
@ -144,6 +147,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
public static final String RING_BUFFER_SLOT_COUNT =
|
||||
"hbase.regionserver.wal.disruptor.event.count";
|
||||
|
||||
public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS = "hbase.wal.shutdown.wait.timeout.ms";
|
||||
public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000;
|
||||
|
||||
/**
|
||||
* file system instance
|
||||
*/
|
||||
|
@ -271,6 +277,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
protected volatile boolean closed = false;
|
||||
|
||||
protected final AtomicBoolean shutdown = new AtomicBoolean(false);
|
||||
|
||||
protected final long walShutdownTimeout;
|
||||
|
||||
/**
|
||||
* WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
|
||||
* an IllegalArgumentException if used to compare paths from different wals.
|
||||
|
@ -322,8 +331,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
|
||||
protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
|
||||
|
||||
private final ExecutorService logArchiveExecutor = Executors.newSingleThreadExecutor(
|
||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Log-Archiver-%d").build());
|
||||
private final ExecutorService logArchiveOrShutdownExecutor = Executors.newSingleThreadExecutor(
|
||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-Or-Shutdown-%d").build());
|
||||
|
||||
private final int archiveRetries;
|
||||
|
||||
|
@ -480,7 +489,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
this.syncFutureCache = new SyncFutureCache(conf);
|
||||
this.implClassName = getClass().getSimpleName();
|
||||
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
|
||||
archiveRetries = this.conf.getInt("hbase.regionserver.logroll.archive.retries", 0);
|
||||
archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0);
|
||||
this.walShutdownTimeout =
|
||||
conf.getLong(WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -711,7 +722,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
|
||||
// make it async
|
||||
for (Pair<Path, Long> log : localLogsToArchive) {
|
||||
logArchiveExecutor.execute(() -> {
|
||||
logArchiveOrShutdownExecutor.execute(() -> {
|
||||
archive(log);
|
||||
});
|
||||
this.walFile2Props.remove(log.getFirst());
|
||||
|
@ -925,17 +936,42 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
i.logCloseRequested();
|
||||
}
|
||||
}
|
||||
rollWriterLock.lock();
|
||||
|
||||
Future<Void> future = logArchiveOrShutdownExecutor.submit(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.SECONDS)) {
|
||||
try {
|
||||
doShutdown();
|
||||
if (syncFutureCache != null) {
|
||||
syncFutureCache.clear();
|
||||
}
|
||||
} finally {
|
||||
rollWriterLock.unlock();
|
||||
}
|
||||
} else {
|
||||
throw new IOException("Waiting for rollWriterLock timeout");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
logArchiveOrShutdownExecutor.shutdown();
|
||||
|
||||
try {
|
||||
doShutdown();
|
||||
if (syncFutureCache != null) {
|
||||
syncFutureCache.clear();
|
||||
future.get(walShutdownTimeout, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException("Interrupted when waiting for shutdown WAL");
|
||||
} catch (TimeoutException e) {
|
||||
throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
|
||||
+ " the shutdown of WAL doesn't complete! Please check the status of underlying "
|
||||
+ "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
|
||||
+ "\"", e);
|
||||
} catch (ExecutionException e) {
|
||||
if (e.getCause() instanceof IOException) {
|
||||
throw (IOException) e.getCause();
|
||||
} else {
|
||||
throw new IOException(e.getCause());
|
||||
}
|
||||
if (logArchiveExecutor != null) {
|
||||
logArchiveExecutor.shutdownNow();
|
||||
}
|
||||
} finally {
|
||||
rollWriterLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue