HBASE-22301 Consider rolling the WAL if the HDFS write pipeline is slow
This commit is contained in:
parent 38c3178c18
commit b06ad82b9e
@@ -60,10 +60,19 @@ public interface MetricsWALSource extends BaseSource {
   String SYNC_TIME = "syncTime";
   String SYNC_TIME_DESC = "The time it took to sync the WAL to HDFS.";
   String ROLL_REQUESTED = "rollRequest";
-  String ROLL_REQUESTED_DESC = "How many times a log roll has been requested total";
+  String ROLL_REQUESTED_DESC = "How many times a roll has been requested total";
+  String ERROR_ROLL_REQUESTED = "errorRollRequest";
+  String ERROR_ROLL_REQUESTED_DESC =
+    "How many times a roll was requested due to I/O or other errors.";
   String LOW_REPLICA_ROLL_REQUESTED = "lowReplicaRollRequest";
   String LOW_REPLICA_ROLL_REQUESTED_DESC =
-    "How many times a log roll was requested due to too few DN's in the write pipeline.";
+    "How many times a roll was requested due to too few datanodes in the write pipeline.";
+  String SLOW_SYNC_ROLL_REQUESTED = "slowSyncRollRequest";
+  String SLOW_SYNC_ROLL_REQUESTED_DESC =
+    "How many times a roll was requested due to sync too slow on the write pipeline.";
+  String SIZE_ROLL_REQUESTED = "sizeRollRequest";
+  String SIZE_ROLL_REQUESTED_DESC =
+    "How many times a roll was requested due to file size roll threshold.";
   String WRITTEN_BYTES = "writtenBytes";
   String WRITTEN_BYTES_DESC = "Size (in bytes) of the data written to the WAL.";
 
@@ -94,10 +103,16 @@ public interface MetricsWALSource extends BaseSource {
 
   void incrementLogRollRequested();
 
+  void incrementErrorLogRoll();
+
   void incrementLowReplicationLogRoll();
 
   long getSlowAppendCount();
 
+  void incrementSlowSyncLogRoll();
+
+  void incrementSizeLogRoll();
+
   void incrementWrittenBytes(long val);
 
   long getWrittenBytes();
@@ -38,7 +38,10 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSource {
   private final MutableFastCounter appendCount;
   private final MutableFastCounter slowAppendCount;
   private final MutableFastCounter logRollRequested;
-  private final MutableFastCounter lowReplicationLogRollRequested;
+  private final MutableFastCounter errorRollRequested;
+  private final MutableFastCounter lowReplicationRollRequested;
+  private final MutableFastCounter slowSyncRollRequested;
+  private final MutableFastCounter sizeRollRequested;
   private final MutableFastCounter writtenBytes;
 
   public MetricsWALSourceImpl() {
@@ -60,8 +63,14 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSource {
     syncTimeHisto = this.getMetricsRegistry().newTimeHistogram(SYNC_TIME, SYNC_TIME_DESC);
     logRollRequested =
       this.getMetricsRegistry().newCounter(ROLL_REQUESTED, ROLL_REQUESTED_DESC, 0L);
-    lowReplicationLogRollRequested = this.getMetricsRegistry()
+    errorRollRequested = this.getMetricsRegistry()
+      .newCounter(ERROR_ROLL_REQUESTED, ERROR_ROLL_REQUESTED_DESC, 0L);
+    lowReplicationRollRequested = this.getMetricsRegistry()
       .newCounter(LOW_REPLICA_ROLL_REQUESTED, LOW_REPLICA_ROLL_REQUESTED_DESC, 0L);
+    slowSyncRollRequested = this.getMetricsRegistry()
+      .newCounter(SLOW_SYNC_ROLL_REQUESTED, SLOW_SYNC_ROLL_REQUESTED_DESC, 0L);
+    sizeRollRequested = this.getMetricsRegistry()
+      .newCounter(SIZE_ROLL_REQUESTED, SIZE_ROLL_REQUESTED_DESC, 0L);
     writtenBytes = this.getMetricsRegistry().newCounter(WRITTEN_BYTES, WRITTEN_BYTES_DESC, 0L);
   }
 
@@ -95,9 +104,24 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSource {
     logRollRequested.incr();
   }
 
+  @Override
+  public void incrementErrorLogRoll() {
+    errorRollRequested.incr();
+  }
+
   @Override
   public void incrementLowReplicationLogRoll() {
-    lowReplicationLogRollRequested.incr();
+    lowReplicationRollRequested.incr();
   }
+
+  @Override
+  public void incrementSlowSyncLogRoll() {
+    slowSyncRollRequested.incr();
+  }
+
+  @Override
+  public void incrementSizeLogRoll() {
+    sizeRollRequested.incr();
+  }
 
   @Override
@@ -70,7 +70,7 @@ public class LogRoller extends HasThread implements Closeable {
     if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
       wal.registerWALActionsListener(new WALActionsListener() {
         @Override
-        public void logRollRequested(boolean lowReplicas) {
+        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
           walNeedsRoll.put(wal, Boolean.TRUE);
           // TODO logs will contend with each other here, replace with e.g. DelayedQueue
           synchronized(rollLog) {
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
 import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
 import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
@@ -112,9 +115,19 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
 
   private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class);
 
+  protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms";
   protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
+  protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms";
+  protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms
+  protected static final String SLOW_SYNC_ROLL_THRESHOLD =
+    "hbase.regionserver.wal.slowsync.roll.threshold";
+  protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings
+  protected static final String SLOW_SYNC_ROLL_INTERVAL_MS =
+    "hbase.regionserver.wal.slowsync.roll.interval.ms";
+  protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute
 
-  private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
+  protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
+  protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
 
   /**
    * file system instance
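For reference (not part of this commit's hunks): the keys above are ordinary HBase configuration properties, so the slow-sync roll thresholds can be tuned per deployment. Below is a minimal sketch, assuming only the standard Hadoop/HBase Configuration API; the property names and values mirror the defaults defined in this hunk, and the class name is purely illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    // Illustrative only: shows where the new WAL slow-sync knobs would be set.
    public class SlowSyncRollTuning {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // A sync slower than this is counted (and logged) as a slow sync.
        conf.setInt("hbase.regionserver.wal.slowsync.ms", 100);
        // A single sync slower than this requests a roll immediately.
        conf.setInt("hbase.regionserver.wal.roll.on.sync.ms", 10000);
        // This many slow syncs within one check interval also request a roll...
        conf.setInt("hbase.regionserver.wal.slowsync.roll.threshold", 100);
        // ...where the check interval is this long, in milliseconds.
        conf.setInt("hbase.regionserver.wal.slowsync.roll.interval.ms", 60 * 1000);
      }
    }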
@@ -168,7 +181,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    */
   protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();
 
-  protected final long slowSyncNs;
+  protected final long slowSyncNs, rollOnSyncNs;
+  protected final int slowSyncRollThreshold;
+  protected final int slowSyncCheckInterval;
+  protected final AtomicInteger slowSyncCount = new AtomicInteger();
 
   private final long walSyncTimeoutNs;
 
@@ -225,7 +241,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
   volatile W writer;
 
   // Last time to check low replication on hlog's pipeline
-  private long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
+  private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
+
+  // Last time we asked to roll the log due to a slow sync
+  private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
 
   protected volatile boolean closed = false;
 
@@ -280,6 +299,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    */
   protected final String implClassName;
 
+  protected volatile boolean rollRequested;
+
   public long getFilenum() {
     return this.filenum.get();
   }
@@ -414,10 +435,16 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
       StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +
       walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir);
-    this.slowSyncNs = TimeUnit.MILLISECONDS
-      .toNanos(conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS));
-    this.walSyncTimeoutNs = TimeUnit.MILLISECONDS
-      .toNanos(conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS));
+    this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS,
+      conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)));
+    this.rollOnSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS,
+      DEFAULT_ROLL_ON_SYNC_TIME_MS));
+    this.slowSyncRollThreshold = conf.getInt(SLOW_SYNC_ROLL_THRESHOLD,
+      DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
+    this.slowSyncCheckInterval = conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS,
+      DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
+    this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS,
+      conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS)));
     this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
       @Override
       protected SyncFuture initialValue() {
@@ -757,11 +784,16 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
       tellListenersAboutPreLogRoll(oldPath, newPath);
       // NewPath could be equal to oldPath if replaceWriter fails.
       newPath = replaceWriter(oldPath, newPath, nextWriter);
+      // Reset rollRequested status
+      rollRequested = false;
       tellListenersAboutPostLogRoll(oldPath, newPath);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Create new " + implClassName + " writer with pipeline: " +
           Arrays.toString(getPipeline()));
       }
+      // We got a new writer, so reset the slow sync count
+      lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
+      slowSyncCount.set(0);
       // Can we delete any of the old log files?
       if (getNumRolledLogFiles() > 0) {
         cleanOldLogs();
@@ -788,7 +820,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
 
   // public only until class moves to o.a.h.h.wal
   public void requestLogRoll() {
-    requestLogRoll(false);
+    requestLogRoll(ERROR);
   }
 
   /**
@@ -868,10 +900,19 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     return cachedSyncFutures.get().reset(sequence);
   }
 
-  protected final void requestLogRoll(boolean tooFewReplicas) {
+  protected boolean isLogRollRequested() {
+    return rollRequested;
+  }
+
+  protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) {
+    // If we have already requested a roll, don't do it again
+    if (rollRequested) {
+      return;
+    }
     if (!this.listeners.isEmpty()) {
+      rollRequested = true; // No point to assert this unless there is a registered listener
       for (WALActionsListener i : this.listeners) {
-        i.logRollRequested(tooFewReplicas);
+        i.logRollRequested(reason);
       }
     }
   }
@@ -942,10 +983,24 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
 
   protected final void postSync(final long timeInNanos, final int handlerSyncs) {
     if (timeInNanos > this.slowSyncNs) {
-      String msg = new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000)
-        .append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString();
+      String msg = new StringBuilder().append("Slow sync cost: ")
+        .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos))
+        .append(" ms, current pipeline: ")
+        .append(Arrays.toString(getPipeline())).toString();
       TraceUtil.addTimelineAnnotation(msg);
       LOG.info(msg);
+      // A single sync took too long.
+      // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
+      // effects. Here we have a single data point that indicates we should take immediate
+      // action, so do so.
+      if (timeInNanos > this.rollOnSyncNs) {
+        LOG.warn("Requesting log roll because we exceeded slow sync threshold; time=" +
+          TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold=" +
+          TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: " +
+          Arrays.toString(getPipeline()));
+        requestLogRoll(SLOW_SYNC);
+      }
+      slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this
     }
     if (!listeners.isEmpty()) {
       for (WALActionsListener listener : listeners) {
@@ -1031,6 +1086,41 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
 
   protected abstract boolean doCheckLogLowReplication();
 
+  /**
+   * @return true if we exceeded the slow sync roll threshold over the last check
+   *         interval
+   */
+  protected boolean doCheckSlowSync() {
+    boolean result = false;
+    long now = EnvironmentEdgeManager.currentTime();
+    long elapsedTime = now - lastTimeCheckSlowSync;
+    if (elapsedTime >= slowSyncCheckInterval) {
+      if (slowSyncCount.get() >= slowSyncRollThreshold) {
+        if (elapsedTime >= (2 * slowSyncCheckInterval)) {
+          // If two or more slowSyncCheckInterval have elapsed this is a corner case
+          // where a train of slow syncs almost triggered us but then there was a long
+          // interval from then until the one more that pushed us over. If so, we
+          // should do nothing and let the count reset.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("checkSlowSync triggered but we decided to ignore it; " +
+              "count=" + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold +
+              ", elapsedTime=" + elapsedTime + " ms, slowSyncCheckInterval=" +
+              slowSyncCheckInterval + " ms");
+          }
+          // Fall through to count reset below
+        } else {
+          LOG.warn("Requesting log roll because we exceeded slow sync threshold; count=" +
+            slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold +
+            ", current pipeline: " + Arrays.toString(getPipeline()));
+          result = true;
+        }
+      }
+      lastTimeCheckSlowSync = now;
+      slowSyncCount.set(0);
+    }
+    return result;
+  }
+
   public void checkLogLowReplication(long checkInterval) {
     long now = EnvironmentEdgeManager.currentTime();
     if (now - lastTimeCheckLowReplication < checkInterval) {
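A condensed paraphrase (not the committed code) of the two slow-sync roll triggers added in the hunks above, with the default values inlined, may help when reading postSync() and doCheckSlowSync() together; the class and method names below are hypothetical.

    // Illustrative paraphrase of the roll decision, defaults inlined.
    final class SlowSyncRollDecision {
      // postSync(): one sync above hbase.regionserver.wal.roll.on.sync.ms rolls at once.
      static boolean rollForSingleSync(long syncMillis) {
        return syncMillis > 10000L;
      }
      // doCheckSlowSync(): a train of slow syncs within one fresh check interval also rolls.
      static boolean rollForSlowSyncTrain(int slowSyncCount, long elapsedMillis) {
        return elapsedMillis >= 60000L      // hbase.regionserver.wal.slowsync.roll.interval.ms
            && elapsedMillis < 2 * 60000L   // counts older than two intervals are discarded
            && slowSyncCount >= 100;        // hbase.regionserver.wal.slowsync.roll.threshold
      }
    }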
@@ -1043,7 +1133,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     try {
       lastTimeCheckLowReplication = now;
       if (doCheckLogLowReplication()) {
-        requestLogRoll(true);
+        requestLogRoll(LOW_REPLICATION);
       }
     } finally {
       rollWriterLock.unlock();
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import com.lmax.disruptor.RingBuffer;
@@ -167,9 +169,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   // notice that, modification to this field is only allowed under the protection of consumeLock.
   private volatile int epochAndState;
 
-  // used to guard the log roll request when we exceed the log roll size.
-  private boolean rollRequested;
-
   private boolean readyForRolling;
 
   private final Condition readyForRollingCond = consumeLock.newCondition();
@@ -318,7 +317,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     highestUnsyncedTxid = highestSyncedTxid.get();
     if (shouldRequestLogRoll) {
       // request a roll.
-      requestLogRoll();
+      requestLogRoll(ERROR);
     }
   }
 
@@ -337,11 +336,14 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       // closed soon.
       return;
     }
-    if (writer.getLength() < logrollsize || rollRequested) {
-      return;
+    // If we haven't already requested a roll, check if we have exceeded logrollsize
+    if (!isLogRollRequested() && writer.getLength() > logrollsize) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Requesting log roll because of file size threshold; length=" +
+          writer.getLength() + ", logrollsize=" + logrollsize);
+      }
+      requestLogRoll(SIZE);
     }
-    rollRequested = true;
-    requestLogRoll();
   }
 
   private void sync(AsyncWriter writer) {
@@ -663,7 +665,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
     }
     this.fileLengthAtLastSync = nextWriter.getLength();
-    this.rollRequested = false;
     this.highestProcessedAppendTxidAtLastSync = 0L;
     consumeLock.lock();
     try {
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
+
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.ExceptionHandler;
@@ -600,7 +605,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
           // Can we release other syncs?
           syncCount += releaseSyncFutures(currentSequence, lastException);
           if (lastException != null) {
-            requestLogRoll();
+            requestLogRoll(ERROR);
           } else {
             checkLogRoll();
           }
@@ -620,19 +625,32 @@ public class FSHLog extends AbstractFSWAL<Writer> {
    * Schedule a log roll if needed.
    */
   private void checkLogRoll() {
+    // If we have already requested a roll, do nothing
+    if (isLogRollRequested()) {
+      return;
+    }
     // Will return immediately if we are in the middle of a WAL log roll currently.
     if (!rollWriterLock.tryLock()) {
      return;
     }
-    boolean lowReplication;
     try {
-      lowReplication = doCheckLogLowReplication();
+      if (doCheckLogLowReplication()) {
+        LOG.warn("Requesting log roll because of low replication, current pipeline: " +
+          Arrays.toString(getPipeline()));
+        requestLogRoll(LOW_REPLICATION);
+      } else if (writer != null && writer.getLength() > logrollsize) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Requesting log roll because of file size threshold; length=" +
+            writer.getLength() + ", logrollsize=" + logrollsize);
+        }
+        requestLogRoll(SIZE);
+      } else if (doCheckSlowSync()) {
+        // We log this already in checkSlowSync
+        requestLogRoll(SLOW_SYNC);
+      }
     } finally {
       rollWriterLock.unlock();
     }
-    if (lowReplication || (writer != null && writer.getLength() > logrollsize)) {
-      requestLogRoll(lowReplication);
-    }
   }
 
   /**
@@ -768,8 +786,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
   }
 
   public static final long FIXED_OVERHEAD = ClassSize
-    .align(ClassSize.OBJECT + (5 * ClassSize.REFERENCE) + ClassSize.ATOMIC_INTEGER
-      + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
+    .align(ClassSize.OBJECT + (5 * ClassSize.REFERENCE) + (2 * ClassSize.ATOMIC_INTEGER)
+      + (3 * Bytes.SIZEOF_INT) + (4 * Bytes.SIZEOF_LONG));
 
   /**
    * This class is used coordinating two threads holding one thread at a 'safe point' while the
@@ -1017,7 +1035,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
             this.syncFuturesCount.get());
         } catch (Exception e) {
           // Should NEVER get here.
-          requestLogRoll();
+          requestLogRoll(ERROR);
           this.exception = new DamagedWALException("Failed offering sync", e);
         }
       }
@@ -1084,7 +1102,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
         String msg = "Append sequenceId=" + entry.getKey().getSequenceId()
           + ", requesting roll of WAL";
         LOG.warn(msg, e);
-        requestLogRoll();
+        requestLogRoll(ERROR);
         throw new DamagedWALException(msg, e);
       }
     }
@@ -1116,4 +1134,14 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     }
     return new DatanodeInfo[0];
   }
+
+  @VisibleForTesting
+  Writer getWriter() {
+    return this.writer;
+  }
+
+  @VisibleForTesting
+  void setWriter(Writer writer) {
+    this.writer = writer;
+  }
 }
@@ -73,10 +73,23 @@ public class MetricsWAL implements WALActionsListener {
   }
 
   @Override
-  public void logRollRequested(boolean underReplicated) {
+  public void logRollRequested(WALActionsListener.RollRequestReason reason) {
     source.incrementLogRollRequested();
-    if (underReplicated) {
-      source.incrementLowReplicationLogRoll();
-    }
+    switch (reason) {
+      case ERROR:
+        source.incrementErrorLogRoll();
+        break;
+      case LOW_REPLICATION:
+        source.incrementLowReplicationLogRoll();
+        break;
+      case SIZE:
+        source.incrementSizeLogRoll();
+        break;
+      case SLOW_SYNC:
+        source.incrementSlowSyncLogRoll();
+        break;
+      default:
+        break;
+    }
   }
 
@@ -32,6 +32,18 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public interface WALActionsListener {
 
+  /** The reason for the log roll request. */
+  static enum RollRequestReason {
+    /** The length of the log exceeds the roll size threshold. */
+    SIZE,
+    /** Too few replicas in the writer pipeline. */
+    LOW_REPLICATION,
+    /** Too much time spent waiting for sync. */
+    SLOW_SYNC,
+    /** I/O or other error. */
+    ERROR
+  };
+
   /**
    * The WAL is going to be rolled. The oldPath can be null if this is
    * the first log file from the regionserver.
@@ -65,7 +77,7 @@ public interface WALActionsListener {
   /**
    * A request was made that the WAL be rolled.
    */
-  default void logRollRequested(boolean tooFewReplicas) {}
+  default void logRollRequested(RollRequestReason reason) {}
 
   /**
    * The WAL is about to close.
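For illustration (not part of this commit): a minimal sketch of a listener written against the new callback signature, following the same pattern LogRoller and MetricsWAL use in the hunks above. The class name and log messages are hypothetical, and WALActionsListener is an @InterfaceAudience.Private interface, so this only applies to code inside HBase itself; such a listener would be attached with wal.registerWALActionsListener(...), as the LogRoller hunk shows.

    import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical listener that records why WAL rolls are being requested.
    public class RollReasonLogger implements WALActionsListener {
      private static final Logger LOG = LoggerFactory.getLogger(RollReasonLogger.class);

      @Override
      public void logRollRequested(WALActionsListener.RollRequestReason reason) {
        switch (reason) {
          case SLOW_SYNC:
          case LOW_REPLICATION:
            // Write pipeline health problems are the cases this commit adds.
            LOG.warn("WAL roll requested due to write pipeline trouble: {}", reason);
            break;
          case SIZE:
          case ERROR:
          default:
            LOG.info("WAL roll requested: {}", reason);
            break;
        }
      }
    }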
@@ -119,7 +119,7 @@ class DisabledWALProvider implements WALProvider {
     public byte[][] rollWriter() {
       if (!listeners.isEmpty()) {
         for (WALActionsListener listener : listeners) {
-          listener.logRollRequested(false);
+          listener.logRollRequested(WALActionsListener.RollRequestReason.ERROR);
         }
         for (WALActionsListener listener : listeners) {
           try {
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -53,7 +55,9 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.junit.BeforeClass;
@@ -91,6 +95,178 @@ public class TestLogRolling extends AbstractTestLogRolling {
     conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
     conf.set(WALFactory.WAL_PROVIDER, "filesystem");
     AbstractTestLogRolling.setUpBeforeClass();
+
+    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
+    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
+    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
+    // For slow sync threshold test: roll once after a sync above this threshold
+    TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
   }
 
+  @Test
+  public void testSlowSyncLogRolling() throws Exception {
+    // Create the test table
+    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
+    admin.createTable(desc);
+    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
+    int row = 1;
+    try {
+      // Get a reference to the FSHLog
+      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
+      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
+      final FSHLog log = (FSHLog) server.getWAL(region);
+
+      // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested
+      final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
+      log.registerWALActionsListener(new WALActionsListener() {
+        @Override
+        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
+          switch (reason) {
+            case SLOW_SYNC:
+              slowSyncHookCalled.lazySet(true);
+              break;
+            default:
+              break;
+          }
+        }
+      });
+
+      // Write some data
+      for (int i = 0; i < 10; i++) {
+        writeData(table, row++);
+      }
+
+      assertFalse("Should not have triggered log roll due to SLOW_SYNC",
+        slowSyncHookCalled.get());
+
+      // Set up for test
+      slowSyncHookCalled.set(false);
+
+      // Wrap the current writer with the anonymous class below that adds 200 ms of
+      // latency to any sync on the hlog. This should be more than sufficient to trigger
+      // slow sync warnings.
+      final Writer oldWriter1 = log.getWriter();
+      final Writer newWriter1 = new Writer() {
+        @Override
+        public void close() throws IOException {
+          oldWriter1.close();
+        }
+        @Override
+        public void sync(boolean forceSync) throws IOException {
+          try {
+            Thread.sleep(200);
+          } catch (InterruptedException e) {
+            InterruptedIOException ex = new InterruptedIOException();
+            ex.initCause(e);
+            throw ex;
+          }
+          oldWriter1.sync(forceSync);
+        }
+        @Override
+        public void append(Entry entry) throws IOException {
+          oldWriter1.append(entry);
+        }
+        @Override
+        public long getLength() {
+          return oldWriter1.getLength();
+        }
+      };
+      log.setWriter(newWriter1);
+
+      // Write some data.
+      // We need to write at least 5 times, but double it. We should only request
+      // a SLOW_SYNC roll once in the current interval.
+      for (int i = 0; i < 10; i++) {
+        writeData(table, row++);
+      }
+
+      // Wait for our wait injecting writer to get rolled out, as needed.
+      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return log.getWriter() != newWriter1;
+        }
+        @Override
+        public String explainFailure() throws Exception {
+          return "Waited too long for our test writer to get rolled out";
+        }
+      });
+
+      assertTrue("Should have triggered log roll due to SLOW_SYNC",
+        slowSyncHookCalled.get());
+
+      // Set up for test
+      slowSyncHookCalled.set(false);
+
+      // Wrap the current writer with the anonymous class below that adds 5000 ms of
+      // latency to any sync on the hlog.
+      // This will trip the other threshold.
+      final Writer oldWriter2 = (Writer)log.getWriter();
+      final Writer newWriter2 = new Writer() {
+        @Override
+        public void close() throws IOException {
+          oldWriter2.close();
+        }
+        @Override
+        public void sync(boolean forceSync) throws IOException {
+          try {
+            Thread.sleep(5000);
+          } catch (InterruptedException e) {
+            InterruptedIOException ex = new InterruptedIOException();
+            ex.initCause(e);
+            throw ex;
+          }
+          oldWriter2.sync(forceSync);
+        }
+        @Override
+        public void append(Entry entry) throws IOException {
+          oldWriter2.append(entry);
+        }
+        @Override
+        public long getLength() {
+          return oldWriter2.getLength();
+        }
+      };
+      log.setWriter(newWriter2);
+
+      // Write some data. Should only take one sync.
+      writeData(table, row++);
+
+      // Wait for our wait injecting writer to get rolled out, as needed.
+      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return log.getWriter() != newWriter2;
+        }
+        @Override
+        public String explainFailure() throws Exception {
+          return "Waited too long for our test writer to get rolled out";
+        }
+      });
+
+      assertTrue("Should have triggered log roll due to SLOW_SYNC",
+        slowSyncHookCalled.get());
+
+      // Set up for test
+      slowSyncHookCalled.set(false);
+
+      // Write some data
+      for (int i = 0; i < 10; i++) {
+        writeData(table, row++);
+      }
+
+      assertFalse("Should not have triggered log roll due to SLOW_SYNC",
+        slowSyncHookCalled.get());
+
+    } finally {
+      table.close();
+    }
+  }
+
   void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
@@ -147,9 +323,13 @@
 
     log.registerWALActionsListener(new WALActionsListener() {
       @Override
-      public void logRollRequested(boolean lowReplication) {
-        if (lowReplication) {
-          lowReplicationHookCalled.lazySet(true);
+      public void logRollRequested(WALActionsListener.RollRequestReason reason) {
+        switch (reason) {
+          case LOW_REPLICATION:
+            lowReplicationHookCalled.lazySet(true);
+            break;
+          default:
+            break;
         }
       }
     });
@@ -41,13 +41,21 @@ public class TestMetricsWAL {
   public void testLogRollRequested() throws Exception {
     MetricsWALSource source = mock(MetricsWALSourceImpl.class);
     MetricsWAL metricsWAL = new MetricsWAL(source);
-    metricsWAL.logRollRequested(false);
-    metricsWAL.logRollRequested(true);
+    metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.ERROR);
+    metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.LOW_REPLICATION);
+    metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.SLOW_SYNC);
+    metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.SIZE);
 
-    // Log roll was requested twice
-    verify(source, times(2)).incrementLogRollRequested();
+    // Log roll was requested four times
+    verify(source, times(4)).incrementLogRollRequested();
+    // One was because of an IO error.
+    verify(source, times(1)).incrementErrorLogRoll();
     // One was because of low replication on the hlog.
     verify(source, times(1)).incrementLowReplicationLogRoll();
+    // One was because of slow sync on the hlog.
+    verify(source, times(1)).incrementSlowSyncLogRoll();
+    // One was because of hlog file length limit.
+    verify(source, times(1)).incrementSizeLogRoll();
   }
 
   @Test