diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java
index fbe95f51dbd..7dc27f60973 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java
@@ -60,10 +60,19 @@ public interface MetricsWALSource extends BaseSource {
   String SYNC_TIME = "syncTime";
   String SYNC_TIME_DESC = "The time it took to sync the WAL to HDFS.";
   String ROLL_REQUESTED = "rollRequest";
-  String ROLL_REQUESTED_DESC = "How many times a log roll has been requested total";
+  String ROLL_REQUESTED_DESC = "How many times a roll has been requested in total.";
+  String ERROR_ROLL_REQUESTED = "errorRollRequest";
+  String ERROR_ROLL_REQUESTED_DESC =
+    "How many times a roll was requested due to I/O or other errors.";
   String LOW_REPLICA_ROLL_REQUESTED = "lowReplicaRollRequest";
   String LOW_REPLICA_ROLL_REQUESTED_DESC =
-    "How many times a log roll was requested due to too few DN's in the write pipeline.";
+    "How many times a roll was requested due to too few datanodes in the write pipeline.";
+  String SLOW_SYNC_ROLL_REQUESTED = "slowSyncRollRequest";
+  String SLOW_SYNC_ROLL_REQUESTED_DESC =
+    "How many times a roll was requested due to slow syncs on the write pipeline.";
+  String SIZE_ROLL_REQUESTED = "sizeRollRequest";
+  String SIZE_ROLL_REQUESTED_DESC =
+    "How many times a roll was requested because the file size roll threshold was exceeded.";
   String WRITTEN_BYTES = "writtenBytes";
   String WRITTEN_BYTES_DESC = "Size (in bytes) of the data written to the WAL.";

@@ -94,10 +103,16 @@ public interface MetricsWALSource extends BaseSource {

   void incrementLogRollRequested();

+  void incrementErrorLogRoll();
+
   void incrementLowReplicationLogRoll();

   long getSlowAppendCount();

+  void incrementSlowSyncLogRoll();
+
+  void incrementSizeLogRoll();
+
   void incrementWrittenBytes(long val);

   long getWrittenBytes();
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java
index 2f35d4c1301..eb605c50362 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java
@@ -38,7 +38,10 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
   private final MutableFastCounter appendCount;
   private final MutableFastCounter slowAppendCount;
   private final MutableFastCounter logRollRequested;
-  private final MutableFastCounter lowReplicationLogRollRequested;
+  private final MutableFastCounter errorRollRequested;
+  private final MutableFastCounter lowReplicationRollRequested;
+  private final MutableFastCounter slowSyncRollRequested;
+  private final MutableFastCounter sizeRollRequested;
   private final MutableFastCounter writtenBytes;

   public MetricsWALSourceImpl() {
@@ -60,8 +63,14 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
     syncTimeHisto = this.getMetricsRegistry().newTimeHistogram(SYNC_TIME, SYNC_TIME_DESC);
     logRollRequested =
       this.getMetricsRegistry().newCounter(ROLL_REQUESTED, ROLL_REQUESTED_DESC, 0L);
-    lowReplicationLogRollRequested = this.getMetricsRegistry()
+    errorRollRequested =
this.getMetricsRegistry() + .newCounter(ERROR_ROLL_REQUESTED, ERROR_ROLL_REQUESTED_DESC, 0L); + lowReplicationRollRequested = this.getMetricsRegistry() .newCounter(LOW_REPLICA_ROLL_REQUESTED, LOW_REPLICA_ROLL_REQUESTED_DESC, 0L); + slowSyncRollRequested = this.getMetricsRegistry() + .newCounter(SLOW_SYNC_ROLL_REQUESTED, SLOW_SYNC_ROLL_REQUESTED_DESC, 0L); + sizeRollRequested = this.getMetricsRegistry() + .newCounter(SIZE_ROLL_REQUESTED, SIZE_ROLL_REQUESTED_DESC, 0L); writtenBytes = this.getMetricsRegistry().newCounter(WRITTEN_BYTES, WRITTEN_BYTES_DESC, 0L); } @@ -95,9 +104,24 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo logRollRequested.incr(); } + @Override + public void incrementErrorLogRoll() { + errorRollRequested.incr(); + } + @Override public void incrementLowReplicationLogRoll() { - lowReplicationLogRollRequested.incr(); + lowReplicationRollRequested.incr(); + } + + @Override + public void incrementSlowSyncLogRoll() { + slowSyncRollRequested.incr(); + } + + @Override + public void incrementSizeLogRoll() { + sizeRollRequested.incr(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 05a8fdf778c..8adb9b46805 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -73,7 +73,7 @@ public class LogRoller extends HasThread implements Closeable { if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) { wal.registerWALActionsListener(new WALActionsListener() { @Override - public void logRollRequested(boolean lowReplicas) { + public void logRollRequested(WALActionsListener.RollRequestReason reason) { walNeedsRoll.put(wal, Boolean.TRUE); // TODO logs will contend with each other here, replace with e.g. DelayedQueue synchronized(rollLog) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 6c8cbfa2e4d..ad2eec61301 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR; +import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION; +import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; @@ -116,14 +119,19 @@ public abstract class AbstractFSWAL implements WAL { /** Don't log blocking regions more frequently than this. 
*/ private static final long SURVIVED_TOO_LONG_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5); - private static final String SLOW_SYNC_TIME_MS ="hbase.regionserver.hlog.slowsync.ms"; + protected static final String SLOW_SYNC_TIME_MS ="hbase.regionserver.wal.slowsync.ms"; protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms - - private static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.hlog.roll.on.sync.ms"; + protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms"; protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms + protected static final String SLOW_SYNC_ROLL_THRESHOLD = + "hbase.regionserver.wal.slowsync.roll.threshold"; + protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings + protected static final String SLOW_SYNC_ROLL_INTERVAL_MS = + "hbase.regionserver.wal.slowsync.roll.interval.ms"; + protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute - private static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.hlog.sync.timeout"; - private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min + protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout"; + protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min /** * file system instance @@ -179,6 +187,9 @@ public abstract class AbstractFSWAL implements WAL { /** The slow sync will be logged; the very slow sync will cause the WAL to be rolled. */ protected final long slowSyncNs, rollOnSyncNs; + protected final int slowSyncRollThreshold; + protected final int slowSyncCheckInterval; + protected final AtomicInteger slowSyncCount = new AtomicInteger(); private final long walSyncTimeoutNs; @@ -237,7 +248,10 @@ public abstract class AbstractFSWAL implements WAL { volatile W writer; // Last time to check low replication on hlog's pipeline - private long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime(); + private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime(); + + // Last time we asked to roll the log due to a slow sync + private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime(); protected volatile boolean closed = false; @@ -301,6 +315,8 @@ public abstract class AbstractFSWAL implements WAL { */ protected final String implClassName; + protected volatile boolean rollRequested; + public long getFilenum() { return this.filenum.get(); } @@ -435,13 +451,16 @@ public abstract class AbstractFSWAL implements WAL { LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir); - this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos( - conf.getInt(SLOW_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_TIME_MS)); - this.rollOnSyncNs = TimeUnit.MILLISECONDS.toNanos( - conf.getInt(ROLL_ON_SYNC_TIME_MS, DEFAULT_ROLL_ON_SYNC_TIME_MS)); - this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos( - conf.getLong(WAL_SYNC_TIMEOUT_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS)); - + this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS, + DEFAULT_SLOW_SYNC_TIME_MS)); + this.rollOnSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS, + DEFAULT_ROLL_ON_SYNC_TIME_MS)); + this.slowSyncRollThreshold = conf.getInt(SLOW_SYNC_ROLL_THRESHOLD, + 
DEFAULT_SLOW_SYNC_ROLL_THRESHOLD); + this.slowSyncCheckInterval = conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS, + DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS); + this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS, + DEFAULT_WAL_SYNC_TIMEOUT_MS)); this.cachedSyncFutures = new ThreadLocal() { @Override protected SyncFuture initialValue() { @@ -814,11 +833,16 @@ public abstract class AbstractFSWAL implements WAL { tellListenersAboutPreLogRoll(oldPath, newPath); // NewPath could be equal to oldPath if replaceWriter fails. newPath = replaceWriter(oldPath, newPath, nextWriter); + // Reset rollRequested status + rollRequested = false; tellListenersAboutPostLogRoll(oldPath, newPath); if (LOG.isDebugEnabled()) { LOG.debug("Create new " + implClassName + " writer with pipeline: " + Arrays.toString(getPipeline())); } + // We got a new writer, so reset the slow sync count + lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime(); + slowSyncCount.set(0); // Can we delete any of the old log files? if (getNumRolledLogFiles() > 0) { cleanOldLogs(); @@ -845,7 +869,7 @@ public abstract class AbstractFSWAL implements WAL { // public only until class moves to o.a.h.h.wal public void requestLogRoll() { - requestLogRoll(false); + requestLogRoll(ERROR); } /** @@ -925,10 +949,19 @@ public abstract class AbstractFSWAL implements WAL { return cachedSyncFutures.get().reset(sequence); } - protected final void requestLogRoll(boolean tooFewReplicas) { + protected boolean isLogRollRequested() { + return rollRequested; + } + + protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) { + // If we have already requested a roll, don't do it again + if (rollRequested) { + return; + } if (!this.listeners.isEmpty()) { + rollRequested = true; // No point to assert this unless there is a registered listener for (WALActionsListener i : this.listeners) { - i.logRollRequested(tooFewReplicas); + i.logRollRequested(reason); } } } @@ -997,23 +1030,32 @@ public abstract class AbstractFSWAL implements WAL { return len; } - protected final boolean postSync(long timeInNanos, int handlerSyncs) { + protected final void postSync(long timeInNanos, int handlerSyncs) { if (timeInNanos > this.slowSyncNs) { - String msg = new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000) - .append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString(); + String msg = new StringBuilder().append("Slow sync cost: ") + .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos)) + .append(" ms, current pipeline: ") + .append(Arrays.toString(getPipeline())).toString(); TraceUtil.addTimelineAnnotation(msg); LOG.info(msg); + if (timeInNanos > this.rollOnSyncNs) { + // A single sync took too long. + // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative + // effects. Here we have a single data point that indicates we should take immediate + // action, so do so. 
+ LOG.warn("Requesting log roll because we exceeded slow sync threshold; time=" + + TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold=" + + TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: " + + Arrays.toString(getPipeline())); + requestLogRoll(SLOW_SYNC); + } + slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this } if (!listeners.isEmpty()) { for (WALActionsListener listener : listeners) { listener.postSync(timeInNanos, handlerSyncs); } } - if (timeInNanos > this.rollOnSyncNs) { - LOG.info("Trying to request a roll due to a very long sync ({} ms)", timeInNanos / 1000000); - return true; - } - return false; } protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key, @@ -1093,6 +1135,41 @@ public abstract class AbstractFSWAL implements WAL { protected abstract boolean doCheckLogLowReplication(); + /** + * @return true if we exceeded the slow sync roll threshold over the last check + * interval + */ + protected boolean doCheckSlowSync() { + boolean result = false; + long now = EnvironmentEdgeManager.currentTime(); + long elapsedTime = now - lastTimeCheckSlowSync; + if (elapsedTime >= slowSyncCheckInterval) { + if (slowSyncCount.get() >= slowSyncRollThreshold) { + if (elapsedTime >= (2 * slowSyncCheckInterval)) { + // If two or more slowSyncCheckInterval have elapsed this is a corner case + // where a train of slow syncs almost triggered us but then there was a long + // interval from then until the one more that pushed us over. If so, we + // should do nothing and let the count reset. + if (LOG.isDebugEnabled()) { + LOG.debug("checkSlowSync triggered but we decided to ignore it; " + + "count=" + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + + ", elapsedTime=" + elapsedTime + " ms, slowSyncCheckInterval=" + + slowSyncCheckInterval + " ms"); + } + // Fall through to count reset below + } else { + LOG.warn("Requesting log roll because we exceeded slow sync threshold; count=" + + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + + ", current pipeline: " + Arrays.toString(getPipeline())); + result = true; + } + } + lastTimeCheckSlowSync = now; + slowSyncCount.set(0); + } + return result; + } + public void checkLogLowReplication(long checkInterval) { long now = EnvironmentEdgeManager.currentTime(); if (now - lastTimeCheckLowReplication < checkInterval) { @@ -1105,7 +1182,7 @@ public abstract class AbstractFSWAL implements WAL { try { lastTimeCheckLowReplication = now; if (doCheckLogLowReplication()) { - requestLogRoll(true); + requestLogRoll(LOW_REPLICATION); } } finally { rollWriterLock.unlock(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 553ff3d1668..209ace68425 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR; +import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import com.lmax.disruptor.RingBuffer; @@ -167,9 +169,6 @@ public class AsyncFSWAL extends AbstractFSWAL { // notice that, modification to this field is only 
allowed under the protection of consumeLock. private volatile int epochAndState; - // used to guard the log roll request when we exceed the log roll size. - private boolean rollRequested; - private boolean readyForRolling; private final Condition readyForRollingCond = consumeLock.newCondition(); @@ -317,7 +316,7 @@ public class AsyncFSWAL extends AbstractFSWAL { highestUnsyncedTxid = highestSyncedTxid.get(); if (shouldRequestLogRoll) { // request a roll. - requestLogRoll(); + requestLogRoll(ERROR); } } @@ -330,18 +329,20 @@ public class AsyncFSWAL extends AbstractFSWAL { break; } } - - boolean doRequestRoll = postSync(System.nanoTime() - startTimeNs, finishSync(true)); + postSync(System.nanoTime() - startTimeNs, finishSync(true)); if (trySetReadyForRolling()) { // we have just finished a roll, then do not need to check for log rolling, the writer will be // closed soon. return; } - if ((!doRequestRoll && writer.getLength() < logrollsize) || rollRequested) { - return; + // If we haven't already requested a roll, check if we have exceeded logrollsize + if (!isLogRollRequested() && writer.getLength() > logrollsize) { + if (LOG.isDebugEnabled()) { + LOG.debug("Requesting log roll because of file size threshold; length=" + + writer.getLength() + ", logrollsize=" + logrollsize); + } + requestLogRoll(SIZE); } - rollRequested = true; - requestLogRoll(); } private void sync(AsyncWriter writer) { @@ -716,7 +717,6 @@ public class AsyncFSWAL extends AbstractFSWAL { this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); } this.fileLengthAtLastSync = nextWriter.getLength(); - this.rollRequested = false; this.highestProcessedAppendTxidAtLastSync = 0L; consumeLock.lock(); try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 68cd3389845..44e919be0af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -17,6 +17,11 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR; +import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION; +import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE; +import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC; + import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.ExceptionHandler; @@ -257,10 +262,7 @@ public class FSHLog extends AbstractFSWAL { long startTimeNanos = System.nanoTime(); try { nextWriter.sync(useHsync); - boolean doRequestRoll = postSync(System.nanoTime() - startTimeNanos, 0); - if (doRequestRoll) { - LOG.info("Ignoring a roll request after a sync for a new file"); - } + postSync(System.nanoTime() - startTimeNanos, 0); } catch (IOException e) { // optimization failed, no need to abort here. LOG.warn("pre-sync failed but an optimization so keep going", e); @@ -600,16 +602,12 @@ public class FSHLog extends AbstractFSWAL { // Can we release other syncs? 
syncCount += releaseSyncFutures(currentSequence, lastException); if (lastException != null) { - wasRollRequested = true; - requestLogRoll(); + requestLogRoll(ERROR); } else { - wasRollRequested = checkLogRoll(); + checkLogRoll(); } } - boolean doRequestRoll = postSync(System.nanoTime() - start, syncCount); - if (!wasRollRequested && doRequestRoll) { - requestLogRoll(); - } + postSync(System.nanoTime() - start, syncCount); } catch (InterruptedException e) { // Presume legit interrupt. Thread.currentThread().interrupt(); @@ -624,20 +622,35 @@ public class FSHLog extends AbstractFSWAL { * Schedule a log roll if needed. */ private boolean checkLogRoll() { + // If we have already requested a roll, do nothing + if (isLogRollRequested()) { + return false; + } // Will return immediately if we are in the middle of a WAL log roll currently. if (!rollWriterLock.tryLock()) { return false; } - boolean lowReplication; try { - lowReplication = doCheckLogLowReplication(); + if (doCheckLogLowReplication()) { + LOG.warn("Requesting log roll because of low replication, current pipeline: " + + Arrays.toString(getPipeline())); + requestLogRoll(LOW_REPLICATION); + return true; + } else if (writer != null && writer.getLength() > logrollsize) { + if (LOG.isDebugEnabled()) { + LOG.debug("Requesting log roll because of file size threshold; length=" + + writer.getLength() + ", logrollsize=" + logrollsize); + } + requestLogRoll(SIZE); + return true; + } else if (doCheckSlowSync()) { + // We log this already in checkSlowSync + requestLogRoll(SLOW_SYNC); + return true; + } } finally { rollWriterLock.unlock(); } - if (lowReplication || (writer != null && writer.getLength() > logrollsize)) { - requestLogRoll(lowReplication); - return true; - } return false; } @@ -774,8 +787,8 @@ public class FSHLog extends AbstractFSWAL { } public static final long FIXED_OVERHEAD = ClassSize - .align(ClassSize.OBJECT + (5 * ClassSize.REFERENCE) + ClassSize.ATOMIC_INTEGER - + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG)); + .align(ClassSize.OBJECT + (5 * ClassSize.REFERENCE) + (2 * ClassSize.ATOMIC_INTEGER) + + (3 * Bytes.SIZEOF_INT) + (4 * Bytes.SIZEOF_LONG)); /** * This class is used coordinating two threads holding one thread at a 'safe point' while the @@ -1023,7 +1036,7 @@ public class FSHLog extends AbstractFSWAL { this.syncFuturesCount.get()); } catch (Exception e) { // Should NEVER get here. 
- requestLogRoll(); + requestLogRoll(ERROR); this.exception = new DamagedWALException("Failed offering sync", e); } } @@ -1090,7 +1103,7 @@ public class FSHLog extends AbstractFSWAL { String msg = "Append sequenceId=" + entry.getKey().getSequenceId() + ", requesting roll of WAL"; LOG.warn(msg, e); - requestLogRoll(); + requestLogRoll(ERROR); throw new DamagedWALException(msg, e); } } @@ -1122,4 +1135,14 @@ public class FSHLog extends AbstractFSWAL { } return new DatanodeInfo[0]; } + + @VisibleForTesting + Writer getWriter() { + return this.writer; + } + + @VisibleForTesting + void setWriter(Writer writer) { + this.writer = writer; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java index 900e55fc710..b2af4a80ad3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java @@ -73,10 +73,23 @@ public class MetricsWAL implements WALActionsListener { } @Override - public void logRollRequested(boolean underReplicated) { + public void logRollRequested(WALActionsListener.RollRequestReason reason) { source.incrementLogRollRequested(); - if (underReplicated) { - source.incrementLowReplicationLogRoll(); + switch (reason) { + case ERROR: + source.incrementErrorLogRoll(); + break; + case LOW_REPLICATION: + source.incrementLowReplicationLogRoll(); + break; + case SIZE: + source.incrementSizeLogRoll(); + break; + case SLOW_SYNC: + source.incrementSlowSyncLogRoll(); + break; + default: + break; } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java index 13ffac75627..7fba7dfd4b5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java @@ -32,6 +32,18 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public interface WALActionsListener { + /** The reason for the log roll request. */ + static enum RollRequestReason { + /** The length of the log exceeds the roll size threshold. */ + SIZE, + /** Too few replicas in the writer pipeline. */ + LOW_REPLICATION, + /** Too much time spent waiting for sync. */ + SLOW_SYNC, + /** I/O or other error. */ + ERROR + }; + /** * The WAL is going to be rolled. The oldPath can be null if this is * the first log file from the regionserver. @@ -65,7 +77,7 @@ public interface WALActionsListener { /** * A request was made that the WAL be rolled. */ - default void logRollRequested(boolean tooFewReplicas) {} + default void logRollRequested(RollRequestReason reason) {} /** * The WAL is about to close. 
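The hunks above change the listener callback from logRollRequested(boolean tooFewReplicas) to logRollRequested(RollRequestReason reason), and MetricsWAL now switches on the reason to bump a per-reason counter. Below is a minimal sketch of another listener consuming the new callback; the class name and log messages are illustrative only, and since WALActionsListener is @InterfaceAudience.Private this is meant purely to clarify the new signature, not to suggest a supported extension point.

// Illustrative sketch, not part of this patch. Only logRollRequested is
// overridden; every other WALActionsListener method keeps its default no-op.
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RollReasonLoggingListener implements WALActionsListener {
  private static final Logger LOG = LoggerFactory.getLogger(RollReasonLoggingListener.class);

  @Override
  public void logRollRequested(RollRequestReason reason) {
    switch (reason) {
      case SLOW_SYNC:
        LOG.warn("WAL roll requested: syncs on the write pipeline were too slow");
        break;
      case LOW_REPLICATION:
        LOG.warn("WAL roll requested: too few replicas in the write pipeline");
        break;
      case SIZE:
        LOG.debug("WAL roll requested: file size roll threshold reached");
        break;
      case ERROR:
      default:
        LOG.warn("WAL roll requested: I/O or other error");
        break;
    }
  }
}

Such a listener would be registered the same way LogRoller and the tests further below do it, via WAL#registerWALActionsListener.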
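Taken together, the AbstractFSWAL changes above introduce four related settings: hbase.regionserver.wal.slowsync.ms (default 100 ms) marks an individual sync as slow; hbase.regionserver.wal.slowsync.roll.threshold (default 100) together with hbase.regionserver.wal.slowsync.roll.interval.ms (default 60000 ms) requests a SLOW_SYNC roll once that many slow syncs are counted inside one check interval; and hbase.regionserver.wal.roll.on.sync.ms (default 10000 ms) requests a roll immediately after a single very slow sync. The sketch below only shows how a test or client Configuration might tighten these knobs, mirroring what TestLogRolling#setUpBeforeClass does further below; the helper class and the chosen values are hypothetical examples, not recommendations.

// Hypothetical helper, not part of this patch: tightens the new slow sync roll
// settings on a Configuration, e.g. for a unit test. The property names and the
// defaults quoted in the comments come from AbstractFSWAL in this patch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public final class SlowSyncRollConfig {

  private SlowSyncRollConfig() {
  }

  public static Configuration tightened() {
    Configuration conf = HBaseConfiguration.create();
    // A sync slower than this counts as "slow" (default 100 ms).
    conf.setInt("hbase.regionserver.wal.slowsync.ms", 100);
    // Request a SLOW_SYNC roll after this many slow syncs... (default 100)
    conf.setInt("hbase.regionserver.wal.slowsync.roll.threshold", 5);
    // ...observed within one check interval (default 60000 ms).
    conf.setInt("hbase.regionserver.wal.slowsync.roll.interval.ms", 10 * 1000);
    // A single sync slower than this requests a roll on its own (default 10000 ms).
    conf.setInt("hbase.regionserver.wal.roll.on.sync.ms", 5000);
    return conf;
  }
}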
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 75439fe6c52..5f787fef599 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -119,7 +119,7 @@ class DisabledWALProvider implements WALProvider { public byte[][] rollWriter() { if (!listeners.isEmpty()) { for (WALActionsListener listener : listeners) { - listener.logRollRequested(false); + listener.logRollRequested(WALActionsListener.RollRequestReason.ERROR); } for (WALActionsListener listener : listeners) { try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index e19361e2003..6c257e250bd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import java.io.EOFException; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; @@ -53,7 +55,9 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.junit.BeforeClass; @@ -91,6 +95,178 @@ public class TestLogRolling extends AbstractTestLogRolling { conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3); conf.set(WALFactory.WAL_PROVIDER, "filesystem"); AbstractTestLogRolling.setUpBeforeClass(); + + // For slow sync threshold test: roll after 5 slow syncs in 10 seconds + TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5); + TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000); + // For slow sync threshold test: roll once after a sync above this threshold + TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000); + } + + @Test + public void testSlowSyncLogRolling() throws Exception { + // Create the test table + TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName())) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); + admin.createTable(desc); + Table table = TEST_UTIL.getConnection().getTable(desc.getTableName()); + int row = 1; + try { + // Get a reference to the FSHLog + server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); + RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); + final FSHLog log = (FSHLog) server.getWAL(region); + + // 
Register a WALActionsListener to observe if a SLOW_SYNC roll is requested + + final AtomicBoolean slowSyncHookCalled = new AtomicBoolean(); + log.registerWALActionsListener(new WALActionsListener() { + @Override + public void logRollRequested(WALActionsListener.RollRequestReason reason) { + switch (reason) { + case SLOW_SYNC: + slowSyncHookCalled.lazySet(true); + break; + default: + break; + } + } + }); + + // Write some data + + for (int i = 0; i < 10; i++) { + writeData(table, row++); + } + + assertFalse("Should not have triggered log roll due to SLOW_SYNC", + slowSyncHookCalled.get()); + + // Set up for test + slowSyncHookCalled.set(false); + + // Wrap the current writer with the anonymous class below that adds 200 ms of + // latency to any sync on the hlog. This should be more than sufficient to trigger + // slow sync warnings. + final Writer oldWriter1 = log.getWriter(); + final Writer newWriter1 = new Writer() { + @Override + public void close() throws IOException { + oldWriter1.close(); + } + @Override + public void sync(boolean forceSync) throws IOException { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + InterruptedIOException ex = new InterruptedIOException(); + ex.initCause(e); + throw ex; + } + oldWriter1.sync(forceSync); + } + @Override + public void append(Entry entry) throws IOException { + oldWriter1.append(entry); + } + @Override + public long getLength() { + return oldWriter1.getLength(); + } + }; + log.setWriter(newWriter1); + + // Write some data. + // We need to write at least 5 times, but double it. We should only request + // a SLOW_SYNC roll once in the current interval. + for (int i = 0; i < 10; i++) { + writeData(table, row++); + } + + // Wait for our wait injecting writer to get rolled out, as needed. + + TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate() { + @Override + public boolean evaluate() throws Exception { + return log.getWriter() != newWriter1; + } + @Override + public String explainFailure() throws Exception { + return "Waited too long for our test writer to get rolled out"; + } + }); + + assertTrue("Should have triggered log roll due to SLOW_SYNC", + slowSyncHookCalled.get()); + + // Set up for test + slowSyncHookCalled.set(false); + + // Wrap the current writer with the anonymous class below that adds 5000 ms of + // latency to any sync on the hlog. + // This will trip the other threshold. + final Writer oldWriter2 = (Writer)log.getWriter(); + final Writer newWriter2 = new Writer() { + @Override + public void close() throws IOException { + oldWriter2.close(); + } + @Override + public void sync(boolean forceSync) throws IOException { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + InterruptedIOException ex = new InterruptedIOException(); + ex.initCause(e); + throw ex; + } + oldWriter2.sync(forceSync); + } + @Override + public void append(Entry entry) throws IOException { + oldWriter2.append(entry); + } + @Override + public long getLength() { + return oldWriter2.getLength(); + } + }; + log.setWriter(newWriter2); + + // Write some data. Should only take one sync. + + writeData(table, row++); + + // Wait for our wait injecting writer to get rolled out, as needed. 
+ + TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate() { + @Override + public boolean evaluate() throws Exception { + return log.getWriter() != newWriter2; + } + @Override + public String explainFailure() throws Exception { + return "Waited too long for our test writer to get rolled out"; + } + }); + + assertTrue("Should have triggered log roll due to SLOW_SYNC", + slowSyncHookCalled.get()); + + // Set up for test + slowSyncHookCalled.set(false); + + // Write some data + for (int i = 0; i < 10; i++) { + writeData(table, row++); + } + + assertFalse("Should not have triggered log roll due to SLOW_SYNC", + slowSyncHookCalled.get()); + + } finally { + table.close(); + } } void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout) @@ -147,9 +323,13 @@ public class TestLogRolling extends AbstractTestLogRolling { log.registerWALActionsListener(new WALActionsListener() { @Override - public void logRollRequested(boolean lowReplication) { - if (lowReplication) { - lowReplicationHookCalled.lazySet(true); + public void logRollRequested(WALActionsListener.RollRequestReason reason) { + switch (reason) { + case LOW_REPLICATION: + lowReplicationHookCalled.lazySet(true); + break; + default: + break; } } }); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java index c0d34166de4..1c324dd8856 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java @@ -41,13 +41,21 @@ public class TestMetricsWAL { public void testLogRollRequested() throws Exception { MetricsWALSource source = mock(MetricsWALSourceImpl.class); MetricsWAL metricsWAL = new MetricsWAL(source); - metricsWAL.logRollRequested(false); - metricsWAL.logRollRequested(true); + metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.ERROR); + metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.LOW_REPLICATION); + metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.SLOW_SYNC); + metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.SIZE); - // Log roll was requested twice - verify(source, times(2)).incrementLogRollRequested(); + // Log roll was requested four times + verify(source, times(4)).incrementLogRollRequested(); + // One was because of an IO error. + verify(source, times(1)).incrementErrorLogRoll(); // One was because of low replication on the hlog. verify(source, times(1)).incrementLowReplicationLogRoll(); + // One was because of slow sync on the hlog. + verify(source, times(1)).incrementSlowSyncLogRoll(); + // One was because of hlog file length limit. + verify(source, times(1)).incrementSizeLogRoll(); } @Test