MAPREDUCE-6526. Remove usage of metrics v1 from hadoop-mapreduce. (aajisaka)
This commit is contained in:
parent
3ff0510ffd
commit
4ee4e5ca2b
|
@ -768,7 +768,7 @@ public class LocalJobRunner implements ClientProtocol {
|
|||
public LocalJobRunner(JobConf conf) throws IOException {
|
||||
this.fs = FileSystem.getLocal(conf);
|
||||
this.conf = conf;
|
||||
myMetrics = new LocalJobRunnerMetrics(new JobConf(conf));
|
||||
myMetrics = LocalJobRunnerMetrics.create();
|
||||
}
|
||||
|
||||
// JobSubmissionProtocol methods
|
||||
|
|
|
@ -17,82 +17,50 @@
|
|||
*/
|
||||
package org.apache.hadoop.mapred;
|
||||
|
||||
import org.apache.hadoop.metrics.MetricsContext;
|
||||
import org.apache.hadoop.metrics.MetricsRecord;
|
||||
import org.apache.hadoop.metrics.MetricsUtil;
|
||||
import org.apache.hadoop.metrics.Updater;
|
||||
import org.apache.hadoop.metrics.jvm.JvmMetrics;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
class LocalJobRunnerMetrics implements Updater {
|
||||
private final MetricsRecord metricsRecord;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
private int numMapTasksLaunched = 0;
|
||||
private int numMapTasksCompleted = 0;
|
||||
private int numReduceTasksLaunched = 0;
|
||||
private int numReduceTasksCompleted = 0;
|
||||
private int numWaitingMaps = 0;
|
||||
private int numWaitingReduces = 0;
|
||||
|
||||
public LocalJobRunnerMetrics(JobConf conf) {
|
||||
String sessionId = conf.getSessionId();
|
||||
// Initiate JVM Metrics
|
||||
JvmMetrics.init("JobTracker", sessionId);
|
||||
// Create a record for map-reduce metrics
|
||||
MetricsContext context = MetricsUtil.getContext("mapred");
|
||||
// record name is jobtracker for compatibility
|
||||
metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
|
||||
metricsRecord.setTag("sessionId", sessionId);
|
||||
context.registerUpdater(this);
|
||||
@Metrics(name="LocalJobRunnerMetrics", context="mapred")
|
||||
final class LocalJobRunnerMetrics {
|
||||
|
||||
@Metric
|
||||
private MutableCounterInt numMapTasksLaunched;
|
||||
@Metric
|
||||
private MutableCounterInt numMapTasksCompleted;
|
||||
@Metric
|
||||
private MutableCounterInt numReduceTasksLaunched;
|
||||
@Metric
|
||||
private MutableGaugeInt numReduceTasksCompleted;
|
||||
|
||||
private LocalJobRunnerMetrics() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Since this object is a registered updater, this method will be called
|
||||
* periodically, e.g. every 5 seconds.
|
||||
*/
|
||||
public void doUpdates(MetricsContext unused) {
|
||||
synchronized (this) {
|
||||
metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
|
||||
metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
|
||||
metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
|
||||
metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
|
||||
metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
|
||||
metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
|
||||
|
||||
numMapTasksLaunched = 0;
|
||||
numMapTasksCompleted = 0;
|
||||
numReduceTasksLaunched = 0;
|
||||
numReduceTasksCompleted = 0;
|
||||
numWaitingMaps = 0;
|
||||
numWaitingReduces = 0;
|
||||
}
|
||||
metricsRecord.update();
|
||||
public static LocalJobRunnerMetrics create() {
|
||||
MetricsSystem ms = DefaultMetricsSystem.initialize("JobTracker");
|
||||
return ms.register("LocalJobRunnerMetrics-" +
|
||||
ThreadLocalRandom.current().nextInt(), null,
|
||||
new LocalJobRunnerMetrics());
|
||||
}
|
||||
|
||||
public synchronized void launchMap(TaskAttemptID taskAttemptID) {
|
||||
++numMapTasksLaunched;
|
||||
decWaitingMaps(taskAttemptID.getJobID(), 1);
|
||||
numMapTasksLaunched.incr();
|
||||
}
|
||||
|
||||
public synchronized void completeMap(TaskAttemptID taskAttemptID) {
|
||||
++numMapTasksCompleted;
|
||||
public void completeMap(TaskAttemptID taskAttemptID) {
|
||||
numMapTasksCompleted.incr();
|
||||
}
|
||||
|
||||
public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
|
||||
++numReduceTasksLaunched;
|
||||
decWaitingReduces(taskAttemptID.getJobID(), 1);
|
||||
numReduceTasksLaunched.incr();
|
||||
}
|
||||
|
||||
public synchronized void completeReduce(TaskAttemptID taskAttemptID) {
|
||||
++numReduceTasksCompleted;
|
||||
public void completeReduce(TaskAttemptID taskAttemptID) {
|
||||
numReduceTasksCompleted.incr();
|
||||
}
|
||||
|
||||
private synchronized void decWaitingMaps(JobID id, int task) {
|
||||
numWaitingMaps -= task;
|
||||
}
|
||||
|
||||
private synchronized void decWaitingReduces(JobID id, int task){
|
||||
numWaitingReduces -= task;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
|
|||
this.jobConf = context.getJobConf();
|
||||
this.umbilical = context.getUmbilical();
|
||||
this.reporter = context.getReporter();
|
||||
this.metrics = new ShuffleClientMetrics(reduceId, jobConf);
|
||||
this.metrics = ShuffleClientMetrics.create();
|
||||
this.copyPhase = context.getCopyPhase();
|
||||
this.taskStatus = context.getStatus();
|
||||
this.reduceTask = context.getReduceTask();
|
||||
|
|
|
@ -20,70 +20,53 @@ package org.apache.hadoop.mapreduce.task.reduce;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||
import org.apache.hadoop.metrics.MetricsContext;
|
||||
import org.apache.hadoop.metrics.MetricsRecord;
|
||||
import org.apache.hadoop.metrics.MetricsUtil;
|
||||
import org.apache.hadoop.metrics.Updater;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
@InterfaceAudience.LimitedPrivate({"MapReduce"})
|
||||
@InterfaceStability.Unstable
|
||||
public class ShuffleClientMetrics implements Updater {
|
||||
@Metrics(name="ShuffleClientMetrics", context="mapred")
|
||||
public class ShuffleClientMetrics {
|
||||
|
||||
private MetricsRecord shuffleMetrics = null;
|
||||
private int numFailedFetches = 0;
|
||||
private int numSuccessFetches = 0;
|
||||
private long numBytes = 0;
|
||||
private int numThreadsBusy = 0;
|
||||
private final int numCopiers;
|
||||
|
||||
ShuffleClientMetrics(TaskAttemptID reduceId, JobConf jobConf) {
|
||||
this.numCopiers = jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
|
||||
@Metric
|
||||
private MutableCounterInt numFailedFetches;
|
||||
@Metric
|
||||
private MutableCounterInt numSuccessFetches;
|
||||
@Metric
|
||||
private MutableCounterLong numBytes;
|
||||
@Metric
|
||||
private MutableGaugeInt numThreadsBusy;
|
||||
|
||||
MetricsContext metricsContext = MetricsUtil.getContext("mapred");
|
||||
this.shuffleMetrics =
|
||||
MetricsUtil.createRecord(metricsContext, "shuffleInput");
|
||||
this.shuffleMetrics.setTag("user", jobConf.getUser());
|
||||
this.shuffleMetrics.setTag("jobName", jobConf.getJobName());
|
||||
this.shuffleMetrics.setTag("jobId", reduceId.getJobID().toString());
|
||||
this.shuffleMetrics.setTag("taskId", reduceId.toString());
|
||||
this.shuffleMetrics.setTag("sessionId", jobConf.getSessionId());
|
||||
metricsContext.registerUpdater(this);
|
||||
private ShuffleClientMetrics() {
|
||||
}
|
||||
public synchronized void inputBytes(long numBytes) {
|
||||
this.numBytes += numBytes;
|
||||
|
||||
public static ShuffleClientMetrics create() {
|
||||
MetricsSystem ms = DefaultMetricsSystem.initialize("JobTracker");
|
||||
return ms.register("ShuffleClientMetrics-" +
|
||||
ThreadLocalRandom.current().nextInt(), null,
|
||||
new ShuffleClientMetrics());
|
||||
}
|
||||
public synchronized void failedFetch() {
|
||||
++numFailedFetches;
|
||||
|
||||
public void inputBytes(long bytes) {
|
||||
numBytes.incr(bytes);
|
||||
}
|
||||
public synchronized void successFetch() {
|
||||
++numSuccessFetches;
|
||||
public void failedFetch() {
|
||||
numFailedFetches.incr();
|
||||
}
|
||||
public synchronized void threadBusy() {
|
||||
++numThreadsBusy;
|
||||
public void successFetch() {
|
||||
numSuccessFetches.incr();
|
||||
}
|
||||
public synchronized void threadFree() {
|
||||
--numThreadsBusy;
|
||||
public void threadBusy() {
|
||||
numThreadsBusy.incr();
|
||||
}
|
||||
public void doUpdates(MetricsContext unused) {
|
||||
synchronized (this) {
|
||||
shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
|
||||
shuffleMetrics.incrMetric("shuffle_failed_fetches",
|
||||
numFailedFetches);
|
||||
shuffleMetrics.incrMetric("shuffle_success_fetches",
|
||||
numSuccessFetches);
|
||||
if (numCopiers != 0) {
|
||||
shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
|
||||
100*((float)numThreadsBusy/numCopiers));
|
||||
} else {
|
||||
shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
|
||||
}
|
||||
numBytes = 0;
|
||||
numSuccessFetches = 0;
|
||||
numFailedFetches = 0;
|
||||
}
|
||||
shuffleMetrics.update();
|
||||
public void threadFree() {
|
||||
numThreadsBusy.decr();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue