diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileParser.java new file mode 100644 index 00000000000..c290cd64f5e --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileParser.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; + +class JobHistoryFileParser { + private static final Log LOG = LogFactory.getLog(JobHistoryFileParser.class); + + private final FileSystem fs; + + public JobHistoryFileParser(FileSystem fs) { + LOG.info("JobHistoryFileParser created with " + fs); + this.fs = fs; + } + + public JobInfo parseHistoryFile(Path path) throws IOException { + LOG.info("parsing job history file " + path); + JobHistoryParser parser = new JobHistoryParser(fs, path); + return parser.parse(); + } + + public Configuration parseConfiguration(Path path) throws IOException { + LOG.info("parsing job configuration file " + path); + Configuration conf = new Configuration(false); + conf.addResource(fs.open(path)); + return conf; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayHelper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayHelper.java new file mode 100644 index 00000000000..8acd26eae5d --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayHelper.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.mapreduce.Mapper.Context; + +class JobHistoryFileReplayHelper { + private static final Log LOG = + LogFactory.getLog(JobHistoryFileReplayHelper.class); + static final String PROCESSING_PATH = "processing path"; + static final String REPLAY_MODE = "replay mode"; + static final int WRITE_ALL_AT_ONCE = 1; + static final int WRITE_PER_ENTITY = 2; + static final int REPLAY_MODE_DEFAULT = WRITE_ALL_AT_ONCE; + + private static Pattern JOB_ID_PARSER = + Pattern.compile("^(job_[0-9]+_([0-9]+)).*"); + private enum FileType { JOB_HISTORY_FILE, JOB_CONF_FILE, UNKNOWN }; + JobHistoryFileParser parser; + int replayMode; + Collection jobFiles; + + JobHistoryFileReplayHelper(Context context) throws IOException { + Configuration conf = context.getConfiguration(); + int taskId = context.getTaskAttemptID().getTaskID().getId(); + int size = conf.getInt(MRJobConfig.NUM_MAPS, + TimelineServicePerformance.NUM_MAPS_DEFAULT); + replayMode = conf.getInt(JobHistoryFileReplayHelper.REPLAY_MODE, + JobHistoryFileReplayHelper.REPLAY_MODE_DEFAULT); + String processingDir = + conf.get(JobHistoryFileReplayHelper.PROCESSING_PATH); + + Path processingPath = new Path(processingDir); + FileSystem processingFs = processingPath.getFileSystem(conf); + parser = new JobHistoryFileParser(processingFs); + jobFiles = selectJobFiles(processingFs, processingPath, taskId, size); + } + + public int getReplayMode() { + return replayMode; + } + + public Collection getJobFiles() { + return jobFiles; + } + + public JobHistoryFileParser getParser() { + return parser; + } + + public static class JobFiles { + private final String jobId; + private Path jobHistoryFilePath; + private Path jobConfFilePath; + + public JobFiles(String jobId) { + this.jobId = jobId; + } + + public String getJobId() { + return jobId; + } + + public Path getJobHistoryFilePath() { + return jobHistoryFilePath; + } + + public void setJobHistoryFilePath(Path jobHistoryFilePath) { + this.jobHistoryFilePath = jobHistoryFilePath; + } + + public Path getJobConfFilePath() { + return jobConfFilePath; + } + + public void setJobConfFilePath(Path jobConfFilePath) { + this.jobConfFilePath = jobConfFilePath; + } + + @Override + public int hashCode() { + return jobId.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + JobFiles other = (JobFiles) obj; + return jobId.equals(other.jobId); + } + } + + private Collection selectJobFiles(FileSystem fs, + Path processingRoot, int i, int size) throws IOException { + Map jobs = new HashMap<>(); + RemoteIterator it = fs.listFiles(processingRoot, true); + while (it.hasNext()) { + LocatedFileStatus status = it.next(); + Path path = status.getPath(); + String fileName = path.getName(); + Matcher m = JOB_ID_PARSER.matcher(fileName); + if (!m.matches()) { + continue; + } + String jobId = m.group(1); + int lastId = Integer.parseInt(m.group(2)); + int mod = lastId % size; + if (mod != i) { + continue; + } + LOG.info("this mapper will process file " + fileName); + // it's mine + JobFiles jobFiles = jobs.get(jobId); + if (jobFiles == null) { + jobFiles = new JobFiles(jobId); + jobs.put(jobId, jobFiles); + } + setFilePath(fileName, path, jobFiles); + } + return jobs.values(); + } + + private void setFilePath(String fileName, Path path, + JobFiles jobFiles) { + // determine if we're dealing with a job history file or a job conf file + FileType type = getFileType(fileName); + switch (type) { + case JOB_HISTORY_FILE: + if (jobFiles.getJobHistoryFilePath() == null) { + jobFiles.setJobHistoryFilePath(path); + } else { + LOG.warn("we already have the job history file " + + jobFiles.getJobHistoryFilePath() + ": skipping " + path); + } + break; + case JOB_CONF_FILE: + if (jobFiles.getJobConfFilePath() == null) { + jobFiles.setJobConfFilePath(path); + } else { + LOG.warn("we already have the job conf file " + + jobFiles.getJobConfFilePath() + ": skipping " + path); + } + break; + case UNKNOWN: + LOG.warn("unknown type: " + path); + } + } + + private FileType getFileType(String fileName) { + if (fileName.endsWith(".jhist")) { + return FileType.JOB_HISTORY_FILE; + } + if (fileName.endsWith("_conf.xml")) { + return FileType.JOB_CONF_FILE; + } + return FileType.UNKNOWN; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java new file mode 100644 index 00000000000..5e106622f53 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Mapper.Context; +import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters; +import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper; +import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper.JobFiles; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; + + +/** + * Mapper for TimelineServicePerformanceV1 that replays job history files to the + * timeline service. + * + */ +class JobHistoryFileReplayMapperV1 extends + org.apache.hadoop.mapreduce. + Mapper { + private static final Log LOG = + LogFactory.getLog(JobHistoryFileReplayMapperV1.class); + + public void map(IntWritable key, IntWritable val, Context context) throws IOException { + // collect the apps it needs to process + TimelineClient tlc = new TimelineClientImpl(); + TimelineEntityConverterV1 converter = new TimelineEntityConverterV1(); + JobHistoryFileReplayHelper helper = new JobHistoryFileReplayHelper(context); + int replayMode = helper.getReplayMode(); + Collection jobs = + helper.getJobFiles(); + JobHistoryFileParser parser = helper.getParser(); + + if (jobs.isEmpty()) { + LOG.info(context.getTaskAttemptID().getTaskID() + + " will process no jobs"); + } else { + LOG.info(context.getTaskAttemptID().getTaskID() + " will process " + + jobs.size() + " jobs"); + } + for (JobFiles job: jobs) { + // process each job + String jobIdStr = job.getJobId(); + LOG.info("processing " + jobIdStr + "..."); + JobId jobId = TypeConverter.toYarn(JobID.forName(jobIdStr)); + ApplicationId appId = jobId.getAppId(); + + try { + // parse the job info and configuration + Path historyFilePath = job.getJobHistoryFilePath(); + Path confFilePath = job.getJobConfFilePath(); + if ((historyFilePath == null) || (confFilePath == null)) { + continue; + } + JobInfo jobInfo = + parser.parseHistoryFile(historyFilePath); + Configuration jobConf = + parser.parseConfiguration(confFilePath); + LOG.info("parsed the job history file and the configuration file for job " + + jobIdStr); + + // create entities from job history and write them + long totalTime = 0; + Set entitySet = + converter.createTimelineEntities(jobInfo, jobConf); + LOG.info("converted them into timeline entities for job " + jobIdStr); + // use the current user for this purpose + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + long startWrite = System.nanoTime(); + try { + switch (replayMode) { + case JobHistoryFileReplayHelper.WRITE_ALL_AT_ONCE: + writeAllEntities(tlc, entitySet, ugi); + break; + case JobHistoryFileReplayHelper.WRITE_PER_ENTITY: + writePerEntity(tlc, entitySet, ugi); + break; + default: + break; + } + } catch (Exception e) { + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES). + increment(1); + LOG.error("writing to the timeline service failed", e); + } + long endWrite = System.nanoTime(); + totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite); + int numEntities = entitySet.size(); + LOG.info("wrote " + numEntities + " entities in " + totalTime + " ms"); + + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME). + increment(totalTime); + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER). + increment(numEntities); + } finally { + context.progress(); // move it along + } + } + } + + private void writeAllEntities(TimelineClient tlc, + Set entitySet, UserGroupInformation ugi) + throws IOException, YarnException { + tlc.putEntities((TimelineEntity[])entitySet.toArray()); + } + + private void writePerEntity(TimelineClient tlc, + Set entitySet, UserGroupInformation ugi) + throws IOException, YarnException { + for (TimelineEntity entity : entitySet) { + tlc.putEntities(entity); + LOG.info("wrote entity " + entity.getEntityId()); + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java new file mode 100644 index 00000000000..2c851e95869 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Mapper.Context; +import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * Adds simple entities with random string payload, events, metrics, and + * configuration. + */ +class SimpleEntityWriterV1 extends + org.apache.hadoop.mapreduce.Mapper { + private static final Log LOG = LogFactory.getLog(SimpleEntityWriterV1.class); + + // constants for mtype = 1 + static final String KBS_SENT = "kbs sent"; + static final int KBS_SENT_DEFAULT = 1; + static final String TEST_TIMES = "testtimes"; + static final int TEST_TIMES_DEFAULT = 100; + static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID = + "timeline.server.performance.run.id"; + /** + * To ensure that the compression really gets exercised, generate a + * random alphanumeric fixed length payload + */ + private static char[] ALPHA_NUMS = new char[] { 'a', 'b', 'c', 'd', 'e', 'f', + 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', + 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', + 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', + 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2', + '3', '4', '5', '6', '7', '8', '9', '0', ' ' }; + + public void map(IntWritable key, IntWritable val, Context context) throws IOException { + TimelineClient tlc = new TimelineClientImpl(); + Configuration conf = context.getConfiguration(); + + final int kbs = conf.getInt(KBS_SENT, KBS_SENT_DEFAULT); + + long totalTime = 0; + final int testtimes = conf.getInt(TEST_TIMES, TEST_TIMES_DEFAULT); + final Random rand = new Random(); + final TaskAttemptID taskAttemptId = context.getTaskAttemptID(); + final char[] payLoad = new char[kbs * 1024]; + + for (int i = 0; i < testtimes; i++) { + // Generate a fixed length random payload + for (int xx = 0; xx < kbs * 1024; xx++) { + int alphaNumIdx = + rand.nextInt(ALPHA_NUMS.length); + payLoad[xx] = ALPHA_NUMS[alphaNumIdx]; + } + String entId = taskAttemptId + "_" + Integer.toString(i); + final TimelineEntity entity = new TimelineEntity(); + entity.setEntityId(entId); + entity.setEntityType("FOO_ATTEMPT"); + entity.addOtherInfo("PERF_TEST", payLoad); + // add an event + TimelineEvent event = new TimelineEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setEventType("foo_event"); + entity.addEvent(event); + + // use the current user for this purpose + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + long startWrite = System.nanoTime(); + try { + tlc.putEntities(entity); + } catch (Exception e) { + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES). + increment(1); + LOG.error("writing to the timeline service failed", e); + } + long endWrite = System.nanoTime(); + totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite); + } + LOG.info("wrote " + testtimes + " entities (" + kbs*testtimes + + " kB) in " + totalTime + " ms"); + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME). + increment(totalTime); + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER). + increment(testtimes); + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS). + increment(kbs*testtimes); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java new file mode 100644 index 00000000000..79d123eb534 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; + +class TimelineEntityConverterV1 { + private static final Log LOG = + LogFactory.getLog(TimelineEntityConverterV1.class); + + static final String JOB = "MAPREDUCE_JOB"; + static final String TASK = "MAPREDUCE_TASK"; + static final String TASK_ATTEMPT = "MAPREDUCE_TASK_ATTEMPT"; + + /** + * Creates job, task, and task attempt entities based on the job history info + * and configuration. + * + * Note: currently these are plan timeline entities created for mapreduce + * types. These are not meant to be the complete and accurate entity set-up + * for mapreduce jobs. We do not leverage hierarchical timeline entities. If + * we create canonical mapreduce hierarchical timeline entities with proper + * parent-child relationship, we could modify this to use that instead. + * + * Note that we also do not add info to the YARN application entity, which + * would be needed for aggregation. + */ + public Set createTimelineEntities(JobInfo jobInfo, + Configuration conf) { + Set entities = new HashSet<>(); + + // create the job entity + TimelineEntity job = createJobEntity(jobInfo, conf); + entities.add(job); + + // create the task and task attempt entities + Set tasksAndAttempts = + createTaskAndTaskAttemptEntities(jobInfo); + entities.addAll(tasksAndAttempts); + + return entities; + } + + private TimelineEntity createJobEntity(JobInfo jobInfo, Configuration conf) { + TimelineEntity job = new TimelineEntity(); + job.setEntityType(JOB); + job.setEntityId(jobInfo.getJobId().toString()); + job.setStartTime(jobInfo.getSubmitTime()); + + job.addPrimaryFilter("JOBNAME", jobInfo.getJobname()); + job.addPrimaryFilter("USERNAME", jobInfo.getUsername()); + job.addOtherInfo("JOB_QUEUE_NAME", jobInfo.getJobQueueName()); + job.addOtherInfo("SUBMIT_TIME", jobInfo.getSubmitTime()); + job.addOtherInfo("LAUNCH_TIME", jobInfo.getLaunchTime()); + job.addOtherInfo("FINISH_TIME", jobInfo.getFinishTime()); + job.addOtherInfo("JOB_STATUS", jobInfo.getJobStatus()); + job.addOtherInfo("PRIORITY", jobInfo.getPriority()); + job.addOtherInfo("TOTAL_MAPS", jobInfo.getTotalMaps()); + job.addOtherInfo("TOTAL_REDUCES", jobInfo.getTotalReduces()); + job.addOtherInfo("UBERIZED", jobInfo.getUberized()); + job.addOtherInfo("ERROR_INFO", jobInfo.getErrorInfo()); + + LOG.info("converted job " + jobInfo.getJobId() + " to a timeline entity"); + return job; + } + + private Set createTaskAndTaskAttemptEntities(JobInfo jobInfo) { + Set entities = new HashSet<>(); + Map taskInfoMap = jobInfo.getAllTasks(); + LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() + + " tasks"); + for (TaskInfo taskInfo: taskInfoMap.values()) { + TimelineEntity task = createTaskEntity(taskInfo); + entities.add(task); + // add the task attempts from this task + Set taskAttempts = createTaskAttemptEntities(taskInfo); + entities.addAll(taskAttempts); + } + return entities; + } + + private TimelineEntity createTaskEntity(TaskInfo taskInfo) { + TimelineEntity task = new TimelineEntity(); + task.setEntityType(TASK); + task.setEntityId(taskInfo.getTaskId().toString()); + task.setStartTime(taskInfo.getStartTime()); + + task.addOtherInfo("START_TIME", taskInfo.getStartTime()); + task.addOtherInfo("FINISH_TIME", taskInfo.getFinishTime()); + task.addOtherInfo("TASK_TYPE", taskInfo.getTaskType()); + task.addOtherInfo("TASK_STATUS", taskInfo.getTaskStatus()); + task.addOtherInfo("ERROR_INFO", taskInfo.getError()); + + LOG.info("converted task " + taskInfo.getTaskId() + + " to a timeline entity"); + return task; + } + + private Set createTaskAttemptEntities(TaskInfo taskInfo) { + Set taskAttempts = new HashSet(); + Map taskAttemptInfoMap = + taskInfo.getAllTaskAttempts(); + LOG.info("task " + taskInfo.getTaskId() + " has " + + taskAttemptInfoMap.size() + " task attempts"); + for (TaskAttemptInfo taskAttemptInfo: taskAttemptInfoMap.values()) { + TimelineEntity taskAttempt = createTaskAttemptEntity(taskAttemptInfo); + taskAttempts.add(taskAttempt); + } + return taskAttempts; + } + + private TimelineEntity createTaskAttemptEntity(TaskAttemptInfo taskAttemptInfo) { + TimelineEntity taskAttempt = new TimelineEntity(); + taskAttempt.setEntityType(TASK_ATTEMPT); + taskAttempt.setEntityId(taskAttemptInfo.getAttemptId().toString()); + taskAttempt.setStartTime(taskAttemptInfo.getStartTime()); + + taskAttempt.addOtherInfo("START_TIME", taskAttemptInfo.getStartTime()); + taskAttempt.addOtherInfo("FINISH_TIME", taskAttemptInfo.getFinishTime()); + taskAttempt.addOtherInfo("MAP_FINISH_TIME", + taskAttemptInfo.getMapFinishTime()); + taskAttempt.addOtherInfo("SHUFFLE_FINISH_TIME", + taskAttemptInfo.getShuffleFinishTime()); + taskAttempt.addOtherInfo("SORT_FINISH_TIME", + taskAttemptInfo.getSortFinishTime()); + taskAttempt.addOtherInfo("TASK_STATUS", taskAttemptInfo.getTaskStatus()); + taskAttempt.addOtherInfo("STATE", taskAttemptInfo.getState()); + taskAttempt.addOtherInfo("ERROR", taskAttemptInfo.getError()); + taskAttempt.addOtherInfo("CONTAINER_ID", + taskAttemptInfo.getContainerId().toString()); + + LOG.info("converted task attempt " + taskAttemptInfo.getAttemptId() + + " to a timeline entity"); + return taskAttempt; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineServicePerformance.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineServicePerformance.java new file mode 100644 index 00000000000..9f7665932e9 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineServicePerformance.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import java.io.IOException; +import java.util.Date; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.SleepJob.SleepInputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + + +public class TimelineServicePerformance extends Configured implements Tool { + static final int NUM_MAPS_DEFAULT = 1; + + static final int SIMPLE_ENTITY_WRITER = 1; + static final int JOB_HISTORY_FILE_REPLAY_MAPPER = 2; + static int mapperType = SIMPLE_ENTITY_WRITER; + static final int TIMELINE_SERVICE_VERSION_1 = 1; + static final int TIMELINE_SERVICE_VERSION_2 = 2; + static int timeline_service_version = TIMELINE_SERVICE_VERSION_1; + + protected static int printUsage() { + System.err.println( + "Usage: [-m ] number of mappers (default: " + NUM_MAPS_DEFAULT + + ")\n" + + " [-v] timeline service version\n" + + " [-mtype ]\n" + + " 1. simple entity write mapper\n" + + " 2. jobhistory files replay mapper\n" + + " [-s <(KBs)test>] number of KB per put (mtype=1, default: " + + SimpleEntityWriterV1.KBS_SENT_DEFAULT + " KB)\n" + + " [-t] package sending iterations per mapper (mtype=1, default: " + + SimpleEntityWriterV1.TEST_TIMES_DEFAULT + ")\n" + + " [-d ] root path of job history files (mtype=2)\n" + + " [-r ] (mtype=2)\n" + + " 1. write all entities for a job in one put (default)\n" + + " 2. write one entity at a time\n"); + GenericOptionsParser.printGenericCommandUsage(System.err); + return -1; + } + + /** + * Configure a job given argv. + */ + public static boolean parseArgs(String[] args, Job job) throws IOException { + // set the common defaults + Configuration conf = job.getConfiguration(); + conf.setInt(MRJobConfig.NUM_MAPS, NUM_MAPS_DEFAULT); + + for (int i = 0; i < args.length; i++) { + if (args.length == i + 1) { + System.out.println("ERROR: Required parameter missing from " + args[i]); + return printUsage() == 0; + } + try { + if ("-v".equals(args[i])) { + timeline_service_version = Integer.parseInt(args[++i]); + } + if ("-m".equals(args[i])) { + if (Integer.parseInt(args[++i]) > 0) { + job.getConfiguration() + .setInt(MRJobConfig.NUM_MAPS, Integer.parseInt(args[i])); + } + } else if ("-mtype".equals(args[i])) { + mapperType = Integer.parseInt(args[++i]); + } else if ("-s".equals(args[i])) { + if (Integer.parseInt(args[++i]) > 0) { + conf.setInt(SimpleEntityWriterV1.KBS_SENT, Integer.parseInt(args[i])); + } + } else if ("-t".equals(args[i])) { + if (Integer.parseInt(args[++i]) > 0) { + conf.setInt(SimpleEntityWriterV1.TEST_TIMES, + Integer.parseInt(args[i])); + } + } else if ("-d".equals(args[i])) { + conf.set(JobHistoryFileReplayHelper.PROCESSING_PATH, args[++i]); + } else if ("-r".equals(args[i])) { + conf.setInt(JobHistoryFileReplayHelper.REPLAY_MODE, + Integer.parseInt(args[++i])); + } else { + System.out.println("Unexpected argument: " + args[i]); + return printUsage() == 0; + } + } catch (NumberFormatException except) { + System.out.println("ERROR: Integer expected instead of " + args[i]); + return printUsage() == 0; + } catch (Exception e) { + throw (IOException)new IOException().initCause(e); + } + } + + // handle mapper-specific settings + switch (timeline_service_version) { + case TIMELINE_SERVICE_VERSION_1: + default: + switch (mapperType) { + case JOB_HISTORY_FILE_REPLAY_MAPPER: + job.setMapperClass(JobHistoryFileReplayMapperV1.class); + String processingPath = + conf.get(JobHistoryFileReplayHelper.PROCESSING_PATH); + if (processingPath == null || processingPath.isEmpty()) { + System.out.println("processing path is missing while mtype = 2"); + return printUsage() == 0; + } + break; + case SIMPLE_ENTITY_WRITER: + default: + job.setMapperClass(SimpleEntityWriterV1.class); + // use the current timestamp as the "run id" of the test: this will + // be used as simulating the cluster timestamp for apps + conf.setLong(SimpleEntityWriterV1.TIMELINE_SERVICE_PERFORMANCE_RUN_ID, + System.currentTimeMillis()); + break; + } + } + return true; + } + + /** + * TimelineServer Performance counters + */ + static enum PerfCounters { + TIMELINE_SERVICE_WRITE_TIME, + TIMELINE_SERVICE_WRITE_COUNTER, + TIMELINE_SERVICE_WRITE_FAILURES, + TIMELINE_SERVICE_WRITE_KBS, + } + + public int run(String[] args) throws Exception { + + Job job = Job.getInstance(getConf()); + job.setJarByClass(TimelineServicePerformance.class); + job.setMapperClass(SimpleEntityWriterV1.class); + job.setInputFormatClass(SleepInputFormat.class); + job.setOutputFormatClass(NullOutputFormat.class); + job.setNumReduceTasks(0); + if (!parseArgs(args, job)) { + return -1; + } + + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int ret = job.waitForCompletion(true) ? 0 : 1; + org.apache.hadoop.mapreduce.Counters counters = job.getCounters(); + long writetime = + counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).getValue(); + long writecounts = + counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).getValue(); + long writesize = + counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).getValue(); + double transacrate = writecounts * 1000 / (double)writetime; + double iorate = writesize * 1000 / (double)writetime; + int numMaps = + Integer.parseInt(job.getConfiguration().get(MRJobConfig.NUM_MAPS)); + + System.out.println("TRANSACTION RATE (per mapper): " + transacrate + + " ops/s"); + System.out.println("IO RATE (per mapper): " + iorate + " KB/s"); + + System.out.println("TRANSACTION RATE (total): " + transacrate*numMaps + + " ops/s"); + System.out.println("IO RATE (total): " + iorate*numMaps + " KB/s"); + + return ret; + } + + public static void main(String[] args) throws Exception { + int res = + ToolRunner.run(new Configuration(), new TimelineServicePerformance(), + args); + System.exit(res); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java index 8fa82aabf17..fcad15bdcf4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java @@ -28,6 +28,7 @@ import org.apache.hadoop.mapred.TestMapRed; import org.apache.hadoop.mapred.TestSequenceFileInputFormat; import org.apache.hadoop.mapred.TestTextInputFormat; import org.apache.hadoop.mapred.ThreadedMapBenchmark; +import org.apache.hadoop.mapreduce.TimelineServicePerformance; import org.apache.hadoop.mapreduce.FailJob; import org.apache.hadoop.mapreduce.LargeSorter; import org.apache.hadoop.mapreduce.MiniHadoopClusterManager; @@ -55,60 +56,62 @@ import org.apache.hadoop.fs.slive.SliveTest; public class MapredTestDriver { private ProgramDriver pgd; - + public MapredTestDriver() { this(new ProgramDriver()); } - + public MapredTestDriver(ProgramDriver pgd) { this.pgd = pgd; try { - pgd.addClass("testsequencefile", TestSequenceFile.class, + pgd.addClass("testsequencefile", TestSequenceFile.class, "A test for flat files of binary key value pairs."); - pgd.addClass("threadedmapbench", ThreadedMapBenchmark.class, - "A map/reduce benchmark that compares the performance " + + pgd.addClass("threadedmapbench", ThreadedMapBenchmark.class, + "A map/reduce benchmark that compares the performance " + "of maps with multiple spills over maps with 1 spill"); - pgd.addClass("mrbench", MRBench.class, + pgd.addClass("mrbench", MRBench.class, "A map/reduce benchmark that can create many small jobs"); pgd.addClass("mapredtest", TestMapRed.class, "A map/reduce test check."); - pgd.addClass("testsequencefileinputformat", - TestSequenceFileInputFormat.class, + pgd.addClass("testsequencefileinputformat", + TestSequenceFileInputFormat.class, "A test for sequence file input format."); - pgd.addClass("testtextinputformat", TestTextInputFormat.class, + pgd.addClass("testtextinputformat", TestTextInputFormat.class, "A test for text input format."); - pgd.addClass("testmapredsort", SortValidator.class, + pgd.addClass("testmapredsort", SortValidator.class, "A map/reduce program that validates the " + "map-reduce framework's sort."); - pgd.addClass("testbigmapoutput", BigMapOutput.class, + pgd.addClass("testbigmapoutput", BigMapOutput.class, "A map/reduce program that works on a very big " + "non-splittable file and does identity map/reduce"); - pgd.addClass("loadgen", GenericMRLoadGenerator.class, + pgd.addClass("loadgen", GenericMRLoadGenerator.class, "Generic map/reduce load generator"); pgd.addClass("MRReliabilityTest", ReliabilityTest.class, "A program that tests the reliability of the MR framework by " + "injecting faults/failures"); pgd.addClass("fail", FailJob.class, "a job that always fails"); - pgd.addClass("sleep", SleepJob.class, + pgd.addClass("sleep", SleepJob.class, "A job that sleeps at each map and reduce task."); - pgd.addClass("nnbench", NNBench.class, + pgd.addClass("timelineperformance", TimelineServicePerformance.class, + "A job that launches mappers to test timlineserver performance."); + pgd.addClass("nnbench", NNBench.class, "A benchmark that stresses the namenode w/ MR."); pgd.addClass("nnbenchWithoutMR", NNBenchWithoutMR.class, "A benchmark that stresses the namenode w/o MR."); - pgd.addClass("testfilesystem", TestFileSystem.class, + pgd.addClass("testfilesystem", TestFileSystem.class, "A test for FileSystem read/write."); - pgd.addClass(TestDFSIO.class.getSimpleName(), TestDFSIO.class, + pgd.addClass(TestDFSIO.class.getSimpleName(), TestDFSIO.class, "Distributed i/o benchmark."); pgd.addClass("DFSCIOTest", DFSCIOTest.class, "" + "Distributed i/o benchmark of libhdfs."); - pgd.addClass("DistributedFSCheck", DistributedFSCheck.class, + pgd.addClass("DistributedFSCheck", DistributedFSCheck.class, "Distributed checkup of the file system consistency."); - pgd.addClass("filebench", FileBench.class, + pgd.addClass("filebench", FileBench.class, "Benchmark SequenceFile(Input|Output)Format " + "(block,record compressed and uncompressed), " + "Text(Input|Output)Format (compressed and uncompressed)"); - pgd.addClass(JHLogAnalyzer.class.getSimpleName(), JHLogAnalyzer.class, + pgd.addClass(JHLogAnalyzer.class.getSimpleName(), JHLogAnalyzer.class, "Job History Log analyzer."); - pgd.addClass(SliveTest.class.getSimpleName(), SliveTest.class, + pgd.addClass(SliveTest.class.getSimpleName(), SliveTest.class, "HDFS Stress Test and Live Data Verification."); pgd.addClass("minicluster", MiniHadoopClusterManager.class, "Single process HDFS and MR cluster."); diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 93c07d87ca8..8aebe21a4d3 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -232,6 +232,9 @@ Release 2.8.0 - UNRELEASED YARN-3964. Support NodeLabelsProvider at Resource Manager side. (Dian Fu via devaraj) + YARN-2556. Tool to measure the performance of the timeline server (Chang Li + via sjlee) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before