HDFS-3870. Add metrics to JournalNode. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1380980 13f79535-47bb-0310-9956-ffa450edef68
commit 13daca1ef6 (parent f6b7f067c3)
CHANGES file (branch HDFS-3077):

@@ -42,3 +42,5 @@ HDFS-3863. Track last "committed" txid in QJM (todd)
 HDFS-3869. Expose non-file journal manager details in web UI (todd)
 
 HDFS-3884. Journal format() should reset cached values (todd)
+
+HDFS-3870. Add metrics to JournalNode (todd)
Journal.java:

@@ -25,6 +25,7 @@ import java.io.InputStream;
 import java.io.OutputStreamWriter;
 import java.net.URL;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -50,7 +51,9 @@ import org.apache.hadoop.hdfs.util.BestEffortLongFile;
 import org.apache.hadoop.hdfs.util.PersistentLongFile;
 import org.apache.hadoop.io.IOUtils;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.Range;
 import com.google.common.collect.Ranges;
 import com.google.protobuf.ByteString;
@@ -70,6 +73,8 @@ class Journal implements Closeable {
   private long curSegmentTxId = HdfsConstants.INVALID_TXID;
   private long nextTxId = HdfsConstants.INVALID_TXID;
 
+  private final String journalId;
+
   private final JNStorage storage;
 
   /**
@@ -102,12 +107,19 @@ class Journal implements Closeable {
 
   private final FileJournalManager fjm;
 
-  Journal(File logDir, StorageErrorReporter errorReporter) throws IOException {
+  private final JournalMetrics metrics;
+
+  Journal(File logDir, String journalId,
+      StorageErrorReporter errorReporter) throws IOException {
     storage = new JNStorage(logDir, errorReporter);
+    this.journalId = journalId;
 
     refreshCachedData();
 
     this.fjm = storage.getJournalManager();
+
+    this.metrics = JournalMetrics.create(this);
   }
 
   /**
@@ -183,6 +195,10 @@ class Journal implements Closeable {
   JNStorage getStorage() {
     return storage;
   }
+
+  String getJournalId() {
+    return journalId;
+  }
 
   /**
    * @return the last epoch which this node has promised not to accept
@@ -201,6 +217,11 @@ class Journal implements Closeable {
   synchronized long getCommittedTxnIdForTests() throws IOException {
     return committedTxnId.get();
   }
+
+  @VisibleForTesting
+  JournalMetrics getMetricsForTests() {
+    return metrics;
+  }
 
   /**
    * Try to create a new epoch for this journal.
@@ -279,13 +300,34 @@ class Journal implements Closeable {
     Preconditions.checkState(nextTxId == firstTxnId,
         "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);
 
+    long lastTxnId = firstTxnId + numTxns - 1;
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Writing txid " + firstTxnId + "-" + (firstTxnId + numTxns - 1));
+      LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
     }
 
     curSegment.writeRaw(records, 0, records.length);
     curSegment.setReadyToFlush();
+    Stopwatch sw = new Stopwatch();
+    sw.start();
     curSegment.flush();
+    sw.stop();
+
+    metrics.addSync(sw.elapsedTime(TimeUnit.MICROSECONDS));
+
+    if (committedTxnId.get() > lastTxnId) {
+      // This batch of edits has already been committed on a quorum of other
+      // nodes. So, we are in "catch up" mode. This gets its own metric.
+      metrics.batchesWrittenWhileLagging.incr(1);
+      metrics.currentLagTxns.set(committedTxnId.get() - lastTxnId);
+    } else {
+      metrics.currentLagTxns.set(0L);
+    }
+
+    metrics.batchesWritten.incr(1);
+    metrics.bytesWritten.incr(records.length);
+    metrics.txnsWritten.incr(numTxns);
+    metrics.lastWrittenTxId.set(lastTxnId);
+
     nextTxId += numTxns;
   }
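
Note on the write path above: the Stopwatch brackets only curSegment.flush(), so the recorded latency reflects the sync/flush cost rather than serialization or queueing. A minimal standalone sketch of the same pattern (not part of the commit; the Sink interface is a hypothetical stand-in for JournalMetrics.addSync):

    import java.util.concurrent.TimeUnit;
    import com.google.common.base.Stopwatch;

    class SyncTimingSketch {
      interface Sink { void add(long micros); }  // stand-in for metrics.addSync()

      static void timedFlush(Runnable flush, Sink sink) {
        Stopwatch sw = new Stopwatch();  // Guava 11-era API, as used in the diff
        sw.start();
        flush.run();                     // e.g. curSegment.flush()
        sw.stop();
        // feed elapsed microseconds into the rolling quantile estimators
        sink.add(sw.elapsedTime(TimeUnit.MICROSECONDS));
      }
    }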
JournalMetrics.java (new file):

@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.IOException;
+
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+
+@Metrics(about="Journal metrics", context="dfs")
+class JournalMetrics {
+  final MetricsRegistry registry = new MetricsRegistry("JournalNode");
+
+  @Metric("Number of batches written since startup")
+  MutableCounterLong batchesWritten;
+
+  @Metric("Number of txns written since startup")
+  MutableCounterLong txnsWritten;
+
+  @Metric("Number of bytes written since startup")
+  MutableCounterLong bytesWritten;
+
+  @Metric("Number of batches written where this node was lagging")
+  MutableCounterLong batchesWrittenWhileLagging;
+
+  private final int[] QUANTILE_INTERVALS = new int[] {
+      1*60,  // 1m
+      5*60,  // 5m
+      60*60  // 1h
+  };
+
+  MutableQuantiles[] syncsQuantiles;
+
+  @Metric("Transaction lag behind the most recent commit")
+  MutableGaugeLong currentLagTxns;
+
+  @Metric("Last written txid")
+  MutableGaugeLong lastWrittenTxId;
+
+  private final Journal journal;
+
+  JournalMetrics(Journal journal) {
+    this.journal = journal;
+
+    syncsQuantiles = new MutableQuantiles[QUANTILE_INTERVALS.length];
+    for (int i = 0; i < syncsQuantiles.length; i++) {
+      int interval = QUANTILE_INTERVALS[i];
+      syncsQuantiles[i] = registry.newQuantiles(
+          "syncs" + interval + "s",
+          "Journal sync time", "ops", "latencyMicros", interval);
+    }
+  }
+
+  public static JournalMetrics create(Journal j) {
+    JournalMetrics m = new JournalMetrics(j);
+    return DefaultMetricsSystem.instance().register(
+        m.getName(), null, m);
+  }
+
+  String getName() {
+    return "Journal-" + journal.getJournalId();
+  }
+
+  @Metric("Current writer's epoch")
+  public long getLastWriterEpoch() {
+    try {
+      return journal.getLastWriterEpoch();
+    } catch (IOException e) {
+      return -1L;
+    }
+  }
+
+  @Metric("Last accepted epoch")
+  public long getLastPromisedEpoch() {
+    try {
+      return journal.getLastPromisedEpoch();
+    } catch (IOException e) {
+      return -1L;
+    }
+  }
+
+  void addSync(long us) {
+    for (MutableQuantiles q : syncsQuantiles) {
+      q.add(us);
+    }
+  }
+}
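
Note on JournalMetrics: each MutableQuantiles instance keeps a rolling window (1m/5m/1h) of sync latencies, and the source registers under the name "Journal-<journalId>", which is what the test below looks up. A minimal sketch of the quantiles pattern in isolation (not from the commit; names are illustrative):

    import org.apache.hadoop.metrics2.lib.MetricsRegistry;
    import org.apache.hadoop.metrics2.lib.MutableQuantiles;

    class QuantilesSketch {
      public static void main(String[] args) {
        MetricsRegistry registry = new MetricsRegistry("JournalNode");
        // same call shape as JournalMetrics: name, description,
        // sample name ("ops"), value name ("latencyMicros"), window in seconds
        MutableQuantiles syncs60s = registry.newQuantiles(
            "syncs60s", "Journal sync time", "ops", "latencyMicros", 60);
        syncs60s.add(850);  // record one 850us sync; percentiles are
                            // published when the 60s window rolls over
      }
    }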
JournalNode.java:

@@ -73,7 +73,7 @@ public class JournalNode implements Tool, Configurable {
     if (journal == null) {
       File logDir = getLogDir(jid);
       LOG.info("Initializing journal in directory " + logDir);
-      journal = new Journal(logDir, new ErrorReporter());
+      journal = new Journal(logDir, jid, new ErrorReporter());
       journalsById.put(jid, journal);
     }
 
TestJournal.java:

@@ -61,7 +61,7 @@ public class TestJournal {
   @Before
   public void setup() throws Exception {
     FileUtil.fullyDelete(TEST_LOG_DIR);
-    journal = new Journal(TEST_LOG_DIR, mockErrorReporter);
+    journal = new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
     journal.format(FAKE_NSINFO);
   }
 
@@ -137,7 +137,7 @@ public class TestJournal {
     journal.close(); // close to unlock the storage dir
 
     // Now re-instantiate, make sure history is still there
-    journal = new Journal(TEST_LOG_DIR, mockErrorReporter);
+    journal = new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
 
     // The storage info should be read, even if no writer has taken over.
     assertEquals(storageString,
@@ -189,7 +189,7 @@ public class TestJournal {
 
     journal.newEpoch(FAKE_NSINFO, 1);
     try {
-      new Journal(TEST_LOG_DIR, mockErrorReporter);
+      new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
       fail("Did not fail to create another journal in same dir");
     } catch (IOException ioe) {
       GenericTestUtils.assertExceptionContains(
@@ -200,7 +200,7 @@ public class TestJournal {
 
     // Journal should no longer be locked after the close() call.
     // Hence, should be able to create a new Journal in the same dir.
-    Journal journal2 = new Journal(TEST_LOG_DIR, mockErrorReporter);
+    Journal journal2 = new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
     journal2.newEpoch(FAKE_NSINFO, 2);
   }
 
@@ -227,7 +227,7 @@ public class TestJournal {
     // Check that, even if we re-construct the journal by scanning the
     // disk, we don't allow finalizing incorrectly.
     journal.close();
-    journal = new Journal(TEST_LOG_DIR, mockErrorReporter);
+    journal = new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
 
     try {
       journal.finalizeLogSegment(makeRI(4), 1, 6);
TestJournalNode.java:

@@ -40,8 +40,10 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRe
 import org.apache.hadoop.hdfs.qjournal.server.Journal;
 import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MetricsAsserts;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -86,12 +88,22 @@ public class TestJournalNode {
 
   @Test
   public void testJournal() throws Exception {
+    MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(
+        journal.getMetricsForTests().getName());
+    MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics);
+    MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
+
     IPCLoggerChannel ch = new IPCLoggerChannel(
         conf, FAKE_NSINFO, JID, jn.getBoundIpcAddress());
     ch.newEpoch(1).get();
     ch.setEpoch(1);
     ch.startLogSegment(1).get();
    ch.sendEdits(1L, 1, 1, "hello".getBytes(Charsets.UTF_8)).get();
+
+    metrics = MetricsAsserts.getMetrics(
+        journal.getMetricsForTests().getName());
+    MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics);
+    MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
   }
 
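
Note on the test: it snapshots the "Journal-<jid>" metrics record before and after a single sendEdits() call. The gauges could be checked the same way; a hedged sketch (assertGauge is part of the same MetricsAsserts helper, but these extra assertions are not in the commit):

    metrics = MetricsAsserts.getMetrics(
        journal.getMetricsForTests().getName());
    // after one batch of one txn ending at txid 1:
    MetricsAsserts.assertGauge("LastWrittenTxId", 1L, metrics);
    MetricsAsserts.assertCounter("TxnsWritten", 1L, metrics);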