diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java index 37c147dea34..02b9a876227 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java @@ -768,7 +768,7 @@ public class LocalJobRunner implements ClientProtocol { public LocalJobRunner(JobConf conf) throws IOException { this.fs = FileSystem.getLocal(conf); this.conf = conf; - myMetrics = new LocalJobRunnerMetrics(new JobConf(conf)); + myMetrics = LocalJobRunnerMetrics.create(); } // JobSubmissionProtocol methods diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java index aec70edefc2..0186cdc9d27 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java @@ -17,82 +17,50 @@ */ package org.apache.hadoop.mapred; -import org.apache.hadoop.metrics.MetricsContext; -import org.apache.hadoop.metrics.MetricsRecord; -import org.apache.hadoop.metrics.MetricsUtil; -import org.apache.hadoop.metrics.Updater; -import org.apache.hadoop.metrics.jvm.JvmMetrics; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterInt; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; -@SuppressWarnings("deprecation") -class LocalJobRunnerMetrics implements Updater { - private final MetricsRecord metricsRecord; +import java.util.concurrent.ThreadLocalRandom; - private int numMapTasksLaunched = 0; - private int numMapTasksCompleted = 0; - private int numReduceTasksLaunched = 0; - private int numReduceTasksCompleted = 0; - private int numWaitingMaps = 0; - private int numWaitingReduces = 0; - - public LocalJobRunnerMetrics(JobConf conf) { - String sessionId = conf.getSessionId(); - // Initiate JVM Metrics - JvmMetrics.init("JobTracker", sessionId); - // Create a record for map-reduce metrics - MetricsContext context = MetricsUtil.getContext("mapred"); - // record name is jobtracker for compatibility - metricsRecord = MetricsUtil.createRecord(context, "jobtracker"); - metricsRecord.setTag("sessionId", sessionId); - context.registerUpdater(this); +@Metrics(name="LocalJobRunnerMetrics", context="mapred") +final class LocalJobRunnerMetrics { + + @Metric + private MutableCounterInt numMapTasksLaunched; + @Metric + private MutableCounterInt numMapTasksCompleted; + @Metric + private MutableCounterInt numReduceTasksLaunched; + @Metric + private MutableGaugeInt numReduceTasksCompleted; + + private LocalJobRunnerMetrics() { } - - /** - * Since this object is a registered updater, this method will be called - * periodically, e.g. every 5 seconds. - */ - public void doUpdates(MetricsContext unused) { - synchronized (this) { - metricsRecord.incrMetric("maps_launched", numMapTasksLaunched); - metricsRecord.incrMetric("maps_completed", numMapTasksCompleted); - metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched); - metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted); - metricsRecord.incrMetric("waiting_maps", numWaitingMaps); - metricsRecord.incrMetric("waiting_reduces", numWaitingReduces); - numMapTasksLaunched = 0; - numMapTasksCompleted = 0; - numReduceTasksLaunched = 0; - numReduceTasksCompleted = 0; - numWaitingMaps = 0; - numWaitingReduces = 0; - } - metricsRecord.update(); + public static LocalJobRunnerMetrics create() { + MetricsSystem ms = DefaultMetricsSystem.initialize("JobTracker"); + return ms.register("LocalJobRunnerMetrics-" + + ThreadLocalRandom.current().nextInt(), null, + new LocalJobRunnerMetrics()); } public synchronized void launchMap(TaskAttemptID taskAttemptID) { - ++numMapTasksLaunched; - decWaitingMaps(taskAttemptID.getJobID(), 1); + numMapTasksLaunched.incr(); } - public synchronized void completeMap(TaskAttemptID taskAttemptID) { - ++numMapTasksCompleted; + public void completeMap(TaskAttemptID taskAttemptID) { + numMapTasksCompleted.incr(); } public synchronized void launchReduce(TaskAttemptID taskAttemptID) { - ++numReduceTasksLaunched; - decWaitingReduces(taskAttemptID.getJobID(), 1); + numReduceTasksLaunched.incr(); } - public synchronized void completeReduce(TaskAttemptID taskAttemptID) { - ++numReduceTasksCompleted; + public void completeReduce(TaskAttemptID taskAttemptID) { + numReduceTasksCompleted.incr(); } - - private synchronized void decWaitingMaps(JobID id, int task) { - numWaitingMaps -= task; - } - - private synchronized void decWaitingReduces(JobID id, int task){ - numWaitingReduces -= task; - } - } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java index 93f9a507b10..3382bbf8436 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java @@ -68,7 +68,7 @@ public class Shuffle implements ShuffleConsumerPlugin, ExceptionRepo this.jobConf = context.getJobConf(); this.umbilical = context.getUmbilical(); this.reporter = context.getReporter(); - this.metrics = new ShuffleClientMetrics(reduceId, jobConf); + this.metrics = ShuffleClientMetrics.create(); this.copyPhase = context.getCopyPhase(); this.taskStatus = context.getStatus(); this.reduceTask = context.getReduceTask(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java index 92c69a60a5a..d4e185df6f3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java @@ -20,70 +20,53 @@ package org.apache.hadoop.mapreduce.task.reduce; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.metrics.MetricsContext; -import org.apache.hadoop.metrics.MetricsRecord; -import org.apache.hadoop.metrics.MetricsUtil; -import org.apache.hadoop.metrics.Updater; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterInt; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; + +import java.util.concurrent.ThreadLocalRandom; @InterfaceAudience.LimitedPrivate({"MapReduce"}) @InterfaceStability.Unstable -public class ShuffleClientMetrics implements Updater { +@Metrics(name="ShuffleClientMetrics", context="mapred") +public class ShuffleClientMetrics { - private MetricsRecord shuffleMetrics = null; - private int numFailedFetches = 0; - private int numSuccessFetches = 0; - private long numBytes = 0; - private int numThreadsBusy = 0; - private final int numCopiers; - - ShuffleClientMetrics(TaskAttemptID reduceId, JobConf jobConf) { - this.numCopiers = jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5); + @Metric + private MutableCounterInt numFailedFetches; + @Metric + private MutableCounterInt numSuccessFetches; + @Metric + private MutableCounterLong numBytes; + @Metric + private MutableGaugeInt numThreadsBusy; - MetricsContext metricsContext = MetricsUtil.getContext("mapred"); - this.shuffleMetrics = - MetricsUtil.createRecord(metricsContext, "shuffleInput"); - this.shuffleMetrics.setTag("user", jobConf.getUser()); - this.shuffleMetrics.setTag("jobName", jobConf.getJobName()); - this.shuffleMetrics.setTag("jobId", reduceId.getJobID().toString()); - this.shuffleMetrics.setTag("taskId", reduceId.toString()); - this.shuffleMetrics.setTag("sessionId", jobConf.getSessionId()); - metricsContext.registerUpdater(this); + private ShuffleClientMetrics() { } - public synchronized void inputBytes(long numBytes) { - this.numBytes += numBytes; + + public static ShuffleClientMetrics create() { + MetricsSystem ms = DefaultMetricsSystem.initialize("JobTracker"); + return ms.register("ShuffleClientMetrics-" + + ThreadLocalRandom.current().nextInt(), null, + new ShuffleClientMetrics()); } - public synchronized void failedFetch() { - ++numFailedFetches; + + public void inputBytes(long bytes) { + numBytes.incr(bytes); } - public synchronized void successFetch() { - ++numSuccessFetches; + public void failedFetch() { + numFailedFetches.incr(); } - public synchronized void threadBusy() { - ++numThreadsBusy; + public void successFetch() { + numSuccessFetches.incr(); } - public synchronized void threadFree() { - --numThreadsBusy; + public void threadBusy() { + numThreadsBusy.incr(); } - public void doUpdates(MetricsContext unused) { - synchronized (this) { - shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes); - shuffleMetrics.incrMetric("shuffle_failed_fetches", - numFailedFetches); - shuffleMetrics.incrMetric("shuffle_success_fetches", - numSuccessFetches); - if (numCopiers != 0) { - shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", - 100*((float)numThreadsBusy/numCopiers)); - } else { - shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0); - } - numBytes = 0; - numSuccessFetches = 0; - numFailedFetches = 0; - } - shuffleMetrics.update(); + public void threadFree() { + numThreadsBusy.decr(); } }