HBASE-12576 Add metrics for rolling the HLog if there are too few DN's in the write pipeline

This commit is contained in:
Elliott Clark 2014-11-26 09:53:11 -08:00
parent 7b10847ec8
commit 29ee0cb0e1
10 changed files with 127 additions and 21 deletions

View File

@ -57,6 +57,11 @@ public interface MetricsWALSource extends BaseSource {
String SLOW_APPEND_COUNT_DESC = "Number of appends that were slow."; String SLOW_APPEND_COUNT_DESC = "Number of appends that were slow.";
String SYNC_TIME = "syncTime"; String SYNC_TIME = "syncTime";
String SYNC_TIME_DESC = "The time it took to sync the WAL to HDFS."; String SYNC_TIME_DESC = "The time it took to sync the WAL to HDFS.";
String ROLL_REQUESTED = "rollRequest";
String ROLL_REQUESTED_DESC = "How many times a log roll has been requested total";
String LOW_REPLICA_ROLL_REQUESTED = "lowReplicaRollRequest";
String LOW_REPLICA_ROLL_REQUESTED_DESC =
"How many times a log roll was requested due to too few DN's in the write pipeline.";
/** /**
* Add the append size. * Add the append size.
@ -83,4 +88,8 @@ public interface MetricsWALSource extends BaseSource {
*/ */
void incrementSyncTime(long time); void incrementSyncTime(long time);
void incrementLogRollRequested();
void incrementLowReplicationLogRoll();
} }

View File

@ -38,6 +38,8 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
private final MetricHistogram syncTimeHisto; private final MetricHistogram syncTimeHisto;
private final MutableCounterLong appendCount; private final MutableCounterLong appendCount;
private final MutableCounterLong slowAppendCount; private final MutableCounterLong slowAppendCount;
private final MutableCounterLong logRollRequested;
private final MutableCounterLong lowReplicationLogRollRequested;
public MetricsWALSourceImpl() { public MetricsWALSourceImpl() {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT); this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
@ -53,8 +55,13 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
appendTimeHisto = this.getMetricsRegistry().newHistogram(APPEND_TIME, APPEND_TIME_DESC); appendTimeHisto = this.getMetricsRegistry().newHistogram(APPEND_TIME, APPEND_TIME_DESC);
appendSizeHisto = this.getMetricsRegistry().newHistogram(APPEND_SIZE, APPEND_SIZE_DESC); appendSizeHisto = this.getMetricsRegistry().newHistogram(APPEND_SIZE, APPEND_SIZE_DESC);
appendCount = this.getMetricsRegistry().newCounter(APPEND_COUNT, APPEND_COUNT_DESC, 0l); appendCount = this.getMetricsRegistry().newCounter(APPEND_COUNT, APPEND_COUNT_DESC, 0l);
slowAppendCount = this.getMetricsRegistry().newCounter(SLOW_APPEND_COUNT, SLOW_APPEND_COUNT_DESC, 0l); slowAppendCount =
this.getMetricsRegistry().newCounter(SLOW_APPEND_COUNT, SLOW_APPEND_COUNT_DESC, 0l);
syncTimeHisto = this.getMetricsRegistry().newHistogram(SYNC_TIME, SYNC_TIME_DESC); syncTimeHisto = this.getMetricsRegistry().newHistogram(SYNC_TIME, SYNC_TIME_DESC);
logRollRequested =
this.getMetricsRegistry().newCounter(ROLL_REQUESTED, ROLL_REQUESTED_DESC, 0L);
lowReplicationLogRollRequested = this.getMetricsRegistry()
.newCounter(LOW_REPLICA_ROLL_REQUESTED, LOW_REPLICA_ROLL_REQUESTED_DESC, 0L);
} }
@Override @Override
@ -81,4 +88,14 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
public void incrementSyncTime(long time) { public void incrementSyncTime(long time) {
syncTimeHisto.add(time); syncTimeHisto.add(time);
} }
@Override
public void incrementLogRollRequested() {
logRollRequested.incr();
}
@Override
public void incrementLowReplicationLogRoll() {
lowReplicationLogRollRequested.incr();
}
} }

View File

