HBASE-26526 Introduce a timeout to shutdown of WAL (#3297)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
bdbb4fa087
commit
ca3ba494cb
|
@ -39,6 +39,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.OptionalLong;
|
import java.util.OptionalLong;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentNavigableMap;
|
import java.util.concurrent.ConcurrentNavigableMap;
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
|
@ -46,7 +47,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
@ -147,6 +150,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
public static final String RING_BUFFER_SLOT_COUNT =
|
public static final String RING_BUFFER_SLOT_COUNT =
|
||||||
"hbase.regionserver.wal.disruptor.event.count";
|
"hbase.regionserver.wal.disruptor.event.count";
|
||||||
|
|
||||||
|
public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS =
|
||||||
|
"hbase.wal.shutdown.wait.timeout.ms";
|
||||||
|
public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* file system instance
|
* file system instance
|
||||||
*/
|
*/
|
||||||
|
@ -278,6 +285,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
|
|
||||||
protected final AtomicBoolean shutdown = new AtomicBoolean(false);
|
protected final AtomicBoolean shutdown = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
protected final long walShutdownTimeout;
|
||||||
|
|
||||||
private long nextLogTooOldNs = System.nanoTime();
|
private long nextLogTooOldNs = System.nanoTime();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -336,8 +345,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
|
|
||||||
protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
|
protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
|
||||||
|
|
||||||
private final ExecutorService logArchiveExecutor = Executors.newSingleThreadExecutor(
|
private final ExecutorService logArchiveOrShutdownExecutor = Executors.newSingleThreadExecutor(
|
||||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archiver-%d").build());
|
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-Or-Shutdown-%d").build());
|
||||||
|
|
||||||
private final int archiveRetries;
|
private final int archiveRetries;
|
||||||
|
|
||||||
|
@ -499,7 +508,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));
|
SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));
|
||||||
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
|
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
|
||||||
archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0);
|
archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0);
|
||||||
|
this.walShutdownTimeout = conf.getLong(WAL_SHUTDOWN_WAIT_TIMEOUT_MS,
|
||||||
|
DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -766,7 +776,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
|
final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
|
||||||
// make it async
|
// make it async
|
||||||
for (Pair<Path, Long> log : localLogsToArchive) {
|
for (Pair<Path, Long> log : localLogsToArchive) {
|
||||||
logArchiveExecutor.execute(() -> {
|
logArchiveOrShutdownExecutor.execute(() -> {
|
||||||
archive(log);
|
archive(log);
|
||||||
});
|
});
|
||||||
this.walFile2Props.remove(log.getFirst());
|
this.walFile2Props.remove(log.getFirst());
|
||||||
|
@ -980,18 +990,43 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
i.logCloseRequested();
|
i.logCloseRequested();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rollWriterLock.lock();
|
|
||||||
|
Future<Void> future = logArchiveOrShutdownExecutor.submit(new Callable<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void call() throws Exception {
|
||||||
|
if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.SECONDS)) {
|
||||||
try {
|
try {
|
||||||
doShutdown();
|
doShutdown();
|
||||||
if (syncFutureCache != null) {
|
if (syncFutureCache != null) {
|
||||||
syncFutureCache.clear();
|
syncFutureCache.clear();
|
||||||
}
|
}
|
||||||
if (logArchiveExecutor != null) {
|
|
||||||
logArchiveExecutor.shutdownNow();
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
rollWriterLock.unlock();
|
rollWriterLock.unlock();
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
throw new IOException("Waiting for rollWriterLock timeout");
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
logArchiveOrShutdownExecutor.shutdown();
|
||||||
|
|
||||||
|
try {
|
||||||
|
future.get(walShutdownTimeout, TimeUnit.MILLISECONDS);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new InterruptedIOException("Interrupted when waiting for shutdown WAL");
|
||||||
|
} catch (TimeoutException e) {
|
||||||
|
throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
|
||||||
|
+ " the shutdown of WAL doesn't complete! Please check the status of underlying "
|
||||||
|
+ "filesystem or increase the wait time by the config \""
|
||||||
|
+ WAL_SHUTDOWN_WAIT_TIMEOUT_MS + "\"", e);
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
if (e.getCause() instanceof IOException) {
|
||||||
|
throw (IOException) e.getCause();
|
||||||
|
} else {
|
||||||
|
throw new IOException(e.getCause());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue