HBASE-21806 add an option to roll WAL on very slow syncs
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
513ba9ac59
commit
c90e9ff5ef
|
@ -407,6 +407,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
|||
waitingAckQueue.removeFirst();
|
||||
return;
|
||||
}
|
||||
// TODO: we should perhaps measure time taken per DN here;
|
||||
// we could collect statistics per DN, and/or exclude bad nodes in createOutput.
|
||||
datanodeList.forEach(ch -> {
|
||||
ch.write(headerBuf.retainedDuplicate());
|
||||
ch.write(checksumBuf.retainedDuplicate());
|
||||
|
|
|
@ -116,8 +116,13 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
/** Don't log blocking regions more frequently than this. */
|
||||
private static final long SURVIVED_TOO_LONG_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5);
|
||||
|
||||
private static final String SLOW_SYNC_TIME_MS ="hbase.regionserver.hlog.slowsync.ms";
|
||||
protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
|
||||
|
||||
private static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.hlog.roll.on.sync.ms";
|
||||
protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms
|
||||
|
||||
private static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.hlog.sync.timeout";
|
||||
private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
|
||||
|
||||
/**
|
||||
|
@ -172,7 +177,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
*/
|
||||
protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();
|
||||
|
||||
protected final long slowSyncNs;
|
||||
/** The slow sync will be logged; the very slow sync will cause the WAL to be rolled. */
|
||||
protected final long slowSyncNs, rollOnSyncNs;
|
||||
|
||||
private final long walSyncTimeoutNs;
|
||||
|
||||
|
@ -429,10 +435,13 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
|
||||
StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +
|
||||
walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir);
|
||||
this.slowSyncNs = TimeUnit.MILLISECONDS
|
||||
.toNanos(conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS));
|
||||
this.walSyncTimeoutNs = TimeUnit.MILLISECONDS
|
||||
.toNanos(conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS));
|
||||
this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(
|
||||
conf.getInt(SLOW_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_TIME_MS));
|
||||
this.rollOnSyncNs = TimeUnit.MILLISECONDS.toNanos(
|
||||
conf.getInt(ROLL_ON_SYNC_TIME_MS, DEFAULT_ROLL_ON_SYNC_TIME_MS));
|
||||
this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
|
||||
conf.getLong(WAL_SYNC_TIMEOUT_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS));
|
||||
|
||||
this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
|
||||
@Override
|
||||
protected SyncFuture initialValue() {
|
||||
|
@ -988,7 +997,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
return len;
|
||||
}
|
||||
|
||||
protected final void postSync(final long timeInNanos, final int handlerSyncs) {
|
||||
protected final boolean postSync(long timeInNanos, int handlerSyncs) {
|
||||
if (timeInNanos > this.slowSyncNs) {
|
||||
String msg = new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000)
|
||||
.append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString();
|
||||
|
@ -1000,6 +1009,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
listener.postSync(timeInNanos, handlerSyncs);
|
||||
}
|
||||
}
|
||||
if (timeInNanos > this.rollOnSyncNs) {
|
||||
LOG.info("Trying to request a roll due to a very long sync ({} ms)", timeInNanos / 1000000);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
|
||||
|
|
|
@ -328,13 +328,14 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
break;
|
||||
}
|
||||
}
|
||||
postSync(System.nanoTime() - startTimeNs, finishSync(true));
|
||||
|
||||
boolean doRequestRoll = postSync(System.nanoTime() - startTimeNs, finishSync(true));
|
||||
if (trySetReadyForRolling()) {
|
||||
// we have just finished a roll, then do not need to check for log rolling, the writer will be
|
||||
// closed soon.
|
||||
return;
|
||||
}
|
||||
if (writer.getLength() < logrollsize || rollRequested) {
|
||||
if ((!doRequestRoll && writer.getLength() < logrollsize) || rollRequested) {
|
||||
return;
|
||||
}
|
||||
rollRequested = true;
|
||||
|
|
|
@ -257,7 +257,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
long startTimeNanos = System.nanoTime();
|
||||
try {
|
||||
nextWriter.sync(useHsync);
|
||||
postSync(System.nanoTime() - startTimeNanos, 0);
|
||||
boolean doRequestRoll = postSync(System.nanoTime() - startTimeNanos, 0);
|
||||
if (doRequestRoll) {
|
||||
LOG.info("Ignoring a roll request after a sync for a new file");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// optimization failed, no need to abort here.
|
||||
LOG.warn("pre-sync failed but an optimization so keep going", e);
|
||||
|
@ -576,6 +579,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
//TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan());
|
||||
long start = System.nanoTime();
|
||||
Throwable lastException = null;
|
||||
boolean wasRollRequested = false;
|
||||
try {
|
||||
TraceUtil.addTimelineAnnotation("syncing writer");
|
||||
writer.sync(useHsync);
|
||||
|
@ -596,12 +600,16 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
// Can we release other syncs?
|
||||
syncCount += releaseSyncFutures(currentSequence, lastException);
|
||||
if (lastException != null) {
|
||||
wasRollRequested = true;
|
||||
requestLogRoll();
|
||||
} else {
|
||||
checkLogRoll();
|
||||
wasRollRequested = checkLogRoll();
|
||||
}
|
||||
}
|
||||
postSync(System.nanoTime() - start, syncCount);
|
||||
boolean doRequestRoll = postSync(System.nanoTime() - start, syncCount);
|
||||
if (!wasRollRequested && doRequestRoll) {
|
||||
requestLogRoll();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// Presume legit interrupt.
|
||||
Thread.currentThread().interrupt();
|
||||
|
@ -615,10 +623,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
/**
|
||||
* Schedule a log roll if needed.
|
||||
*/
|
||||
private void checkLogRoll() {
|
||||
private boolean checkLogRoll() {
|
||||
// Will return immediately if we are in the middle of a WAL log roll currently.
|
||||
if (!rollWriterLock.tryLock()) {
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
boolean lowReplication;
|
||||
try {
|
||||
|
@ -628,7 +636,9 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
}
|
||||
if (lowReplication || (writer != null && writer.getLength() > logrollsize)) {
|
||||
requestLogRoll(lowReplication);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue