HBASE-19542 fix TestSafemodeBringsDownMaster
This commit is contained in:
parent
90ac3c93c9
commit
0a5ef1ed88
|
@ -148,6 +148,10 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
"hbase.wal.async.use-shared-event-loop";
|
"hbase.wal.async.use-shared-event-loop";
|
||||||
public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;
|
public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;
|
||||||
|
|
||||||
|
public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS =
|
||||||
|
"hbase.wal.async.wait.on.shutdown.seconds";
|
||||||
|
public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;
|
||||||
|
|
||||||
private final EventLoopGroup eventLoopGroup;
|
private final EventLoopGroup eventLoopGroup;
|
||||||
|
|
||||||
private final ExecutorService consumeExecutor;
|
private final ExecutorService consumeExecutor;
|
||||||
|
@ -207,6 +211,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
|
|
||||||
private long highestProcessedAppendTxidAtLastSync;
|
private long highestProcessedAppendTxidAtLastSync;
|
||||||
|
|
||||||
|
private final int waitOnShutdownInSeconds;
|
||||||
|
|
||||||
public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
|
public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
|
||||||
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
|
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
|
||||||
String prefix, String suffix, EventLoopGroup eventLoopGroup,
|
String prefix, String suffix, EventLoopGroup eventLoopGroup,
|
||||||
|
@ -254,6 +260,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
|
batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
|
||||||
createMaxRetries =
|
createMaxRetries =
|
||||||
conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
|
conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
|
||||||
|
waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
|
||||||
|
DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
|
||||||
rollWriter();
|
rollWriter();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -699,30 +707,26 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
} finally {
|
} finally {
|
||||||
consumeLock.unlock();
|
consumeLock.unlock();
|
||||||
}
|
}
|
||||||
long oldFileLen;
|
return executeClose(closeExecutor, oldWriter);
|
||||||
if (oldWriter != null) {
|
|
||||||
oldFileLen = oldWriter.getLength();
|
|
||||||
closeExecutor.execute(() -> {
|
|
||||||
try {
|
|
||||||
oldWriter.close();
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("close old writer failed", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
oldFileLen = 0L;
|
|
||||||
}
|
|
||||||
return oldFileLen;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doShutdown() throws IOException {
|
protected void doShutdown() throws IOException {
|
||||||
waitForSafePoint();
|
waitForSafePoint();
|
||||||
if (this.writer != null) {
|
executeClose(closeExecutor, writer);
|
||||||
this.writer.close();
|
|
||||||
this.writer = null;
|
|
||||||
}
|
|
||||||
closeExecutor.shutdown();
|
closeExecutor.shutdown();
|
||||||
|
try {
|
||||||
|
if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
|
||||||
|
LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but"
|
||||||
|
+ " the close of async writer doesn't complete."
|
||||||
|
+ "Please check the status of underlying filesystem"
|
||||||
|
+ " or increase the wait time by the config \""
|
||||||
|
+ ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS + "\"");
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.error("The wait for close of async writer is interrupted");
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
IOException error = new IOException("WAL has been closed");
|
IOException error = new IOException("WAL has been closed");
|
||||||
syncFutures.forEach(f -> f.done(f.getTxid(), error));
|
syncFutures.forEach(f -> f.done(f.getTxid(), error));
|
||||||
if (!(consumeExecutor instanceof EventLoop)) {
|
if (!(consumeExecutor instanceof EventLoop)) {
|
||||||
|
@ -730,6 +734,23 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static long executeClose(ExecutorService closeExecutor, AsyncWriter writer) {
|
||||||
|
long fileLength;
|
||||||
|
if (writer != null) {
|
||||||
|
fileLength = writer.getLength();
|
||||||
|
closeExecutor.execute(() -> {
|
||||||
|
try {
|
||||||
|
writer.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("close old writer failed", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
fileLength = 0L;
|
||||||
|
}
|
||||||
|
return fileLength;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
|
protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
|
||||||
writer.append(entry);
|
writer.append(entry);
|
||||||
|
|
Loading…
Reference in New Issue