HBASE-19542 fix TestSafemodeBringsDownMaster
This commit is contained in:
parent
dc5ec061b5
commit
c811d7f965
|
@ -148,6 +148,10 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
"hbase.wal.async.use-shared-event-loop";
|
||||
public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;
|
||||
|
||||
public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS =
|
||||
"hbase.wal.async.wait.on.shutdown.seconds";
|
||||
public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;
|
||||
|
||||
private final EventLoopGroup eventLoopGroup;
|
||||
|
||||
private final ExecutorService consumeExecutor;
|
||||
|
@ -207,6 +211,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
|
||||
private long highestProcessedAppendTxidAtLastSync;
|
||||
|
||||
private final int waitOnShutdownInSeconds;
|
||||
|
||||
public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
|
||||
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
|
||||
String prefix, String suffix, EventLoopGroup eventLoopGroup,
|
||||
|
@ -254,6 +260,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
|
||||
createMaxRetries =
|
||||
conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
|
||||
waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
|
||||
DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
|
||||
rollWriter();
|
||||
}
|
||||
|
||||
|
@ -699,30 +707,26 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
} finally {
|
||||
consumeLock.unlock();
|
||||
}
|
||||
long oldFileLen;
|
||||
if (oldWriter != null) {
|
||||
oldFileLen = oldWriter.getLength();
|
||||
closeExecutor.execute(() -> {
|
||||
try {
|
||||
oldWriter.close();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("close old writer failed", e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
oldFileLen = 0L;
|
||||
}
|
||||
return oldFileLen;
|
||||
return executeClose(closeExecutor, oldWriter);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doShutdown() throws IOException {
|
||||
waitForSafePoint();
|
||||
if (this.writer != null) {
|
||||
this.writer.close();
|
||||
this.writer = null;
|
||||
}
|
||||
executeClose(closeExecutor, writer);
|
||||
closeExecutor.shutdown();
|
||||
try {
|
||||
if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
|
||||
LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but"
|
||||
+ " the close of async writer doesn't complete."
|
||||
+ "Please check the status of underlying filesystem"
|
||||
+ " or increase the wait time by the config \""
|
||||
+ ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS + "\"");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("The wait for close of async writer is interrupted");
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
IOException error = new IOException("WAL has been closed");
|
||||
syncFutures.forEach(f -> f.done(f.getTxid(), error));
|
||||
if (!(consumeExecutor instanceof EventLoop)) {
|
||||
|
@ -730,6 +734,23 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
}
|
||||
}
|
||||
|
||||
private static long executeClose(ExecutorService closeExecutor, AsyncWriter writer) {
|
||||
long fileLength;
|
||||
if (writer != null) {
|
||||
fileLength = writer.getLength();
|
||||
closeExecutor.execute(() -> {
|
||||
try {
|
||||
writer.close();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("close old writer failed", e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
fileLength = 0L;
|
||||
}
|
||||
return fileLength;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
|
||||
writer.append(entry);
|
||||
|
|
Loading…
Reference in New Issue