HDFS-2943. Expose last checkpoint time and transaction stats as JMX metrics. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1243823 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 2012-02-14 08:33:04 +00:00
parent 6eadd0201d
commit 7054b4a891
5 changed files with 109 additions and 14 deletions

CHANGES.txt

@@ -11,6 +11,9 @@ Release 0.23.2 - UNRELEASED
   NEW FEATURES
 
+    HDFS-2943. Expose last checkpoint time and transaction stats as JMX
+    metrics. (atm)
+
   IMPROVEMENTS
 
     HDFS-2931. Switch DataNode's BlockVolumeChoosingPolicy to private-audience.

org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -724,7 +724,7 @@ public class FSImage implements Closeable {
     long txId = loader.getLoadedImageTxId();
     LOG.info("Loaded image for txid " + txId + " from " + curFile);
     lastAppliedTxId = txId;
-    storage.setMostRecentCheckpointTxId(txId);
+    storage.setMostRecentCheckpointInfo(txId, curFile.lastModified());
   }
 
   /**
@@ -739,7 +739,7 @@ public class FSImage implements Closeable {
     saver.save(newFile, txid, getFSNamesystem(), compression);
     MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
-    storage.setMostRecentCheckpointTxId(txid);
+    storage.setMostRecentCheckpointInfo(txid, Util.now());
   }
 
   /**
@@ -997,7 +997,7 @@ public class FSImage implements Closeable {
       // advertise it as such to other checkpointers
       // from now on
       if (txid > storage.getMostRecentCheckpointTxId()) {
-        storage.setMostRecentCheckpointTxId(txid);
+        storage.setMostRecentCheckpointInfo(txid, Util.now());
       }
     }
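Note the asymmetry between the call sites: when an existing image is loaded at startup, the checkpoint happened in the past (possibly in a previous NameNode process), so the image file's modification time is the closest available timestamp; when an image is saved or uploaded, the checkpoint is happening now. A small runnable sketch of that distinction, assuming Util.now() is a System.currentTimeMillis()-style wall clock (the helper's behavior is my assumption, not shown in the diff):

import java.io.File;

public class CheckpointTimestampSketch {
  public static void main(String[] args) {
    // Pass the path to an existing fsimage file as the first argument.
    File imageFile = new File(args[0]);
    // Load path: approximate the checkpoint time from the file's mtime.
    long loadPathTime = imageFile.lastModified();
    // Save path: the checkpoint is being produced right now, so the
    // current clock is exact (what Util.now() presumably returns).
    long savePathTime = System.currentTimeMillis();
    System.out.println("load-path timestamp: " + loadPathTime);
    System.out.println("save-path timestamp: " + savePathTime);
  }
}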

org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -2626,6 +2626,31 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return datanodeStatistics.getExpiredHeartbeats();
   }
 
+  @Metric({"TransactionsSinceLastCheckpoint",
+      "Number of transactions since last checkpoint"})
+  public long getTransactionsSinceLastCheckpoint() {
+    return getEditLog().getLastWrittenTxId() -
+        getFSImage().getStorage().getMostRecentCheckpointTxId();
+  }
+
+  @Metric({"TransactionsSinceLastLogRoll",
+      "Number of transactions since last edit log roll"})
+  public long getTransactionsSinceLastLogRoll() {
+    return (getEditLog().getLastWrittenTxId() -
+        getEditLog().getCurSegmentTxId()) + 1;
+  }
+
+  @Metric({"LastWrittenTransactionId", "Transaction ID written to the edit log"})
+  public long getLastWrittenTransactionId() {
+    return getEditLog().getLastWrittenTxId();
+  }
+
+  @Metric({"LastCheckpointTime",
+      "Time in milliseconds since the epoch of the last checkpoint"})
+  public long getLastCheckpointTime() {
+    return getFSImage().getStorage().getMostRecentCheckpointTime();
+  }
+
   /** @see ClientProtocol#getStats() */
   long[] getStats() {
     final long[] stats = datanodeStatistics.getStats();
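These four gauges are registered with the metrics2 system alongside the existing FSNamesystem metrics, so any JMX client can read them once the NameNode is running. A minimal sketch of polling them remotely, assuming the metrics source is exported under the conventional Hadoop:service=NameNode,name=FSNamesystem object name and that the NameNode JVM has remote JMX enabled on port 9999 (both assumptions for illustration, not part of this change):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class CheckpointMetricsProbe {
  public static void main(String[] args) throws Exception {
    // Assumed JMX endpoint; the port is whatever the NameNode was started with.
    JMXServiceURL url = new JMXServiceURL(
        "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
    try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
      MBeanServerConnection mbs = connector.getMBeanServerConnection();
      // Assumed object name, following the usual Hadoop metrics2 convention.
      ObjectName fsns =
          new ObjectName("Hadoop:service=NameNode,name=FSNamesystem");
      // The four gauges added by this change.
      for (String attr : new String[] {
          "LastCheckpointTime", "LastWrittenTransactionId",
          "TransactionsSinceLastCheckpoint", "TransactionsSinceLastLogRoll"}) {
        System.out.println(attr + " = " + mbs.getAttribute(fsns, attr));
      }
    }
  }
}

A tool like jconsole pointed at the same endpoint shows the same attribute names under that MBean.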

org/apache/hadoop/hdfs/server/namenode/NNStorage.java

@@ -126,6 +126,11 @@ public class NNStorage extends Storage implements Closeable {
    */
   protected long mostRecentCheckpointTxId = HdfsConstants.INVALID_TXID;
 
+  /**
+   * Time of the last checkpoint, in milliseconds since the epoch.
+   */
+  private long mostRecentCheckpointTime = 0;
+
   /**
    * list of failed (and thus removed) storages
    */
@@ -417,19 +422,30 @@ public class NNStorage extends Storage implements Closeable {
   }
 
   /**
-   * Set the transaction ID of the last checkpoint
+   * Set the transaction ID and time of the last checkpoint
+   *
+   * @param txid transaction id of the last checkpoint
+   * @param time time of the last checkpoint, in millis since the epoch
    */
-  void setMostRecentCheckpointTxId(long txid) {
+  void setMostRecentCheckpointInfo(long txid, long time) {
     this.mostRecentCheckpointTxId = txid;
+    this.mostRecentCheckpointTime = time;
   }
 
   /**
-   * Return the transaction ID of the last checkpoint.
+   * @return the transaction ID of the last checkpoint.
    */
   long getMostRecentCheckpointTxId() {
     return mostRecentCheckpointTxId;
   }
 
+  /**
+   * @return the time of the most recent checkpoint in millis since the epoch.
+   */
+  long getMostRecentCheckpointTime() {
+    return mostRecentCheckpointTime;
+  }
+
   /**
    * Write a small file in all available storage directories that
    * indicates that the namespace has reached some given transaction ID.

org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java

@@ -20,13 +20,12 @@ package org.apache.hadoop.hdfs.server.namenode.metrics;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.*;
 
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.Random;
 
-import junit.framework.TestCase;
-
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
@@ -39,17 +38,21 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.test.MetricsAsserts;
 import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 /**
  * Test for metrics published by the Namenode
  */
-public class TestNameNodeMetrics extends TestCase {
+public class TestNameNodeMetrics {
   private static final Configuration CONF = new HdfsConfiguration();
   private static final int DFS_REPLICATION_INTERVAL = 1;
   private static final Path TEST_ROOT_DIR_PATH =
@@ -81,8 +84,8 @@ public class TestNameNodeMetrics extends TestCase {
     return new Path(TEST_ROOT_DIR_PATH, fileName);
   }
 
-  @Override
-  protected void setUp() throws Exception {
+  @Before
+  public void setUp() throws Exception {
     cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT).build();
     cluster.waitActive();
     namesystem = cluster.getNamesystem();
@@ -90,8 +93,8 @@ public class TestNameNodeMetrics extends TestCase {
     fs = (DistributedFileSystem) cluster.getFileSystem();
   }
 
-  @Override
-  protected void tearDown() throws Exception {
+  @After
+  public void tearDown() throws Exception {
     cluster.shutdown();
   }
 
@@ -115,6 +118,7 @@ public class TestNameNodeMetrics extends TestCase {
   }
 
   /** Test metrics associated with addition of a file */
+  @Test
   public void testFileAdd() throws Exception {
     // Add files with 100 blocks
     final Path file = getTestPath("testFileAdd");
@@ -161,6 +165,7 @@ public class TestNameNodeMetrics extends TestCase {
   }
 
   /** Corrupt a block and ensure metrics reflects it */
+  @Test
   public void testCorruptBlock() throws Exception {
     // Create a file with single block with two replicas
     final Path file = getTestPath("testCorruptBlock");
@@ -186,6 +191,7 @@ public class TestNameNodeMetrics extends TestCase {
   /** Create excess blocks by reducing the replication factor for
    * for a file and ensure metrics reflects it
    */
+  @Test
   public void testExcessBlocks() throws Exception {
     Path file = getTestPath("testExcessBlocks");
     createFile(file, 100, (short)2);
@@ -198,6 +204,7 @@ public class TestNameNodeMetrics extends TestCase {
   }
 
   /** Test to ensure metrics reflects missing blocks */
+  @Test
   public void testMissingBlock() throws Exception {
     // Create a file with single block with two replicas
     Path file = getTestPath("testMissingBlocks");
@@ -216,6 +223,7 @@ public class TestNameNodeMetrics extends TestCase {
     assertGauge("UnderReplicatedBlocks", 0L, getMetrics(NS_METRICS));
   }
 
+  @Test
   public void testRenameMetrics() throws Exception {
     Path src = getTestPath("src");
     createFile(src, 100, (short)1);
@@ -240,6 +248,7 @@ public class TestNameNodeMetrics extends TestCase {
    *
    * @throws IOException in case of an error
    */
+  @Test
   public void testGetBlockLocationMetric() throws Exception {
     Path file1_Path = new Path(TEST_ROOT_DIR_PATH, "file1.dat");
 
@@ -268,4 +277,46 @@ public class TestNameNodeMetrics extends TestCase {
     updateMetrics();
     assertCounter("GetBlockLocations", 3L, getMetrics(NN_METRICS));
   }
+
+  /**
+   * Test NN checkpoint and transaction-related metrics.
+   */
+  @Test
+  public void testTransactionAndCheckpointMetrics() throws Exception {
+    long lastCkptTime = MetricsAsserts.getLongGauge("LastCheckpointTime",
+        getMetrics(NS_METRICS));
+
+    assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS));
+    assertGauge("LastWrittenTransactionId", 1L, getMetrics(NS_METRICS));
+    assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS));
+    assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
+
+    fs.mkdirs(new Path(TEST_ROOT_DIR_PATH, "/tmp"));
+    updateMetrics();
+
+    assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS));
+    assertGauge("LastWrittenTransactionId", 2L, getMetrics(NS_METRICS));
+    assertGauge("TransactionsSinceLastCheckpoint", 2L, getMetrics(NS_METRICS));
+    assertGauge("TransactionsSinceLastLogRoll", 2L, getMetrics(NS_METRICS));
+
+    cluster.getNameNodeRpc().rollEditLog();
+    updateMetrics();
+
+    assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS));
+    assertGauge("LastWrittenTransactionId", 4L, getMetrics(NS_METRICS));
+    assertGauge("TransactionsSinceLastCheckpoint", 4L, getMetrics(NS_METRICS));
+    assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
+
+    cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    cluster.getNameNodeRpc().saveNamespace();
+    cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+    updateMetrics();
+
+    long newLastCkptTime = MetricsAsserts.getLongGauge("LastCheckpointTime",
+        getMetrics(NS_METRICS));
+    assertTrue(lastCkptTime < newLastCkptTime);
+    assertGauge("LastWrittenTransactionId", 6L, getMetrics(NS_METRICS));
+    assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS));
+    assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
+  }
 }
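The gauge values this test asserts follow from how the edit log numbers transactions. A runnable back-of-the-envelope model, under assumptions inferred from the asserted values rather than stated in the commit: the first log segment opens at txid 1; mkdirs() logs one transaction; rollEditLog() logs an end-of-segment and then a start-of-segment transaction; and saveNamespace() ends the segment, checkpoints at that txid, then opens a new segment:

public class TxIdWalkthrough {
  static long lastWritten = 1;    // assumed txid of the first segment's start record
  static long segmentStart = 1;   // first txid of the current segment
  static long checkpointTxId = 0; // no checkpoint yet

  static void print(String when) {
    System.out.printf("%-14s lastWritten=%d sinceCkpt=%d sinceRoll=%d%n",
        when, lastWritten,
        lastWritten - checkpointTxId,     // TransactionsSinceLastCheckpoint
        lastWritten - segmentStart + 1);  // TransactionsSinceLastLogRoll
  }

  public static void main(String[] args) {
    print("startup:");            // 1, 1, 1
    lastWritten++;                // mkdirs -> one mkdir transaction
    print("after mkdirs:");       // 2, 2, 2
    lastWritten += 2;             // rollEditLog -> end segment, start segment
    segmentStart = lastWritten;
    print("after roll:");         // 4, 4, 1
    lastWritten++;                // saveNamespace -> end segment...
    checkpointTxId = lastWritten; // ...checkpoint at the end-of-segment txid...
    lastWritten++;                // ...then start a new segment
    segmentStart = lastWritten;
    print("after save:");         // 6, 1, 1
  }
}

Each printed triple matches the corresponding block of assertGauge calls above.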