HDFS-7129. Metrics to track usage of memory for writes. (Contributed by Xiaoyu Yao)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
arp authored 2014-09-30 00:53:18 -07:00; committed by Jitendra Pandey
parent 6916d41fe6
commit c865c93dc1
6 changed files with 238 additions and 23 deletions

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -1013,11 +1013,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         if (allowLazyPersist) {
           // First try to place the block on a transient volume.
           v = volumes.getNextTransientVolume(b.getNumBytes());
+          datanode.getMetrics().incrRamDiskBlocksWrite();
         } else {
           v = volumes.getNextVolume(storageType, b.getNumBytes());
         }
       } catch (DiskOutOfSpaceException de) {
         if (allowLazyPersist) {
+          datanode.getMetrics().incrRamDiskBlocksWriteFallback();
           allowLazyPersist = false;
           continue;
         }
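
The hunk above carries the accounting for fallback: a placement that succeeds on a transient (RAM disk) volume bumps ramDiskBlocksWrite, while a DiskOutOfSpaceException from the transient volume bumps ramDiskBlocksWriteFallback and retries the loop against durable storage. A minimal, self-contained sketch of that control flow (illustrative names, not the Hadoop classes):

    import java.util.concurrent.atomic.AtomicLong;

    class FallbackSketch {
      final AtomicLong ramDiskBlocksWrite = new AtomicLong();
      final AtomicLong ramDiskBlocksWriteFallback = new AtomicLong();

      // Returns the storage type chosen for the block.
      String placeBlock(boolean allowLazyPersist, boolean ramDiskFull) {
        while (true) {
          try {
            if (allowLazyPersist) {
              String v = nextTransientVolume(ramDiskFull); // may throw when full
              ramDiskBlocksWrite.incrementAndGet();        // counted only on success
              return v;
            }
            return "DISK";
          } catch (IllegalStateException outOfSpace) {     // stand-in for DiskOutOfSpaceException
            ramDiskBlocksWriteFallback.incrementAndGet();
            allowLazyPersist = false;                      // retry once on durable storage
          }
        }
      }

      private String nextTransientVolume(boolean full) {
        if (full) {
          throw new IllegalStateException("out of space");
        }
        return "RAM_DISK";
      }
    }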
@@ -1245,6 +1247,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       if (v.isTransientStorage()) {
         ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
+        datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
       }
     }
     volumeMap.add(bpid, newReplicaInfo);
@@ -1500,7 +1503,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
       if (v.isTransientStorage()) {
-        ramDiskReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true);
+        RamDiskReplica replicaInfo =
+            ramDiskReplicaTracker.getReplica(bpid, invalidBlks[i].getBlockId());
+        if (replicaInfo != null) {
+          if (replicaInfo.getIsPersisted() == false) {
+            datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted();
+          }
+          discardRamDiskReplica(replicaInfo, true);
+        }
       }

       // If a DFSClient has the replica in its cache of short-circuit file
@@ -1646,11 +1656,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (info != null) {
       if (touch && info.getVolume().isTransientStorage()) {
         ramDiskReplicaTracker.touch(bpid, blockId);
+        datanode.getMetrics().incrRamDiskBlocksReadHits();
       }
       return info.getBlockFile();
     }
     return null;
   }

   /**
    * check if a data directory is healthy
    * if some volumes failed - make sure to remove all the blocks that belong
@@ -2304,6 +2316,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         nbytes, flags);
   }

+  void discardRamDiskReplica(RamDiskReplica replica, boolean deleteSavedCopies) {
+    ramDiskReplicaTracker.discardReplica(replica.getBlockPoolId(),
+        replica.getBlockId(), deleteSavedCopies);
+  }
+
   class LazyWriter implements Runnable {
     private volatile boolean shouldRun = true;
     final int checkpointerInterval;
@@ -2327,7 +2344,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT);
     }

-    private void moveReplicaToNewVolume(String bpid, long blockId)
+    private void moveReplicaToNewVolume(String bpid, long blockId, long creationTime)
         throws IOException {
       FsVolumeImpl targetVolume;
@@ -2369,6 +2386,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       synchronized (FsDatasetImpl.this) {
         ramDiskReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);

+        // Update metrics (ignore the metadata file size)
+        datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
+        datanode.getMetrics().incrRamDiskBytesLazyPersisted(replicaInfo.getNumBytes());
+        datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs(
+            Time.monotonicNow() - creationTime);
+
         if (LOG.isDebugEnabled()) {
           LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
               " to file " + savedFiles[1]);
@@ -2388,7 +2411,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       try {
         block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
         if (block != null) {
-          moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId());
+          moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId(),
+              block.getCreationTime());
         }
         succeeded = true;
       } catch(IOException ioe) {
@@ -2456,7 +2480,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       metaFile = replicaInfo.getMetaFile();
       blockFileUsed = blockFile.length();
       metaFileUsed = metaFile.length();
-      ramDiskReplicaTracker.discardReplica(replicaState, false);
+      discardRamDiskReplica(replicaState, false);

       // Move the replica from lazyPersist/ to finalized/ on target volume
       BlockPoolSlice bpSlice =
@@ -2474,6 +2498,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       // Update the volumeMap entry.
       volumeMap.add(bpid, newReplicaInfo);

+      // Update metrics
+      datanode.getMetrics().incrRamDiskBlocksEvicted();
+      datanode.getMetrics().addRamDiskBlocksEvictionWindowMs(
+          Time.monotonicNow() - replicaState.getCreationTime());
+      if (replicaState.getNumReads() == 0) {
+        datanode.getMetrics().incrRamDiskBlocksEvictedWithoutRead();
+      }
     }

     // Before deleting the files from transient storage we must notify the
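
The two windows recorded above (lazy-persist and eviction) share one pattern: stamp a monotonic creation time when the replica lands on RAM disk, then report the elapsed time when the block is persisted or evicted. A self-contained sketch of that bookkeeping, with illustrative names only:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class WindowSketch {
      private final ConcurrentMap<Long, Long> createdAtMs =
          new ConcurrentHashMap<Long, Long>();

      static long monotonicNowMs() {             // like Hadoop's Time.monotonicNow()
        return System.nanoTime() / 1000000L;
      }

      // Call when a replica is first written to RAM disk.
      void onRamDiskWrite(long blockId) {
        createdAtMs.put(blockId, monotonicNowMs());
      }

      // Call on lazy persist or eviction; the result is the "window" in ms
      // that would feed addRamDiskBlocksLazyPersistWindowMs(...) or
      // addRamDiskBlocksEvictionWindowMs(...).
      long closeWindow(long blockId) {
        Long start = createdAtMs.remove(blockId);
        return start == null ? 0L : monotonicNowMs() - start;
      }
    }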

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java

@@ -22,6 +22,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import com.google.common.collect.TreeMultimap;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.Time;

 import java.io.File;
 import java.util.*;
@@ -97,9 +98,11 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
       return;
     }
+    ramDiskReplicaLru.numReads.getAndIncrement();
+
     // Reinsert the replica with its new timestamp.
     if (replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru)) {
-      ramDiskReplicaLru.lastUsedTime = System.currentTimeMillis();
+      ramDiskReplicaLru.lastUsedTime = Time.monotonicNow();
       replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
     }
   }
@@ -132,8 +135,9 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
       replicasNotPersisted.remove(ramDiskReplicaLru);
     }
-    ramDiskReplicaLru.lastUsedTime = System.currentTimeMillis();
+    ramDiskReplicaLru.lastUsedTime = Time.monotonicNow();
     replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+    ramDiskReplicaLru.isPersisted = true;
   }

   @Override
@@ -215,4 +219,16 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
     // replicasNotPersisted will be lazily GC'ed.
   }
+
+  @Override
+  synchronized RamDiskReplica getReplica(
+      final String bpid, final long blockId) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+    if (map == null) {
+      return null;
+    }
+    return map.get(blockId);
+  }
 }
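
Two details in this file are easy to miss. The LRU order lives in a Guava TreeMultimap keyed by lastUsedTime, so touching a replica means removing its old (timestamp, replica) entry and reinserting it under a fresh timestamp; the least-recently-used replica is then simply the one under the smallest key. And the commit swaps System.currentTimeMillis() for Time.monotonicNow() because wall-clock time can step backwards (NTP corrections), which would corrupt both the LRU ordering and the new window metrics, while a monotonic clock cannot. A minimal sketch of the idea, assuming Guava on the classpath (names are illustrative, not the Hadoop class):

    import com.google.common.collect.TreeMultimap;

    import java.util.HashMap;
    import java.util.Map;

    class LruSketch {
      // Oldest timestamp first; the first key is the LRU candidate.
      private final TreeMultimap<Long, String> byLastUse = TreeMultimap.create();
      private final Map<String, Long> lastUse = new HashMap<String, Long>();

      synchronized void touch(String id) {
        Long prev = lastUse.get(id);
        if (prev != null) {
          byLastUse.remove(prev, id);             // drop the stale timestamp entry
        }
        long now = monotonicNowMs();              // monotonic: immune to clock steps
        byLastUse.put(now, id);
        lastUse.put(id, now);
      }

      synchronized String evictCandidate() {
        if (byLastUse.isEmpty()) {
          return null;
        }
        Long oldest = byLastUse.keySet().first(); // smallest timestamp == LRU
        return byLastUse.get(oldest).first();
      }

      private static long monotonicNowMs() {      // same idea as Hadoop's Time.monotonicNow()
        return System.nanoTime() / 1000000L;
      }
    }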

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java

@@ -28,8 +28,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;

 import java.io.File;
+import java.util.concurrent.atomic.AtomicLong;

 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -44,6 +46,10 @@ public abstract class RamDiskReplicaTracker {
     private File savedBlockFile;
     private File savedMetaFile;

+    private long creationTime;
+    protected AtomicLong numReads = new AtomicLong(0);
+    protected boolean isPersisted;
+
     /**
      * RAM_DISK volume that holds the original replica.
      */
@@ -62,6 +68,8 @@ public abstract class RamDiskReplicaTracker {
       lazyPersistVolume = null;
       savedMetaFile = null;
       savedBlockFile = null;
+      creationTime = Time.monotonicNow();
+      isPersisted = false;
     }

     long getBlockId() {
@@ -89,6 +97,12 @@ public abstract class RamDiskReplicaTracker {
       return savedMetaFile;
     }

+    long getNumReads() { return numReads.get(); }
+
+    long getCreationTime() { return creationTime; }
+
+    boolean getIsPersisted() { return isPersisted; }
+
     /**
      * Record the saved meta and block files on the given volume.
      *
@@ -243,7 +257,10 @@ public abstract class RamDiskReplicaTracker {
       final String bpid, final long blockId,
       boolean deleteSavedCopies);

+  void discardReplica(RamDiskReplica replica, boolean deleteSavedCopies) {
+    discardReplica(replica.getBlockPoolId(), replica.getBlockId(), deleteSavedCopies);
+  }
+
+  /**
+   * Return RamDiskReplica info given block pool id and block id.
+   * Return null if it does not exist in RamDisk.
+   */
+  abstract RamDiskReplica getReplica(
+      final String bpid, final long blockId);
 }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java

@@ -65,6 +65,26 @@ public class DataNodeMetrics {
   @Metric MutableCounterLong writesFromRemoteClient;
   @Metric MutableCounterLong blocksGetLocalPathInfo;

+  // RamDisk metrics on read/write
+  @Metric MutableCounterLong ramDiskBlocksWrite;
+  @Metric MutableCounterLong ramDiskBlocksWriteFallback;
+  @Metric MutableCounterLong ramDiskBytesWrite;
+  @Metric MutableCounterLong ramDiskBlocksReadHits;
+
+  // RamDisk metrics on eviction
+  @Metric MutableCounterLong ramDiskBlocksEvicted;
+  @Metric MutableCounterLong ramDiskBlocksEvictedWithoutRead;
+  @Metric MutableRate ramDiskBlocksEvictionWindowMs;
+  final MutableQuantiles[] ramDiskBlocksEvictionWindowMsQuantiles;
+
+  // RamDisk metrics on lazy persist
+  @Metric MutableCounterLong ramDiskBlocksLazyPersisted;
+  @Metric MutableCounterLong ramDiskBlocksDeletedBeforeLazyPersisted;
+  @Metric MutableCounterLong ramDiskBytesLazyPersisted;
+  @Metric MutableRate ramDiskBlocksLazyPersistWindowMs;
+  final MutableQuantiles[] ramDiskBlocksLazyPersistWindowMsQuantiles;
+
   @Metric MutableCounterLong fsyncCount;
   @Metric MutableCounterLong volumeFailures;
@@ -107,6 +127,8 @@ public class DataNodeMetrics {
     fsyncNanosQuantiles = new MutableQuantiles[len];
     sendDataPacketBlockedOnNetworkNanosQuantiles = new MutableQuantiles[len];
     sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
+    ramDiskBlocksEvictionWindowMsQuantiles = new MutableQuantiles[len];
+    ramDiskBlocksLazyPersistWindowMsQuantiles = new MutableQuantiles[len];

     for (int i = 0; i < len; i++) {
       int interval = intervals[i];
@@ -127,6 +149,14 @@ public class DataNodeMetrics {
           "sendDataPacketTransferNanos" + interval + "s",
           "Time reading from disk and writing to network while sending " +
           "a packet in ns", "ops", "latency", interval);
+      ramDiskBlocksEvictionWindowMsQuantiles[i] = registry.newQuantiles(
+          "ramDiskBlocksEvictionWindows" + interval + "s",
+          "Time between the RamDisk block write and eviction in ms",
+          "ops", "latency", interval);
+      ramDiskBlocksLazyPersistWindowMsQuantiles[i] = registry.newQuantiles(
+          "ramDiskBlocksLazyPersistWindows" + interval + "s",
+          "Time between the RamDisk block write and disk persist in ms",
+          "ops", "latency", interval);
     }
   }
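
One operational note: the MutableRate metrics above are always published, but the MutableQuantiles only register when percentile windows are configured for the DataNode (the intervals array driving the loop above is otherwise empty). A hedged sketch of enabling a 60-second window in a test configuration; the key constant comes from DFSConfigKeys, but treat the exact wiring as an assumption:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    class QuantilesConfSketch {
      // Without this setting, no ramDiskBlocks*Windows<N>s quantiles appear.
      static Configuration withPercentiles() {
        Configuration conf = new HdfsConfiguration();
        conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "60");
        return conf;
      }
    }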
@@ -284,4 +314,54 @@ public class DataNodeMetrics {
       q.add(latencyNanos);
     }
   }
+
+  public void incrRamDiskBlocksWrite() {
+    ramDiskBlocksWrite.incr();
+  }
+
+  public void incrRamDiskBlocksWriteFallback() {
+    ramDiskBlocksWriteFallback.incr();
+  }
+
+  public void addRamDiskBytesWrite(long bytes) {
+    ramDiskBytesWrite.incr(bytes);
+  }
+
+  public void incrRamDiskBlocksReadHits() {
+    ramDiskBlocksReadHits.incr();
+  }
+
+  public void incrRamDiskBlocksEvicted() {
+    ramDiskBlocksEvicted.incr();
+  }
+
+  public void incrRamDiskBlocksEvictedWithoutRead() {
+    ramDiskBlocksEvictedWithoutRead.incr();
+  }
+
+  public void addRamDiskBlocksEvictionWindowMs(long latencyMs) {
+    ramDiskBlocksEvictionWindowMs.add(latencyMs);
+    for (MutableQuantiles q : ramDiskBlocksEvictionWindowMsQuantiles) {
+      q.add(latencyMs);
+    }
+  }
+
+  public void incrRamDiskBlocksLazyPersisted() {
+    ramDiskBlocksLazyPersisted.incr();
+  }
+
+  public void incrRamDiskBlocksDeletedBeforeLazyPersisted() {
+    ramDiskBlocksDeletedBeforeLazyPersisted.incr();
+  }
+
+  public void incrRamDiskBytesLazyPersisted(long bytes) {
+    ramDiskBytesLazyPersisted.incr(bytes);
+  }
+
+  public void addRamDiskBlocksLazyPersistWindowMs(long latencyMs) {
+    ramDiskBlocksLazyPersistWindowMs.add(latencyMs);
+    for (MutableQuantiles q : ramDiskBlocksLazyPersistWindowMsQuantiles) {
+      q.add(latencyMs);
+    }
+  }
 }
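
Each @Metric field registered above surfaces through the metrics2 system as an MBean attribute whose name is the capitalized field name (RamDiskBlocksWrite, RamDiskBytesLazyPersisted, and so on), which is what the tests below match with the ^RamDisk pattern. An illustrative in-process reader; the DataNodeActivity object-name pattern is an assumption based on DataNode metrics naming conventions:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class RamDiskMetricsReader {
      public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        // Assumed pattern; DataNode metrics register as DataNodeActivity-<host>-<port>.
        ObjectName pattern =
            new ObjectName("Hadoop:service=DataNode,name=DataNodeActivity*");
        for (ObjectName name : mbs.queryNames(pattern, null)) {
          System.out.println(name + " RamDiskBlocksWrite="
              + mbs.getAttribute(name, "RamDiskBlocksWrite"));
        }
      }
    }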

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java

@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.regex.Pattern;

 import javax.management.AttributeNotFoundException;
 import javax.management.MBeanAttributeInfo;
@@ -109,6 +110,23 @@ public class JMXGet {
     }
   }

+  public void printAllMatchedAttributes(String attrRegExp) throws Exception {
+    err("List of the keys matching " + attrRegExp + " :");
+    Object val = null;
+    Pattern p = Pattern.compile(attrRegExp);
+    for (ObjectName oname : hadoopObjectNames) {
+      err(">>>>>>>>jmx name: " + oname.getCanonicalKeyPropertyListString());
+      MBeanInfo mbinfo = mbsc.getMBeanInfo(oname);
+      MBeanAttributeInfo[] mbinfos = mbinfo.getAttributes();
+      for (MBeanAttributeInfo mb : mbinfos) {
+        if (p.matcher(mb.getName()).lookingAt()) {
+          val = mbsc.getAttribute(oname, mb.getName());
+          System.out.format(format, mb.getName(), (val == null) ? "" : val.toString());
+        }
+      }
+    }
+  }
+
   /**
    * get single value by key
    */
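
A minimal, hypothetical driver for the new method, mirroring how the test below wires it up (service name "DataNode", pattern "^RamDisk"); it assumes the DataNode's MBeans are reachable from the calling JVM, as they are under MiniDFSCluster:

    import org.apache.hadoop.hdfs.tools.JMXGet;

    public class RamDiskJmxDump {
      public static void main(String[] args) throws Exception {
        JMXGet jmx = new JMXGet();
        jmx.setService("DataNode");
        jmx.init();                                // connect and cache Hadoop MBean names
        jmx.printAllMatchedAttributes("^RamDisk"); // dump every RamDisk* attribute
        System.out.println("RamDiskBlocksWrite = "
            + jmx.getValue("RamDiskBlocksWrite"));
      }
    }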

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java

@@ -31,19 +31,18 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.tools.JMXGet;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;

-import java.io.File;
-import java.io.IOException;
+import java.io.*;
 import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -58,6 +57,7 @@ import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;

 public class TestLazyPersistFiles {
@@ -81,14 +81,21 @@ public class TestLazyPersistFiles {
   private static final int LAZY_WRITER_INTERVAL_SEC = 1;
   private static final int BUFFER_LENGTH = 4096;
   private static final int EVICTION_LOW_WATERMARK = 1;
+  private static final String JMX_SERVICE_NAME = "DataNode";
+  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";

   private MiniDFSCluster cluster;
   private DistributedFileSystem fs;
   private DFSClient client;
   private Configuration conf;
+  private JMXGet jmx;

   @After
-  public void shutDownCluster() throws IOException {
+  public void shutDownCluster() throws Exception {
+    // Dump all RamDisk JMX metrics before shutting down the cluster.
+    printRamDiskJMXMetrics();
+
     if (fs != null) {
       fs.close();
       fs = null;
@@ -100,6 +107,10 @@ public class TestLazyPersistFiles {
       cluster.shutdown();
       cluster = null;
     }
+
+    if (jmx != null) {
+      jmx = null;
+    }
   }

   @Test (timeout=300000)
@@ -203,13 +214,15 @@ public class TestLazyPersistFiles {
    * @throws IOException
    */
   @Test (timeout=300000)
-  public void testFallbackToDiskFull() throws IOException {
+  public void testFallbackToDiskFull() throws Exception {
     startUpCluster(false, 0);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");

     makeTestFile(path, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path, DEFAULT);
+
+    verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 1);
   }

   /**
@@ -384,11 +397,10 @@ public class TestLazyPersistFiles {
   /**
    * RamDisk eviction after lazy persist to disk.
-   * @throws IOException
-   * @throws InterruptedException
+   * @throws Exception
    */
   @Test (timeout=300000)
-  public void testRamDiskEviction() throws IOException, InterruptedException {
+  public void testRamDiskEviction() throws Exception {
     startUpCluster(true, 1 + EVICTION_LOW_WATERMARK);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
@@ -411,6 +423,9 @@ public class TestLazyPersistFiles {
     // RAM_DISK.
     ensureFileReplicasOnStorageType(path2, RAM_DISK);
     ensureFileReplicasOnStorageType(path1, DEFAULT);
+
+    verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 1);
+    verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 1);
   }

   /**
@@ -454,7 +469,7 @@ public class TestLazyPersistFiles {
    */
   @Test (timeout=300000)
   public void testRamDiskEvictionIsLru()
-      throws IOException, InterruptedException {
+      throws Exception {
     final int NUM_PATHS = 5;
     startUpCluster(true, NUM_PATHS + EVICTION_LOW_WATERMARK);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -499,6 +514,14 @@ public class TestLazyPersistFiles {
         ensureFileReplicasOnStorageType(paths[indexes.get(j)], RAM_DISK);
       }
     }
+
+    verifyRamDiskJMXMetric("RamDiskBlocksWrite", NUM_PATHS * 2);
+    verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 0);
+    verifyRamDiskJMXMetric("RamDiskBytesWrite", BLOCK_SIZE * NUM_PATHS * 2);
+    verifyRamDiskJMXMetric("RamDiskBlocksReadHits", NUM_PATHS);
+    verifyRamDiskJMXMetric("RamDiskBlocksEvicted", NUM_PATHS);
+    verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 0);
+    verifyRamDiskJMXMetric("RamDiskBlocksDeletedBeforeLazyPersisted", 0);
   }

   /**
@@ -506,9 +529,9 @@ public class TestLazyPersistFiles {
    * Memory is freed up and file is gone.
    * @throws IOException
    */
-  @Test (timeout=300000)
+  @Test // (timeout=300000)
   public void testDeleteBeforePersist()
-      throws IOException, InterruptedException {
+      throws Exception {
     startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
@@ -523,6 +546,8 @@ public class TestLazyPersistFiles {
     Assert.assertFalse(fs.exists(path));
     assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
+
+    verifyRamDiskJMXMetric("RamDiskBlocksDeletedBeforeLazyPersisted", 1);
   }

   /**
@@ -533,7 +558,7 @@ public class TestLazyPersistFiles {
    */
   @Test (timeout=300000)
   public void testDeleteAfterPersist()
-      throws IOException, InterruptedException {
+      throws Exception {
     startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -548,9 +573,10 @@ public class TestLazyPersistFiles {
     client.delete(path.toString(), false);
     Assert.assertFalse(fs.exists(path));
     triggerBlockReport();

     assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
+
+    verifyRamDiskJMXMetric("RamDiskBlocksLazyPersisted", 1);
+    verifyRamDiskJMXMetric("RamDiskBytesLazyPersisted", BLOCK_SIZE);
   }

   /**
@@ -760,6 +786,11 @@ public class TestLazyPersistFiles {
         .build();
     fs = cluster.getFileSystem();
     client = fs.getClient();
+    try {
+      jmx = initJMX();
+    } catch (Exception e) {
+      fail("Failed to initialize JMX for testing: " + e);
+    }
     LOG.info("Cluster startup complete");
   }
@@ -929,4 +960,25 @@ public class TestLazyPersistFiles {
       }
     }
   }
+
+  JMXGet initJMX() throws Exception {
+    JMXGet jmx = new JMXGet();
+    jmx.setService(JMX_SERVICE_NAME);
+    jmx.init();
+    return jmx;
+  }
+
+  void printRamDiskJMXMetrics() {
+    try {
+      jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  void verifyRamDiskJMXMetric(String metricName, long expectedValue)
+      throws Exception {
+    assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName)));
+  }
 }
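
Worth noting as a design choice: the test verifies counters over JMX, exactly the path an operator would use. A hedged alternative for unit tests is Hadoop's MetricsAsserts helper, sketched below; using DataNodeMetrics.name() as the registered source name follows other HDFS tests, but treat that detail as an assumption:

    import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
    import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

    import org.apache.hadoop.hdfs.server.datanode.DataNode;
    import org.apache.hadoop.metrics2.MetricsRecordBuilder;

    class RamDiskMetricsAssertsSketch {
      // Reads the DataNode's metrics source in-process instead of going
      // through JMX; useful when no JMX agent is reachable.
      static void assertBlocksWrite(DataNode dn, long expected) {
        MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
        assertCounter("RamDiskBlocksWrite", expected, rb);
      }
    }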