HBASE-22301 Consider rolling the WAL if the HDFS write pipeline is slow

Author: Andrew Purtell
Date: 2019-04-30 15:22:54 -07:00
Commit: 26b9e76bbf (parent: 36b4c0fc6f)
GPG Key ID: 8597754DD5365CCD (no known key found for this signature in database)
9 changed files with 419 additions and 54 deletions

View File

@@ -58,10 +58,19 @@ public interface MetricsWALSource extends BaseSource {
   String SYNC_TIME = "syncTime";
   String SYNC_TIME_DESC = "The time it took to sync the WAL to HDFS.";
   String ROLL_REQUESTED = "rollRequest";
-  String ROLL_REQUESTED_DESC = "How many times a log roll has been requested total";
+  String ROLL_REQUESTED_DESC = "How many times a roll has been requested total";
+  String ERROR_ROLL_REQUESTED = "errorRollRequest";
+  String ERROR_ROLL_REQUESTED_DESC =
+      "How many times a roll was requested due to I/O or other errors.";
   String LOW_REPLICA_ROLL_REQUESTED = "lowReplicaRollRequest";
   String LOW_REPLICA_ROLL_REQUESTED_DESC =
-      "How many times a log roll was requested due to too few DN's in the write pipeline.";
+      "How many times a roll was requested due to too few datanodes in the write pipeline.";
+  String SLOW_SYNC_ROLL_REQUESTED = "slowSyncRollRequest";
+  String SLOW_SYNC_ROLL_REQUESTED_DESC =
+      "How many times a roll was requested due to sync too slow on the write pipeline.";
+  String SIZE_ROLL_REQUESTED = "sizeRollRequest";
+  String SIZE_ROLL_REQUESTED_DESC =
+      "How many times a roll was requested due to file size roll threshold.";
 
   String WRITTEN_BYTES = "writtenBytes";
   String WRITTEN_BYTES_DESC = "Size (in bytes) of the data written to the WAL.";
@@ -92,8 +101,14 @@ public interface MetricsWALSource extends BaseSource {
   void incrementLogRollRequested();
 
+  void incrementErrorLogRoll();
+
   void incrementLowReplicationLogRoll();
 
+  void incrementSlowSyncLogRoll();
+
+  void incrementSizeLogRoll();
+
   void incrementWrittenBytes(long val);
 
   long getWrittenBytes();

View File

@@ -39,7 +39,10 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
   private final MutableFastCounter appendCount;
   private final MutableFastCounter slowAppendCount;
   private final MutableFastCounter logRollRequested;
-  private final MutableFastCounter lowReplicationLogRollRequested;
+  private final MutableFastCounter errorRollRequested;
+  private final MutableFastCounter lowReplicationRollRequested;
+  private final MutableFastCounter slowSyncRollRequested;
+  private final MutableFastCounter sizeRollRequested;
   private final MutableFastCounter writtenBytes;
 
   public MetricsWALSourceImpl() {
@@ -61,8 +64,14 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
     syncTimeHisto = this.getMetricsRegistry().newTimeHistogram(SYNC_TIME, SYNC_TIME_DESC);
     logRollRequested =
         this.getMetricsRegistry().newCounter(ROLL_REQUESTED, ROLL_REQUESTED_DESC, 0L);
-    lowReplicationLogRollRequested = this.getMetricsRegistry()
+    errorRollRequested = this.getMetricsRegistry()
+        .newCounter(ERROR_ROLL_REQUESTED, ERROR_ROLL_REQUESTED_DESC, 0L);
+    lowReplicationRollRequested = this.getMetricsRegistry()
         .newCounter(LOW_REPLICA_ROLL_REQUESTED, LOW_REPLICA_ROLL_REQUESTED_DESC, 0L);
+    slowSyncRollRequested = this.getMetricsRegistry()
+        .newCounter(SLOW_SYNC_ROLL_REQUESTED, SLOW_SYNC_ROLL_REQUESTED_DESC, 0L);
+    sizeRollRequested = this.getMetricsRegistry()
+        .newCounter(SIZE_ROLL_REQUESTED, SIZE_ROLL_REQUESTED_DESC, 0L);
     writtenBytes = this.getMetricsRegistry().newCounter(WRITTEN_BYTES, WRITTEN_BYTES_DESC, 0l);
   }
@@ -96,9 +105,24 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
     logRollRequested.incr();
   }
 
+  @Override
+  public void incrementErrorLogRoll() {
+    errorRollRequested.incr();
+  }
+
   @Override
   public void incrementLowReplicationLogRoll() {
-    lowReplicationLogRollRequested.incr();
+    lowReplicationRollRequested.incr();
+  }
+
+  @Override
+  public void incrementSlowSyncLogRoll() {
+    slowSyncRollRequested.incr();
+  }
+
+  @Override
+  public void incrementSizeLogRoll() {
+    sizeRollRequested.incr();
   }
 
   @Override

View File

@@ -71,7 +71,7 @@ public class LogRoller extends HasThread {
     if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
       wal.registerWALActionsListener(new WALActionsListener.Base() {
         @Override
-        public void logRollRequested(boolean lowReplicas) {
+        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
           walNeedsRoll.put(wal, Boolean.TRUE);
           // TODO logs will contend with each other here, replace with e.g. DelayedQueue
           synchronized(rollLog) {

View File

@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
 import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
 
 import java.io.FileNotFoundException;
@@ -160,9 +164,18 @@ public class FSHLog implements WAL {
   private static final Log LOG = LogFactory.getLog(FSHLog.class);
 
-  private static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
+  static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms";
+  static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
+  static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms";
+  static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms
+  static final String SLOW_SYNC_ROLL_THRESHOLD = "hbase.regionserver.wal.slowsync.roll.threshold";
+  static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings
+  static final String SLOW_SYNC_ROLL_INTERVAL_MS =
+      "hbase.regionserver.wal.slowsync.roll.interval.ms";
+  static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute
 
-  private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
+  static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
+  static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
 
   /**
    * The nexus at which all incoming handlers meet. Does appends and sync with an ordering.
@@ -280,7 +293,10 @@ public class FSHLog implements WAL {
   private final boolean useHsync;
 
-  private final int slowSyncNs;
+  private final long slowSyncNs, rollOnSyncNs;
+  private final int slowSyncRollThreshold;
+  private final int slowSyncCheckInterval;
+  private final AtomicInteger slowSyncCount = new AtomicInteger();
 
   private final long walSyncTimeout;
@@ -350,9 +366,13 @@ public class FSHLog implements WAL {
   private final AtomicInteger closeErrorCount = new AtomicInteger();
 
+  protected volatile boolean rollRequested;
+
   // Last time to check low replication on hlog's pipeline
   private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
 
+  // Last time we asked to roll the log due to a slow sync
+  private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
+
   /**
    * WAL Comparator; it compares the timestamp (log filenum), present in the log file name.
@@ -540,11 +560,16 @@ public class FSHLog implements WAL {
     // rollWriter sets this.hdfs_out if it can.
     rollWriter();
 
-    this.slowSyncNs =
-        1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms",
-        DEFAULT_SLOW_SYNC_TIME_MS);
-    this.walSyncTimeout = conf.getLong("hbase.regionserver.hlog.sync.timeout",
-        DEFAULT_WAL_SYNC_TIMEOUT_MS);
+    this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS,
+        conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)));
+    this.rollOnSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS,
+        DEFAULT_ROLL_ON_SYNC_TIME_MS));
+    this.slowSyncRollThreshold = conf.getInt(SLOW_SYNC_ROLL_THRESHOLD,
+        DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
+    this.slowSyncCheckInterval = conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS,
+        DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
+    this.walSyncTimeout = conf.getLong(WAL_SYNC_TIMEOUT_MS,
+        conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS));
 
     // This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
     // put on the ring buffer.
@@ -720,6 +745,11 @@ public class FSHLog implements WAL {
         // NewPath could be equal to oldPath if replaceWriter fails.
         newPath = replaceWriter(oldPath, newPath, nextWriter, nextHdfsOut);
         tellListenersAboutPostLogRoll(oldPath, newPath);
+        // Reset rollRequested status
+        rollRequested = false;
+        // We got a new writer, so reset the slow sync count
+        lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
+        slowSyncCount.set(0);
         // Can we delete any of the old log files?
         if (getNumRolledLogFiles() > 0) {
           cleanOldLogs();
@@ -1303,8 +1333,11 @@ public class FSHLog implements WAL {
           syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException);
           // Can we release other syncs?
           syncCount += releaseSyncFutures(currentSequence, lastException);
-          if (lastException != null) requestLogRoll();
-          else checkLogRoll();
+          if (lastException != null) {
+            requestLogRoll();
+          } else {
+            checkLogRoll();
+          }
         }
         postSync(System.nanoTime() - start, syncCount);
       } catch (InterruptedException e) {
@@ -1321,24 +1354,37 @@ public class FSHLog implements WAL {
    * Schedule a log roll if needed.
    */
   public void checkLogRoll() {
+    // If we have already requested a roll, do nothing
+    if (isLogRollRequested()) {
+      return;
+    }
     // Will return immediately if we are in the middle of a WAL log roll currently.
-    if (!rollWriterLock.tryLock()) return;
-    boolean lowReplication;
-    try {
-      lowReplication = checkLowReplication();
-    } finally {
-      rollWriterLock.unlock();
+    if (!rollWriterLock.tryLock()) {
+      return;
     }
     try {
-      if (lowReplication || (writer != null && writer.getLength() > logrollsize)) {
-        requestLogRoll(lowReplication);
+      if (checkLowReplication()) {
+        LOG.warn("Requesting log roll because of low replication, current pipeline: " +
+          Arrays.toString(getPipeLine()));
+        requestLogRoll(LOW_REPLICATION);
+      } else if (writer != null && writer.getLength() > logrollsize) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Requesting log roll because of file size threshold; length=" +
+            writer.getLength() + ", logrollsize=" + logrollsize);
+        }
+        requestLogRoll(SIZE);
+      } else if (checkSlowSync()) {
+        // We log this already in checkSlowSync
+        requestLogRoll(SLOW_SYNC);
       }
     } catch (IOException e) {
       LOG.warn("Writer.getLength() failed; continuing", e);
+    } finally {
+      rollWriterLock.unlock();
     }
   }
 
-  /*
+  /**
    * @return true if number of replicas for the WAL is lower than threshold
    */
   private boolean checkLowReplication() {
@@ -1389,6 +1435,41 @@ public class FSHLog implements WAL {
     return logRollNeeded;
   }
 
+  /**
+   * @return true if we exceeded the slow sync roll threshold over the last check
+   *         interval
+   */
+  private boolean checkSlowSync() {
+    boolean result = false;
+    long now = EnvironmentEdgeManager.currentTime();
+    long elapsedTime = now - lastTimeCheckSlowSync;
+    if (elapsedTime >= slowSyncCheckInterval) {
+      if (slowSyncCount.get() >= slowSyncRollThreshold) {
+        if (elapsedTime >= (2 * slowSyncCheckInterval)) {
+          // If two or more slowSyncCheckInterval have elapsed this is a corner case
+          // where a train of slow syncs almost triggered us but then there was a long
+          // interval from then until the one more that pushed us over. If so, we
+          // should do nothing and let the count reset.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("checkSlowSync triggered but we decided to ignore it; " +
+              "count=" + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold +
+              ", elapsedTime=" + elapsedTime + " ms, slowSyncCheckInterval=" +
+              slowSyncCheckInterval + " ms");
+          }
+          // Fall through to count reset below
+        } else {
+          LOG.warn("Requesting log roll because we exceeded slow sync threshold; count=" +
+            slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold +
+            ", current pipeline: " + Arrays.toString(getPipeLine()));
+          result = true;
+        }
+      }
+      lastTimeCheckSlowSync = now;
+      slowSyncCount.set(0);
+    }
+    return result;
+  }
+
   private SyncFuture publishSyncOnRingBuffer(long sequence) {
     return publishSyncOnRingBuffer(sequence, null, false);
   }
@@ -1452,10 +1533,23 @@ public class FSHLog implements WAL {
     if (timeInNanos > this.slowSyncNs) {
       String msg =
           new StringBuilder().append("Slow sync cost: ")
-              .append(timeInNanos / 1000000).append(" ms, current pipeline: ")
+              .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos))
+              .append(" ms, current pipeline: ")
               .append(Arrays.toString(getPipeLine())).toString();
       Trace.addTimelineAnnotation(msg);
       LOG.info(msg);
+      // A single sync took too long.
+      // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
+      // effects. Here we have a single data point that indicates we should take immediate
+      // action, so do so.
+      if (timeInNanos > this.rollOnSyncNs) {
+        LOG.warn("Requesting log roll because we exceeded slow sync threshold; time=" +
+          TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold=" +
+          TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: " +
+          Arrays.toString(getPipeLine()));
+        requestLogRoll(SLOW_SYNC);
+      }
+      slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this
     }
     if (!listeners.isEmpty()) {
       for (WALActionsListener listener : listeners) {
@@ -1539,15 +1633,24 @@ public class FSHLog implements WAL {
     }
   }
 
+  protected boolean isLogRollRequested() {
+    return rollRequested;
+  }
+
   // public only until class moves to o.a.h.h.wal
   public void requestLogRoll() {
-    requestLogRoll(false);
+    requestLogRoll(ERROR);
   }
 
-  private void requestLogRoll(boolean tooFewReplicas) {
+  private void requestLogRoll(final WALActionsListener.RollRequestReason reason) {
+    // If we have already requested a roll, don't do it again
+    if (rollRequested) {
+      return;
+    }
     if (!this.listeners.isEmpty()) {
+      rollRequested = true; // No point to assert this unless there is a registered listener
       for (WALActionsListener i: this.listeners) {
-        i.logRollRequested(tooFewReplicas);
+        i.logRollRequested(reason);
       }
     }
   }
@@ -1599,8 +1702,7 @@ public class FSHLog implements WAL {
   public static final long FIXED_OVERHEAD = ClassSize.align(
     ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
-    ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
+    ClassSize.ATOMIC_INTEGER + (3 * Bytes.SIZEOF_INT) + (4 * Bytes.SIZEOF_LONG));
 
   private static void split(final Configuration conf, final Path p) throws IOException {
     FileSystem fs = FSUtils.getWALFileSystem(conf);
@@ -2083,4 +2185,14 @@ public class FSHLog implements WAL {
   public long getLastTimeCheckLowReplication() {
     return this.lastTimeCheckLowReplication;
   }
+
+  @VisibleForTesting
+  Writer getWriter() {
+    return this.writer;
+  }
+
+  @VisibleForTesting
+  void setWriter(Writer writer) {
+    this.writer = writer;
+  }
 }
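For reference, the FSHLog knobs introduced above are read from the ordinary HBase Configuration. The following is a minimal tuning sketch, not part of the commit; the values shown are simply the defaults defined in this change, and the helper class itself is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Hypothetical sketch: sets the slow-sync roll properties added by HBASE-22301.
public class SlowSyncRollTuning {
  public static Configuration tuned() {
    Configuration conf = HBaseConfiguration.create();
    // A single sync slower than this is logged as a slow sync (default 100 ms).
    conf.setInt("hbase.regionserver.wal.slowsync.ms", 100);
    // A single sync slower than this requests an immediate SLOW_SYNC roll (default 10 s).
    conf.setInt("hbase.regionserver.wal.roll.on.sync.ms", 10000);
    // Request a SLOW_SYNC roll once this many slow syncs are seen... (default 100)
    conf.setInt("hbase.regionserver.wal.slowsync.roll.threshold", 100);
    // ...within one check interval (default 60 s).
    conf.setInt("hbase.regionserver.wal.slowsync.roll.interval.ms", 60 * 1000);
    return conf;
  }
}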

View File

@@ -72,10 +72,23 @@ public class MetricsWAL extends WALActionsListener.Base {
   }
 
   @Override
-  public void logRollRequested(boolean underReplicated) {
+  public void logRollRequested(WALActionsListener.RollRequestReason reason) {
     source.incrementLogRollRequested();
-    if (underReplicated) {
-      source.incrementLowReplicationLogRoll();
+    switch (reason) {
+      case ERROR:
+        source.incrementErrorLogRoll();
+        break;
+      case LOW_REPLICATION:
+        source.incrementLowReplicationLogRoll();
+        break;
+      case SIZE:
+        source.incrementSizeLogRoll();
+        break;
+      case SLOW_SYNC:
+        source.incrementSlowSyncLogRoll();
+        break;
+      default:
+        break;
     }
   }
 }

View File

@@ -34,6 +34,18 @@ import org.apache.hadoop.hbase.wal.WALKey;
 @InterfaceAudience.Private
 public interface WALActionsListener {
 
+  /** The reason for the log roll request. */
+  static enum RollRequestReason {
+    /** The length of the log exceeds the roll size threshold. */
+    SIZE,
+    /** Too few replicas in the writer pipeline. */
+    LOW_REPLICATION,
+    /** Too much time spent waiting for sync. */
+    SLOW_SYNC,
+    /** I/O or other error. */
+    ERROR
+  };
+
   /**
    * The WAL is going to be rolled. The oldPath can be null if this is
    * the first log file from the regionserver.
@@ -67,7 +79,7 @@ public interface WALActionsListener {
   /**
    * A request was made that the WAL be rolled.
    */
-  void logRollRequested(boolean tooFewReplicas);
+  void logRollRequested(RollRequestReason reason);
 
   /**
    * The WAL is about to close.
@@ -130,7 +142,7 @@ public interface WALActionsListener {
     public void postLogArchive(Path oldPath, Path newPath) throws IOException {}
 
     @Override
-    public void logRollRequested(boolean tooFewReplicas) {}
+    public void logRollRequested(RollRequestReason reason) {}
 
     @Override
     public void logCloseRequested() {}
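To illustrate the updated listener contract, here is a hedged sketch of a listener that tallies roll requests by the new RollRequestReason. The class is an illustration only and is not part of this change.

import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;

// Illustration only: counts WAL roll requests by reason using the new enum.
public class RollReasonCountingListener extends WALActionsListener.Base {
  private final Map<WALActionsListener.RollRequestReason, AtomicLong> counts =
      new EnumMap<WALActionsListener.RollRequestReason, AtomicLong>(
          WALActionsListener.RollRequestReason.class);

  public RollReasonCountingListener() {
    for (WALActionsListener.RollRequestReason reason :
        WALActionsListener.RollRequestReason.values()) {
      counts.put(reason, new AtomicLong());
    }
  }

  @Override
  public void logRollRequested(WALActionsListener.RollRequestReason reason) {
    // Invoked by the WAL whenever a roll is requested, with the reason it was requested.
    counts.get(reason).incrementAndGet();
  }

  public long getCount(WALActionsListener.RollRequestReason reason) {
    return counts.get(reason).get();
  }
}

Such a listener would be registered the same way the LogRoller and test changes in this commit do, via the WAL's registerWALActionsListener method.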

View File

@@ -119,7 +119,7 @@ class DisabledWALProvider implements WALProvider {
     public byte[][] rollWriter() {
       if (!listeners.isEmpty()) {
         for (WALActionsListener listener : listeners) {
-          listener.logRollRequested(false);
+          listener.logRollRequested(WALActionsListener.RollRequestReason.ERROR);
         }
         for (WALActionsListener listener : listeners) {
           try {

View File

@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
@@ -64,7 +66,9 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -146,9 +150,15 @@ public class TestLogRolling {
     TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
     // the namenode might still try to choose the recently-dead datanode
     // for a pipeline, so try to a new pipeline multiple times
     TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
     TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
     TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
+    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
+    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
+    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
+    // For slow sync threshold test: roll once after a sync above this threshold
+    TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
   }
 
   @Before
@@ -223,19 +233,187 @@ public class TestLogRolling {
     LOG.info("after writing there are " + DefaultWALProvider.getNumRolledLogFiles(log) +
       " log files");
 
     // flush all regions
     for (Region r: server.getOnlineRegionsLocalContext()) {
       r.flush(true);
     }
 
     // Now roll the log
     log.rollWriter();
 
     int count = DefaultWALProvider.getNumRolledLogFiles(log);
     LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
     assertTrue(("actual count: " + count), count <= 2);
   }
 
+  @Test
+  public void testSlowSyncLogRolling() throws Exception {
+    // Create the test table
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(getName()));
+    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+    admin.createTable(desc);
+    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
+    int row = 1;
+    try {
+      assertTrue(((HTable) table).isAutoFlush());
+
+      // Get a reference to the FSHLog
+      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
+      HRegionInfo region = server.getOnlineRegions(desc.getTableName()).get(0).getRegionInfo();
+      final FSHLog log = (FSHLog) server.getWAL(region);
+
+      // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested
+      final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
+      log.registerWALActionsListener(new WALActionsListener.Base() {
+        @Override
+        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
+          switch (reason) {
+            case SLOW_SYNC:
+              slowSyncHookCalled.lazySet(true);
+              break;
+            default:
+              break;
+          }
+        }
+      });
+
+      // Write some data
+      for (int i = 0; i < 10; i++) {
+        writeData(table, row++);
+      }
+
+      assertFalse("Should not have triggered log roll due to SLOW_SYNC",
+        slowSyncHookCalled.get());
+
+      // Set up for test
+      slowSyncHookCalled.set(false);
+
+      // Wrap the current writer with the anonymous class below that adds 200 ms of
+      // latency to any sync on the hlog. This should be more than sufficient to trigger
+      // slow sync warnings.
+      final Writer oldWriter1 = log.getWriter();
+      final Writer newWriter1 = new Writer() {
+        @Override
+        public void close() throws IOException {
+          oldWriter1.close();
+        }
+        @Override
+        public void sync(boolean forceSync) throws IOException {
+          try {
+            Thread.sleep(200);
+          } catch (InterruptedException e) {
+            InterruptedIOException ex = new InterruptedIOException();
+            ex.initCause(e);
+            throw ex;
+          }
+          oldWriter1.sync(forceSync);
+        }
+        @Override
+        public void append(Entry entry) throws IOException {
+          oldWriter1.append(entry);
+        }
+        @Override
+        public long getLength() throws IOException {
+          return oldWriter1.getLength();
+        }
+      };
+      log.setWriter(newWriter1);
+
+      // Write some data.
+      // We need to write at least 5 times, but double it. We should only request
+      // a SLOW_SYNC roll once in the current interval.
+      for (int i = 0; i < 10; i++) {
+        writeData(table, row++);
+      }
+
+      // Wait for our wait injecting writer to get rolled out, as needed.
+      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return log.getWriter() != newWriter1;
+        }
+        @Override
+        public String explainFailure() throws Exception {
+          return "Waited too long for our test writer to get rolled out";
+        }
+      });
+
+      assertTrue("Should have triggered log roll due to SLOW_SYNC",
+        slowSyncHookCalled.get());
+
+      // Set up for test
+      slowSyncHookCalled.set(false);
+
+      // Wrap the current writer with the anonymous class below that adds 5000 ms of
+      // latency to any sync on the hlog.
+      // This will trip the other threshold.
+      final Writer oldWriter2 = log.getWriter();
+      final Writer newWriter2 = new Writer() {
+        @Override
+        public void close() throws IOException {
+          oldWriter2.close();
+        }
+        @Override
+        public void sync(boolean forceSync) throws IOException {
+          try {
+            Thread.sleep(5000);
+          } catch (InterruptedException e) {
+            InterruptedIOException ex = new InterruptedIOException();
+            ex.initCause(e);
+            throw ex;
+          }
+          oldWriter2.sync(forceSync);
+        }
+        @Override
+        public void append(Entry entry) throws IOException {
+          oldWriter2.append(entry);
+        }
+        @Override
+        public long getLength() throws IOException {
+          return oldWriter2.getLength();
+        }
+      };
+      log.setWriter(newWriter2);
+
+      // Write some data. Should only take one sync.
+      writeData(table, row++);
+
+      // Wait for our wait injecting writer to get rolled out, as needed.
+      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return log.getWriter() != newWriter2;
+        }
+        @Override
+        public String explainFailure() throws Exception {
+          return "Waited too long for our test writer to get rolled out";
+        }
+      });
+
+      assertTrue("Should have triggered log roll due to SLOW_SYNC",
+        slowSyncHookCalled.get());
+
+      // Set up for test
+      slowSyncHookCalled.set(false);
+
+      // Write some data
+      for (int i = 0; i < 10; i++) {
+        writeData(table, row++);
+      }
+
+      assertFalse("Should not have triggered log roll due to SLOW_SYNC",
+        slowSyncHookCalled.get());
+    } finally {
+      table.close();
+    }
+  }
+
   private String getName() {
     return "TestLogRolling-" + name.getMethodName();
   }
@@ -316,12 +494,15 @@ public class TestLogRolling {
     HRegionInfo region = server.getOnlineRegions(desc.getTableName()).get(0).getRegionInfo();
     final FSHLog log = (FSHLog) server.getWAL(region);
     final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);
     log.registerWALActionsListener(new WALActionsListener.Base() {
       @Override
-      public void logRollRequested(boolean lowReplication) {
-        if (lowReplication) {
-          lowReplicationHookCalled.lazySet(true);
+      public void logRollRequested(WALActionsListener.RollRequestReason reason) {
+        switch (reason) {
+          case LOW_REPLICATION:
+            lowReplicationHookCalled.lazySet(true);
+            break;
+          default:
+            break;
         }
       }
     });

View File

@@ -35,13 +35,21 @@ public class TestMetricsWAL {
   public void testLogRollRequested() throws Exception {
     MetricsWALSource source = mock(MetricsWALSourceImpl.class);
     MetricsWAL metricsWAL = new MetricsWAL(source);
-    metricsWAL.logRollRequested(false);
-    metricsWAL.logRollRequested(true);
+    metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.ERROR);
+    metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.LOW_REPLICATION);
+    metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.SLOW_SYNC);
+    metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.SIZE);
 
-    // Log roll was requested twice
-    verify(source, times(2)).incrementLogRollRequested();
+    // Log roll was requested four times
+    verify(source, times(4)).incrementLogRollRequested();
+    // One was because of an IO error.
+    verify(source, times(1)).incrementErrorLogRoll();
     // One was because of low replication on the hlog.
     verify(source, times(1)).incrementLowReplicationLogRoll();
+    // One was because of slow sync on the hlog.
+    verify(source, times(1)).incrementSlowSyncLogRoll();
+    // One was because of hlog file length limit.
+    verify(source, times(1)).incrementSizeLogRoll();
   }
 
   @Test