@ -63,7 +63,7 @@ class LogRoller extends HasThread {
if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) { if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
wal.registerWALActionsListener(new WALActionsListener.Base() { wal.registerWALActionsListener(new WALActionsListener.Base() {
@Override @Override
public void logRollRequested() { public void logRollRequested(boolean lowReplicas) {
walNeedsRoll.put(wal, Boolean.TRUE); walNeedsRoll.put(wal, Boolean.TRUE);
// TODO logs will contend with each other here, replace with e.g. DelayedQueue // TODO logs will contend with each other here, replace with e.g. DelayedQueue
synchronized(rollLog) { synchronized(rollLog) {

View File

@ -1343,7 +1343,9 @@ public class FSHLog implements WAL {
rollWriterLock.unlock(); rollWriterLock.unlock();
} }
try { try {
if (lowReplication || writer != null && writer.getLength() > logrollsize) requestLogRoll(); if (lowReplication || writer != null && writer.getLength() > logrollsize) {
requestLogRoll(lowReplication);
}
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Writer.getLength() failed; continuing", e); LOG.warn("Writer.getLength() failed; continuing", e);
} }
@ -1559,9 +1561,13 @@ public class FSHLog implements WAL {
// public only until class moves to o.a.h.h.wal // public only until class moves to o.a.h.h.wal
public void requestLogRoll() { public void requestLogRoll() {
requestLogRoll(false);
}
private void requestLogRoll(boolean tooFewReplicas) {
if (!this.listeners.isEmpty()) { if (!this.listeners.isEmpty()) {
for (WALActionsListener i: this.listeners) { for (WALActionsListener i: this.listeners) {
i.logRollRequested(); i.logRollRequested(tooFewReplicas);
} }
} }
} }

View File

@ -19,13 +19,13 @@
package org.apache.hadoop.hbase.regionserver.wal; package org.apache.hadoop.hbase.regionserver.wal;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
/** /**
* Class used to push numbers about the WAL into the metrics subsystem. This will take a * Class used to push numbers about the WAL into the metrics subsystem. This will take a
* single function call and turn it into multiple manipulations of the hadoop metrics system. * single function call and turn it into multiple manipulations of the hadoop metrics system.
@ -37,12 +37,17 @@ public class MetricsWAL extends WALActionsListener.Base {
private final MetricsWALSource source; private final MetricsWALSource source;
public MetricsWAL() { public MetricsWAL() {
source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); this(CompatibilitySingletonFactory.getInstance(MetricsWALSource.class));
}
@VisibleForTesting
MetricsWAL(MetricsWALSource s) {
this.source = s;
} }
@Override @Override
public void postSync(final long timeInNanos, final int handlerSyncs) { public void postSync(final long timeInNanos, final int handlerSyncs) {
source.incrementSyncTime(timeInNanos/1000000l); source.incrementSyncTime(timeInNanos/1000000L);
} }
@Override @Override
@ -59,4 +64,12 @@ public class MetricsWAL extends WALActionsListener.Base {
StringUtils.humanReadableInt(size))); StringUtils.humanReadableInt(size)));
} }
} }
@Override
public void logRollRequested(boolean underReplicated) {
source.incrementLogRollRequested();
if (underReplicated) {
source.incrementLowReplicationLogRoll();
}
}
} }

View File

@ -67,7 +67,7 @@ public interface WALActionsListener {
/** /**
* A request was made that the WAL be rolled. * A request was made that the WAL be rolled.
*/ */
void logRollRequested(); void logRollRequested(boolean tooFewReplicas);
/** /**
* The WAL is about to close. * The WAL is about to close.
@ -127,7 +127,7 @@ public interface WALActionsListener {
public void postLogArchive(Path oldPath, Path newPath) throws IOException {} public void postLogArchive(Path oldPath, Path newPath) throws IOException {}
@Override @Override
public void logRollRequested() {} public void logRollRequested(boolean tooFewReplicas) {}
@Override @Override
public void logCloseRequested() {} public void logCloseRequested() {}

View File

@ -109,7 +109,7 @@ class DisabledWALProvider implements WALProvider {
public byte[][] rollWriter() { public byte[][] rollWriter() {
if (!listeners.isEmpty()) { if (!listeners.isEmpty()) {
for (WALActionsListener listener : listeners) { for (WALActionsListener listener : listeners) {
listener.logRollRequested(); listener.logRollRequested(false);
} }
for (WALActionsListener listener : listeners) { for (WALActionsListener listener : listeners) {
try { try {

View File

@ -86,13 +86,6 @@ import org.junit.rules.TestName;
@Category({RegionServerTests.class, MediumTests.class}) @Category({RegionServerTests.class, MediumTests.class})
public class TestFSHLog { public class TestFSHLog {
protected static final Log LOG = LogFactory.getLog(TestFSHLog.class); protected static final Log LOG = LogFactory.getLog(TestFSHLog.class);
{
((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
.getLogger().setLevel(Level.ALL);
((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
}
protected static Configuration conf; protected static Configuration conf;
protected static FileSystem fs; protected static FileSystem fs;

View File

@ -25,19 +25,16 @@ import static org.junit.Assert.assertTrue;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -319,6 +316,16 @@ public class TestLogRolling {
server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
final FSHLog log = (FSHLog) server.getWAL(null); final FSHLog log = (FSHLog) server.getWAL(null);
final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);
log.registerWALActionsListener(new WALActionsListener.Base() {
@Override
public void logRollRequested(boolean lowReplication) {
if (lowReplication) {
lowReplicationHookCalled.lazySet(true);
}
}
});
// don't run this test without append support (HDFS-200 & HDFS-142) // don't run this test without append support (HDFS-200 & HDFS-142)
assertTrue("Need append support for this test", FSUtils assertTrue("Need append support for this test", FSUtils
@ -370,6 +377,9 @@ public class TestLogRolling {
assertTrue("Missing datanode should've triggered a log roll", assertTrue("Missing datanode should've triggered a log roll",
newFilenum > oldFilenum && newFilenum > curTime); newFilenum > oldFilenum && newFilenum > curTime);
assertTrue("The log rolling hook should have been called with the low replication flag",
lowReplicationHookCalled.get());
// write some more log data (this should use a new hdfs_out) // write some more log data (this should use a new hdfs_out)
writeData(table, 3); writeData(table, 3);
assertTrue("The log should not roll again.", assertTrue("The log should not roll again.",
@ -425,8 +435,10 @@ public class TestLogRolling {
final WAL log = server.getWAL(null); final WAL log = server.getWAL(null);
final List<Path> paths = new ArrayList<Path>(); final List<Path> paths = new ArrayList<Path>();
final List<Integer> preLogRolledCalled = new ArrayList<Integer>(); final List<Integer> preLogRolledCalled = new ArrayList<Integer>();
paths.add(DefaultWALProvider.getCurrentFileName(log)); paths.add(DefaultWALProvider.getCurrentFileName(log));
log.registerWALActionsListener(new WALActionsListener.Base() { log.registerWALActionsListener(new WALActionsListener.Base() {
@Override @Override
public void preLogRoll(Path oldFile, Path newFile) { public void preLogRoll(Path oldFile, Path newFile) {
LOG.debug("preLogRoll: oldFile="+oldFile+" newFile="+newFile); LOG.debug("preLogRoll: oldFile="+oldFile+" newFile="+newFile);

View File

@ -0,0 +1,56 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@Category({MiscTests.class, SmallTests.class})
public class TestMetricsWAL {
@Test
public void testLogRollRequested() throws Exception {
MetricsWALSource source = mock(MetricsWALSourceImpl.class);
MetricsWAL metricsWAL = new MetricsWAL(source);
metricsWAL.logRollRequested(false);
metricsWAL.logRollRequested(true);
// Log roll was requested twice
verify(source, times(2)).incrementLogRollRequested();
// One was because of low replication on the hlog.
verify(source, times(1)).incrementLowReplicationLogRoll();
}
@Test
public void testPostSync() throws Exception {
long nanos = TimeUnit.MILLISECONDS.toNanos(145);
MetricsWALSource source = mock(MetricsWALSourceImpl.class);
MetricsWAL metricsWAL = new MetricsWAL(source);
metricsWAL.postSync(nanos, 1);
verify(source, times(1)).incrementSyncTime(145);
}
}