MAPREDUCE-5650. Job fails when hprof mapreduce.task.profile.map/reduce.params is specified (Gera Shegalov via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1559201 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2014-01-17 17:43:00 +00:00
parent 67cbde3008
commit f667371746
4 changed files with 277 additions and 13 deletions

View File

@ -334,6 +334,9 @@ Release 2.3.0 - UNRELEASED
MAPREDUCE-5674. Missing start and finish time in mapred.JobStatus.
(Chuan Liu via cnauroth)
MAPREDUCE-5650. Job fails when hprof mapreduce.task.profile.map/reduce.params
is specified (Gera Shegalov via Sandy Ryza)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

View File

@ -212,19 +212,11 @@ public class MapReduceChildJVM {
if (conf.getProfileEnabled()) {
if (conf.getProfileTaskRange(task.isMapTask()
).isIncluded(task.getPartition())) {
vargs.add(
String.format(
conf.getProfileParams(),
getTaskLogFile(TaskLog.LogName.PROFILE)
)
);
if (task.isMapTask()) {
vargs.add(conf.get(MRJobConfig.TASK_MAP_PROFILE_PARAMS, ""));
}
else {
vargs.add(conf.get(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, ""));
}
final String profileParams = conf.get(task.isMapTask()
? MRJobConfig.TASK_MAP_PROFILE_PARAMS
: MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, conf.getProfileParams());
vargs.add(String.format(profileParams,
getTaskLogFile(TaskLog.LogName.PROFILE)));
}
}

View File

@ -602,6 +602,31 @@
</description>
</property>
<property>
<name>mapreduce.task.profile.params</name>
<value></value>
<description>JVM profiler parameters used to profile map and reduce task
attempts. This string may contain a single format specifier %s that will
be replaced by the path to profile.out in the task attempt log directory.
To specify different profiling options for map tasks and reduce tasks,
more specific parameters mapreduce.task.profile.map.params and
mapreduce.task.profile.reduce.params should be used.</description>
</property>
<property>
<name>mapreduce.task.profile.map.params</name>
<value>${mapreduce.task.profile.params}</value>
<description>Map-task-specific JVM profiler parameters. See
mapreduce.task.profile.params</description>
</property>
<property>
<name>mapreduce.task.profile.reduce.params</name>
<value>${mapreduce.task.profile.params}</value>
<description>Reduce-task-specific JVM profiler parameters. See
mapreduce.task.profile.params</description>
</property>
<property>
<name>mapreduce.task.skip.start.attempts</name>
<value>2</value>

View File

@ -0,0 +1,244 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2;
import java.io.*;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.SleepJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestMRJobsWithProfiler {
private static final Log LOG =
LogFactory.getLog(TestMRJobsWithProfiler.class);
private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES =
EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);
private static MiniMRYarnCluster mrCluster;
private static final Configuration CONF = new Configuration();
private static final FileSystem localFs;
static {
try {
localFs = FileSystem.getLocal(CONF);
} catch (IOException io) {
throw new RuntimeException("problem getting local fs", io);
}
}
private static final Path TEST_ROOT_DIR =
new Path("target", TestMRJobs.class.getName() + "-tmpDir").
makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
private static final Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
@Before
public void setup() throws InterruptedException, IOException {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
if (mrCluster == null) {
mrCluster = new MiniMRYarnCluster(getClass().getName());
mrCluster.init(CONF);
mrCluster.start();
}
// Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
// workaround the absent public discache.
localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
localFs.setPermission(APP_JAR, new FsPermission("700"));
}
@After
public void tearDown() {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
if (mrCluster != null) {
mrCluster.stop();
}
}
@Test (timeout = 120000)
public void testProfiler() throws IOException, InterruptedException,
ClassNotFoundException {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
final SleepJob sleepJob = new SleepJob();
final JobConf sleepConf = new JobConf(mrCluster.getConfig());
sleepConf.setProfileEnabled(true);
// profile map split 1
sleepConf.setProfileTaskRange(true, "1");
// profile reduce of map output partitions 1
sleepConf.setProfileTaskRange(false, "1");
// use hprof for map to profile.out
sleepConf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS,
"-agentlib:hprof=cpu=times,heap=sites,force=n,thread=y,verbose=n,"
+ "file=%s");
// use Xprof for reduce to stdout
sleepConf.set(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, "-Xprof");
sleepJob.setConf(sleepConf);
// 2-map-2-reduce SleepJob
final Job job = sleepJob.createJob(2, 2, 500, 1, 500, 1);
job.setJarByClass(SleepJob.class);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.waitForCompletion(true);
final JobId jobId = TypeConverter.toYarn(job.getJobID());
final ApplicationId appID = jobId.getAppId();
int pollElapsed = 0;
while (true) {
Thread.sleep(1000);
pollElapsed += 1000;
if (TERMINAL_RM_APP_STATES.contains(
mrCluster.getResourceManager().getRMContext().getRMApps().get(appID)
.getState())) {
break;
}
if (pollElapsed >= 60000) {
LOG.warn("application did not reach terminal state within 60 seconds");
break;
}
}
Assert.assertEquals(RMAppState.FINISHED, mrCluster.getResourceManager()
.getRMContext().getRMApps().get(appID).getState());
// Job finished, verify logs
//
final Configuration nmConf = mrCluster.getNodeManager(0).getConfig();
final String appIdStr = appID.toString();
final String appIdSuffix = appIdStr.substring(
"application_".length(), appIdStr.length());
final String containerGlob = "container_" + appIdSuffix + "_*_*";
final Map<TaskAttemptID,Path> taLogDirs = new HashMap<TaskAttemptID,Path>();
final Pattern taskPattern = Pattern.compile(
".*Task:(attempt_"
+ appIdSuffix + "_[rm]_" + "[0-9]+_[0-9]+).*");
for (String logDir :
nmConf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS))
{
// filter out MRAppMaster and create attemptId->logDir map
//
for (FileStatus fileStatus :
localFs.globStatus(new Path(logDir
+ Path.SEPARATOR + appIdStr
+ Path.SEPARATOR + containerGlob
+ Path.SEPARATOR + TaskLog.LogName.SYSLOG)))
{
final BufferedReader br = new BufferedReader(
new InputStreamReader(localFs.open(fileStatus.getPath())));
String line;
while ((line = br.readLine()) != null) {
final Matcher m = taskPattern.matcher(line);
if (m.matches()) {
// found Task done message
taLogDirs.put(TaskAttemptID.forName(m.group(1)),
fileStatus.getPath().getParent());
break;
}
}
br.close();
}
}
Assert.assertEquals(4, taLogDirs.size()); // all 4 attempts found
for (Map.Entry<TaskAttemptID,Path> dirEntry : taLogDirs.entrySet()) {
final TaskAttemptID tid = dirEntry.getKey();
final Path profilePath = new Path(dirEntry.getValue(),
TaskLog.LogName.PROFILE.toString());
final Path stdoutPath = new Path(dirEntry.getValue(),
TaskLog.LogName.STDOUT.toString());
if (tid.getTaskType() == TaskType.MAP) {
if (tid.getTaskID().getId() == 1) {
// verify profile.out
final BufferedReader br = new BufferedReader(new InputStreamReader(
localFs.open(profilePath)));
final String line = br.readLine();
Assert.assertTrue("No hprof content found!",
line !=null && line.startsWith("JAVA PROFILE"));
br.close();
Assert.assertEquals(0L, localFs.getFileStatus(stdoutPath).getLen());
} else {
Assert.assertFalse("hprof file should not exist",
localFs.exists(profilePath));
}
} else {
Assert.assertFalse("hprof file should not exist",
localFs.exists(profilePath));
if (tid.getTaskID().getId() == 1) {
final BufferedReader br = new BufferedReader(new InputStreamReader(
localFs.open(stdoutPath)));
boolean flatProfFound = false;
String line;
while ((line = br.readLine()) != null) {
if (line.startsWith("Flat profile")) {
flatProfFound = true;
break;
}
}
br.close();
Assert.assertTrue("Xprof flat profile not found!", flatProfFound);
} else {
Assert.assertEquals(0L, localFs.getFileStatus(stdoutPath).getLen());
}
}
}
}
}