MAPREDUCE-6337. Added a mode to replay MR job history files and put them into the timeline service v2. Contributed by Sangjin Lee.
(cherry picked from commit 463e070a8e7c882706a96eaa20ea49bfe9982875)
This commit is contained in:
parent
00e85e7a2b
commit
5088f6c76a
|
@ -0,0 +1,53 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
||||||
|
|
||||||
|
class JobHistoryFileParser {
|
||||||
|
private static final Log LOG = LogFactory.getLog(JobHistoryFileParser.class);
|
||||||
|
|
||||||
|
private final FileSystem fs;
|
||||||
|
|
||||||
|
public JobHistoryFileParser(FileSystem fs) {
|
||||||
|
LOG.info("JobHistoryFileParser created with " + fs);
|
||||||
|
this.fs = fs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public JobInfo parseHistoryFile(Path path) throws IOException {
|
||||||
|
LOG.info("parsing job history file " + path);
|
||||||
|
JobHistoryParser parser = new JobHistoryParser(fs, path);
|
||||||
|
return parser.parse();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Configuration parseConfiguration(Path path) throws IOException {
|
||||||
|
LOG.info("parsing job configuration file " + path);
|
||||||
|
Configuration conf = new Configuration(false);
|
||||||
|
conf.addResource(fs.open(path));
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,301 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
|
import org.apache.hadoop.mapred.TimelineServicePerformanceV2.EntityWriter;
|
||||||
|
import org.apache.hadoop.mapred.TimelineServicePerformanceV2.PerfCounters;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mapper for TimelineServicePerformanceV2 that replays job history files to the
|
||||||
|
* timeline service.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
class JobHistoryFileReplayMapper extends EntityWriter {
|
||||||
|
private static final Log LOG =
|
||||||
|
LogFactory.getLog(JobHistoryFileReplayMapper.class);
|
||||||
|
|
||||||
|
static final String PROCESSING_PATH = "processing path";
|
||||||
|
static final String REPLAY_MODE = "replay mode";
|
||||||
|
static final int WRITE_ALL_AT_ONCE = 1;
|
||||||
|
static final int WRITE_PER_ENTITY = 2;
|
||||||
|
static final int REPLAY_MODE_DEFAULT = WRITE_ALL_AT_ONCE;
|
||||||
|
|
||||||
|
private static final Pattern JOB_ID_PARSER =
|
||||||
|
Pattern.compile("^(job_[0-9]+_([0-9]+)).*");
|
||||||
|
|
||||||
|
public static class JobFiles {
|
||||||
|
private final String jobId;
|
||||||
|
private Path jobHistoryFilePath;
|
||||||
|
private Path jobConfFilePath;
|
||||||
|
|
||||||
|
public JobFiles(String jobId) {
|
||||||
|
this.jobId = jobId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getJobId() {
|
||||||
|
return jobId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Path getJobHistoryFilePath() {
|
||||||
|
return jobHistoryFilePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setJobHistoryFilePath(Path jobHistoryFilePath) {
|
||||||
|
this.jobHistoryFilePath = jobHistoryFilePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Path getJobConfFilePath() {
|
||||||
|
return jobConfFilePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setJobConfFilePath(Path jobConfFilePath) {
|
||||||
|
this.jobConfFilePath = jobConfFilePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return jobId.hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (this == obj) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (obj == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (getClass() != obj.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
JobFiles other = (JobFiles) obj;
|
||||||
|
return jobId.equals(other.jobId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private enum FileType { JOB_HISTORY_FILE, JOB_CONF_FILE, UNKNOWN }
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void writeEntities(Configuration tlConf,
|
||||||
|
TimelineCollectorManager manager, Context context) throws IOException {
|
||||||
|
// collect the apps it needs to process
|
||||||
|
Configuration conf = context.getConfiguration();
|
||||||
|
int taskId = context.getTaskAttemptID().getTaskID().getId();
|
||||||
|
int size = conf.getInt(MRJobConfig.NUM_MAPS,
|
||||||
|
TimelineServicePerformanceV2.NUM_MAPS_DEFAULT);
|
||||||
|
String processingDir =
|
||||||
|
conf.get(JobHistoryFileReplayMapper.PROCESSING_PATH);
|
||||||
|
int replayMode =
|
||||||
|
conf.getInt(JobHistoryFileReplayMapper.REPLAY_MODE,
|
||||||
|
JobHistoryFileReplayMapper.REPLAY_MODE_DEFAULT);
|
||||||
|
Path processingPath = new Path(processingDir);
|
||||||
|
FileSystem processingFs = processingPath.getFileSystem(conf);
|
||||||
|
JobHistoryFileParser parser = new JobHistoryFileParser(processingFs);
|
||||||
|
TimelineEntityConverter converter = new TimelineEntityConverter();
|
||||||
|
|
||||||
|
Collection<JobFiles> jobs =
|
||||||
|
selectJobFiles(processingFs, processingPath, taskId, size);
|
||||||
|
if (jobs.isEmpty()) {
|
||||||
|
LOG.info(context.getTaskAttemptID().getTaskID() +
|
||||||
|
" will process no jobs");
|
||||||
|
} else {
|
||||||
|
LOG.info(context.getTaskAttemptID().getTaskID() + " will process " +
|
||||||
|
jobs.size() + " jobs");
|
||||||
|
}
|
||||||
|
for (JobFiles job: jobs) {
|
||||||
|
// process each job
|
||||||
|
String jobIdStr = job.getJobId();
|
||||||
|
LOG.info("processing " + jobIdStr + "...");
|
||||||
|
JobId jobId = TypeConverter.toYarn(JobID.forName(jobIdStr));
|
||||||
|
ApplicationId appId = jobId.getAppId();
|
||||||
|
|
||||||
|
// create the app level timeline collector and start it
|
||||||
|
AppLevelTimelineCollector collector =
|
||||||
|
new AppLevelTimelineCollector(appId);
|
||||||
|
manager.putIfAbsent(appId, collector);
|
||||||
|
try {
|
||||||
|
// parse the job info and configuration
|
||||||
|
JobInfo jobInfo =
|
||||||
|
parser.parseHistoryFile(job.getJobHistoryFilePath());
|
||||||
|
Configuration jobConf =
|
||||||
|
parser.parseConfiguration(job.getJobConfFilePath());
|
||||||
|
LOG.info("parsed the job history file and the configuration file for job"
|
||||||
|
+ jobIdStr);
|
||||||
|
|
||||||
|
// set the context
|
||||||
|
// flow id: job name, flow run id: timestamp, user id
|
||||||
|
TimelineCollectorContext tlContext =
|
||||||
|
collector.getTimelineEntityContext();
|
||||||
|
tlContext.setFlowName(jobInfo.getJobname());
|
||||||
|
tlContext.setFlowRunId(jobInfo.getSubmitTime());
|
||||||
|
tlContext.setUserId(jobInfo.getUsername());
|
||||||
|
|
||||||
|
// create entities from job history and write them
|
||||||
|
long totalTime = 0;
|
||||||
|
Set<TimelineEntity> entitySet =
|
||||||
|
converter.createTimelineEntities(jobInfo, jobConf);
|
||||||
|
LOG.info("converted them into timeline entities for job " + jobIdStr);
|
||||||
|
// use the current user for this purpose
|
||||||
|
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||||
|
long startWrite = System.nanoTime();
|
||||||
|
try {
|
||||||
|
switch (replayMode) {
|
||||||
|
case JobHistoryFileReplayMapper.WRITE_ALL_AT_ONCE:
|
||||||
|
writeAllEntities(collector, entitySet, ugi);
|
||||||
|
break;
|
||||||
|
case JobHistoryFileReplayMapper.WRITE_PER_ENTITY:
|
||||||
|
writePerEntity(collector, entitySet, ugi);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
|
||||||
|
increment(1);
|
||||||
|
LOG.error("writing to the timeline service failed", e);
|
||||||
|
}
|
||||||
|
long endWrite = System.nanoTime();
|
||||||
|
totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite);
|
||||||
|
int numEntities = entitySet.size();
|
||||||
|
LOG.info("wrote " + numEntities + " entities in " + totalTime + " ms");
|
||||||
|
|
||||||
|
context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
|
||||||
|
increment(totalTime);
|
||||||
|
context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
|
||||||
|
increment(numEntities);
|
||||||
|
} finally {
|
||||||
|
manager.remove(appId);
|
||||||
|
context.progress(); // move it along
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeAllEntities(AppLevelTimelineCollector collector,
|
||||||
|
Set<TimelineEntity> entitySet, UserGroupInformation ugi)
|
||||||
|
throws IOException {
|
||||||
|
TimelineEntities entities = new TimelineEntities();
|
||||||
|
entities.setEntities(entitySet);
|
||||||
|
collector.putEntities(entities, ugi);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writePerEntity(AppLevelTimelineCollector collector,
|
||||||
|
Set<TimelineEntity> entitySet, UserGroupInformation ugi)
|
||||||
|
throws IOException {
|
||||||
|
for (TimelineEntity entity : entitySet) {
|
||||||
|
TimelineEntities entities = new TimelineEntities();
|
||||||
|
entities.addEntity(entity);
|
||||||
|
collector.putEntities(entities, ugi);
|
||||||
|
LOG.info("wrote entity " + entity.getId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Collection<JobFiles> selectJobFiles(FileSystem fs,
|
||||||
|
Path processingRoot, int i, int size) throws IOException {
|
||||||
|
Map<String,JobFiles> jobs = new HashMap<>();
|
||||||
|
RemoteIterator<LocatedFileStatus> it = fs.listFiles(processingRoot, true);
|
||||||
|
while (it.hasNext()) {
|
||||||
|
LocatedFileStatus status = it.next();
|
||||||
|
Path path = status.getPath();
|
||||||
|
String fileName = path.getName();
|
||||||
|
Matcher m = JOB_ID_PARSER.matcher(fileName);
|
||||||
|
if (!m.matches()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
String jobId = m.group(1);
|
||||||
|
int lastId = Integer.parseInt(m.group(2));
|
||||||
|
int mod = lastId % size;
|
||||||
|
if (mod != i) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
LOG.info("this mapper will process file " + fileName);
|
||||||
|
// it's mine
|
||||||
|
JobFiles jobFiles = jobs.get(jobId);
|
||||||
|
if (jobFiles == null) {
|
||||||
|
jobFiles = new JobFiles(jobId);
|
||||||
|
jobs.put(jobId, jobFiles);
|
||||||
|
}
|
||||||
|
setFilePath(fileName, path, jobFiles);
|
||||||
|
}
|
||||||
|
return jobs.values();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setFilePath(String fileName, Path path,
|
||||||
|
JobFiles jobFiles) {
|
||||||
|
// determine if we're dealing with a job history file or a job conf file
|
||||||
|
FileType type = getFileType(fileName);
|
||||||
|
switch (type) {
|
||||||
|
case JOB_HISTORY_FILE:
|
||||||
|
if (jobFiles.getJobHistoryFilePath() == null) {
|
||||||
|
jobFiles.setJobHistoryFilePath(path);
|
||||||
|
} else {
|
||||||
|
LOG.warn("we already have the job history file " +
|
||||||
|
jobFiles.getJobHistoryFilePath() + ": skipping " + path);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case JOB_CONF_FILE:
|
||||||
|
if (jobFiles.getJobConfFilePath() == null) {
|
||||||
|
jobFiles.setJobConfFilePath(path);
|
||||||
|
} else {
|
||||||
|
LOG.warn("we already have the job conf file " +
|
||||||
|
jobFiles.getJobConfFilePath() + ": skipping " + path);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case UNKNOWN:
|
||||||
|
LOG.warn("unknown type: " + path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private FileType getFileType(String fileName) {
|
||||||
|
if (fileName.endsWith(".jhist")) {
|
||||||
|
return FileType.JOB_HISTORY_FILE;
|
||||||
|
}
|
||||||
|
if (fileName.endsWith("_conf.xml")) {
|
||||||
|
return FileType.JOB_CONF_FILE;
|
||||||
|
}
|
||||||
|
return FileType.UNKNOWN;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,139 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.mapred.TimelineServicePerformanceV2.EntityWriter;
|
||||||
|
import org.apache.hadoop.mapred.TimelineServicePerformanceV2.PerfCounters;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds simple entities with random string payload, events, metrics, and
|
||||||
|
* configuration.
|
||||||
|
*/
|
||||||
|
class SimpleEntityWriter extends EntityWriter {
|
||||||
|
private static final Log LOG = LogFactory.getLog(SimpleEntityWriter.class);
|
||||||
|
|
||||||
|
// constants for mtype = 1
|
||||||
|
static final String KBS_SENT = "kbs sent";
|
||||||
|
static final int KBS_SENT_DEFAULT = 1;
|
||||||
|
static final String TEST_TIMES = "testtimes";
|
||||||
|
static final int TEST_TIMES_DEFAULT = 100;
|
||||||
|
static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID =
|
||||||
|
"timeline.server.performance.run.id";
|
||||||
|
|
||||||
|
protected void writeEntities(Configuration tlConf,
|
||||||
|
TimelineCollectorManager manager, Context context) throws IOException {
|
||||||
|
Configuration conf = context.getConfiguration();
|
||||||
|
// simulate the app id with the task id
|
||||||
|
int taskId = context.getTaskAttemptID().getTaskID().getId();
|
||||||
|
long timestamp = conf.getLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID, 0);
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(timestamp, taskId);
|
||||||
|
|
||||||
|
// create the app level timeline collector
|
||||||
|
AppLevelTimelineCollector collector =
|
||||||
|
new AppLevelTimelineCollector(appId);
|
||||||
|
manager.putIfAbsent(appId, collector);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// set the context
|
||||||
|
// flow id: job name, flow run id: timestamp, user id
|
||||||
|
TimelineCollectorContext tlContext =
|
||||||
|
collector.getTimelineEntityContext();
|
||||||
|
tlContext.setFlowName(context.getJobName());
|
||||||
|
tlContext.setFlowRunId(timestamp);
|
||||||
|
tlContext.setUserId(context.getUser());
|
||||||
|
|
||||||
|
final int kbs = conf.getInt(KBS_SENT, KBS_SENT_DEFAULT);
|
||||||
|
|
||||||
|
long totalTime = 0;
|
||||||
|
final int testtimes = conf.getInt(TEST_TIMES, TEST_TIMES_DEFAULT);
|
||||||
|
final Random rand = new Random();
|
||||||
|
final TaskAttemptID taskAttemptId = context.getTaskAttemptID();
|
||||||
|
final char[] payLoad = new char[kbs * 1024];
|
||||||
|
|
||||||
|
for (int i = 0; i < testtimes; i++) {
|
||||||
|
// Generate a fixed length random payload
|
||||||
|
for (int xx = 0; xx < kbs * 1024; xx++) {
|
||||||
|
int alphaNumIdx =
|
||||||
|
rand.nextInt(TimelineServicePerformanceV2.alphaNums.length);
|
||||||
|
payLoad[xx] = TimelineServicePerformanceV2.alphaNums[alphaNumIdx];
|
||||||
|
}
|
||||||
|
String entId = taskAttemptId + "_" + Integer.toString(i);
|
||||||
|
final TimelineEntity entity = new TimelineEntity();
|
||||||
|
entity.setId(entId);
|
||||||
|
entity.setType("FOO_ATTEMPT");
|
||||||
|
entity.addInfo("PERF_TEST", payLoad);
|
||||||
|
// add an event
|
||||||
|
TimelineEvent event = new TimelineEvent();
|
||||||
|
event.setTimestamp(System.currentTimeMillis());
|
||||||
|
event.addInfo("foo_event", "test");
|
||||||
|
entity.addEvent(event);
|
||||||
|
// add a metric
|
||||||
|
TimelineMetric metric = new TimelineMetric();
|
||||||
|
metric.setId("foo_metric");
|
||||||
|
metric.addValue(System.currentTimeMillis(), 123456789L);
|
||||||
|
entity.addMetric(metric);
|
||||||
|
// add a config
|
||||||
|
entity.addConfig("foo", "bar");
|
||||||
|
|
||||||
|
TimelineEntities entities = new TimelineEntities();
|
||||||
|
entities.addEntity(entity);
|
||||||
|
// use the current user for this purpose
|
||||||
|
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||||
|
long startWrite = System.nanoTime();
|
||||||
|
try {
|
||||||
|
collector.putEntities(entities, ugi);
|
||||||
|
} catch (Exception e) {
|
||||||
|
context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
|
||||||
|
increment(1);
|
||||||
|
LOG.error("writing to the timeline service failed", e);
|
||||||
|
}
|
||||||
|
long endWrite = System.nanoTime();
|
||||||
|
totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite);
|
||||||
|
}
|
||||||
|
LOG.info("wrote " + testtimes + " entities (" + kbs*testtimes +
|
||||||
|
" kB) in " + totalTime + " ms");
|
||||||
|
context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
|
||||||
|
increment(totalTime);
|
||||||
|
context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
|
||||||
|
increment(testtimes);
|
||||||
|
context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).
|
||||||
|
increment(kbs*testtimes);
|
||||||
|
} finally {
|
||||||
|
// clean up
|
||||||
|
manager.remove(appId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,207 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.mapreduce.Counter;
|
||||||
|
import org.apache.hadoop.mapreduce.CounterGroup;
|
||||||
|
import org.apache.hadoop.mapreduce.Counters;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskID;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||||
|
|
||||||
|
class TimelineEntityConverter {
|
||||||
|
private static final Log LOG =
|
||||||
|
LogFactory.getLog(TimelineEntityConverter.class);
|
||||||
|
|
||||||
|
static final String JOB = "MAPREDUCE_JOB";
|
||||||
|
static final String TASK = "MAPREDUCE_TASK";
|
||||||
|
static final String TASK_ATTEMPT = "MAPREDUCE_TASK_ATTEMPT";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates job, task, and task attempt entities based on the job history info
|
||||||
|
* and configuration.
|
||||||
|
*
|
||||||
|
* Note: currently these are plan timeline entities created for mapreduce
|
||||||
|
* types. These are not meant to be the complete and accurate entity set-up
|
||||||
|
* for mapreduce jobs. We do not leverage hierarchical timeline entities. If
|
||||||
|
* we create canonical mapreduce hierarchical timeline entities with proper
|
||||||
|
* parent-child relationship, we could modify this to use that instead.
|
||||||
|
*
|
||||||
|
* Note that we also do not add info to the YARN application entity, which
|
||||||
|
* would be needed for aggregation.
|
||||||
|
*/
|
||||||
|
public Set<TimelineEntity> createTimelineEntities(JobInfo jobInfo,
|
||||||
|
Configuration conf) {
|
||||||
|
Set<TimelineEntity> entities = new HashSet<>();
|
||||||
|
|
||||||
|
// create the job entity
|
||||||
|
TimelineEntity job = createJobEntity(jobInfo, conf);
|
||||||
|
entities.add(job);
|
||||||
|
|
||||||
|
// create the task and task attempt entities
|
||||||
|
Set<TimelineEntity> tasksAndAttempts =
|
||||||
|
createTaskAndTaskAttemptEntities(jobInfo);
|
||||||
|
entities.addAll(tasksAndAttempts);
|
||||||
|
|
||||||
|
return entities;
|
||||||
|
}
|
||||||
|
|
||||||
|
private TimelineEntity createJobEntity(JobInfo jobInfo, Configuration conf) {
|
||||||
|
TimelineEntity job = new TimelineEntity();
|
||||||
|
job.setType(JOB);
|
||||||
|
job.setId(jobInfo.getJobId().toString());
|
||||||
|
job.setCreatedTime(jobInfo.getSubmitTime());
|
||||||
|
|
||||||
|
job.addInfo("JOBNAME", jobInfo.getJobname());
|
||||||
|
job.addInfo("USERNAME", jobInfo.getUsername());
|
||||||
|
job.addInfo("JOB_QUEUE_NAME", jobInfo.getJobQueueName());
|
||||||
|
job.addInfo("SUBMIT_TIME", jobInfo.getSubmitTime());
|
||||||
|
job.addInfo("LAUNCH_TIME", jobInfo.getLaunchTime());
|
||||||
|
job.addInfo("FINISH_TIME", jobInfo.getFinishTime());
|
||||||
|
job.addInfo("JOB_STATUS", jobInfo.getJobStatus());
|
||||||
|
job.addInfo("PRIORITY", jobInfo.getPriority());
|
||||||
|
job.addInfo("TOTAL_MAPS", jobInfo.getTotalMaps());
|
||||||
|
job.addInfo("TOTAL_REDUCES", jobInfo.getTotalReduces());
|
||||||
|
job.addInfo("UBERIZED", jobInfo.getUberized());
|
||||||
|
job.addInfo("ERROR_INFO", jobInfo.getErrorInfo());
|
||||||
|
|
||||||
|
// add metrics from total counters
|
||||||
|
// we omit the map counters and reduce counters for now as it's kind of
|
||||||
|
// awkward to put them (map/reduce/total counters are really a group of
|
||||||
|
// related counters)
|
||||||
|
Counters totalCounters = jobInfo.getTotalCounters();
|
||||||
|
if (totalCounters != null) {
|
||||||
|
addMetrics(job, totalCounters);
|
||||||
|
}
|
||||||
|
// finally add configuration to the job
|
||||||
|
addConfiguration(job, conf);
|
||||||
|
LOG.info("converted job " + jobInfo.getJobId() + " to a timeline entity");
|
||||||
|
return job;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addConfiguration(TimelineEntity job, Configuration conf) {
|
||||||
|
for (Map.Entry<String,String> e: conf) {
|
||||||
|
job.addConfig(e.getKey(), e.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addMetrics(TimelineEntity entity, Counters counters) {
|
||||||
|
for (CounterGroup g: counters) {
|
||||||
|
String groupName = g.getName();
|
||||||
|
for (Counter c: g) {
|
||||||
|
String name = groupName + ":" + c.getName();
|
||||||
|
TimelineMetric metric = new TimelineMetric();
|
||||||
|
metric.setId(name);
|
||||||
|
metric.addValue(System.currentTimeMillis(), c.getValue());
|
||||||
|
entity.addMetric(metric);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<TimelineEntity> createTaskAndTaskAttemptEntities(JobInfo jobInfo) {
|
||||||
|
Set<TimelineEntity> entities = new HashSet<>();
|
||||||
|
Map<TaskID,TaskInfo> taskInfoMap = jobInfo.getAllTasks();
|
||||||
|
LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() +
|
||||||
|
" tasks");
|
||||||
|
for (TaskInfo taskInfo: taskInfoMap.values()) {
|
||||||
|
TimelineEntity task = createTaskEntity(taskInfo);
|
||||||
|
entities.add(task);
|
||||||
|
// add the task attempts from this task
|
||||||
|
Set<TimelineEntity> taskAttempts = createTaskAttemptEntities(taskInfo);
|
||||||
|
entities.addAll(taskAttempts);
|
||||||
|
}
|
||||||
|
return entities;
|
||||||
|
}
|
||||||
|
|
||||||
|
private TimelineEntity createTaskEntity(TaskInfo taskInfo) {
|
||||||
|
TimelineEntity task = new TimelineEntity();
|
||||||
|
task.setType(TASK);
|
||||||
|
task.setId(taskInfo.getTaskId().toString());
|
||||||
|
task.setCreatedTime(taskInfo.getStartTime());
|
||||||
|
|
||||||
|
task.addInfo("START_TIME", taskInfo.getStartTime());
|
||||||
|
task.addInfo("FINISH_TIME", taskInfo.getFinishTime());
|
||||||
|
task.addInfo("TASK_TYPE", taskInfo.getTaskType());
|
||||||
|
task.addInfo("TASK_STATUS", taskInfo.getTaskStatus());
|
||||||
|
task.addInfo("ERROR_INFO", taskInfo.getError());
|
||||||
|
|
||||||
|
// add metrics from counters
|
||||||
|
Counters counters = taskInfo.getCounters();
|
||||||
|
if (counters != null) {
|
||||||
|
addMetrics(task, counters);
|
||||||
|
}
|
||||||
|
LOG.info("converted task " + taskInfo.getTaskId() +
|
||||||
|
" to a timeline entity");
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<TimelineEntity> createTaskAttemptEntities(TaskInfo taskInfo) {
|
||||||
|
Set<TimelineEntity> taskAttempts = new HashSet<TimelineEntity>();
|
||||||
|
Map<TaskAttemptID,TaskAttemptInfo> taskAttemptInfoMap =
|
||||||
|
taskInfo.getAllTaskAttempts();
|
||||||
|
LOG.info("task " + taskInfo.getTaskId() + " has " +
|
||||||
|
taskAttemptInfoMap.size() + " task attempts");
|
||||||
|
for (TaskAttemptInfo taskAttemptInfo: taskAttemptInfoMap.values()) {
|
||||||
|
TimelineEntity taskAttempt = createTaskAttemptEntity(taskAttemptInfo);
|
||||||
|
taskAttempts.add(taskAttempt);
|
||||||
|
}
|
||||||
|
return taskAttempts;
|
||||||
|
}
|
||||||
|
|
||||||
|
private TimelineEntity createTaskAttemptEntity(TaskAttemptInfo taskAttemptInfo) {
|
||||||
|
TimelineEntity taskAttempt = new TimelineEntity();
|
||||||
|
taskAttempt.setType(TASK_ATTEMPT);
|
||||||
|
taskAttempt.setId(taskAttemptInfo.getAttemptId().toString());
|
||||||
|
taskAttempt.setCreatedTime(taskAttemptInfo.getStartTime());
|
||||||
|
|
||||||
|
taskAttempt.addInfo("START_TIME", taskAttemptInfo.getStartTime());
|
||||||
|
taskAttempt.addInfo("FINISH_TIME", taskAttemptInfo.getFinishTime());
|
||||||
|
taskAttempt.addInfo("MAP_FINISH_TIME",
|
||||||
|
taskAttemptInfo.getMapFinishTime());
|
||||||
|
taskAttempt.addInfo("SHUFFLE_FINISH_TIME",
|
||||||
|
taskAttemptInfo.getShuffleFinishTime());
|
||||||
|
taskAttempt.addInfo("SORT_FINISH_TIME",
|
||||||
|
taskAttemptInfo.getSortFinishTime());
|
||||||
|
taskAttempt.addInfo("TASK_STATUS", taskAttemptInfo.getTaskStatus());
|
||||||
|
taskAttempt.addInfo("STATE", taskAttemptInfo.getState());
|
||||||
|
taskAttempt.addInfo("ERROR", taskAttemptInfo.getError());
|
||||||
|
taskAttempt.addInfo("CONTAINER_ID",
|
||||||
|
taskAttemptInfo.getContainerId().toString());
|
||||||
|
|
||||||
|
// add metrics from counters
|
||||||
|
Counters counters = taskAttemptInfo.getCounters();
|
||||||
|
if (counters != null) {
|
||||||
|
addMetrics(taskAttempt, counters);
|
||||||
|
}
|
||||||
|
LOG.info("converted task attempt " + taskAttemptInfo.getAttemptId() +
|
||||||
|
" to a timeline entity");
|
||||||
|
return taskAttempt;
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,10 +20,7 @@ package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.Random;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.Configured;
|
import org.apache.hadoop.conf.Configured;
|
||||||
import org.apache.hadoop.io.IntWritable;
|
import org.apache.hadoop.io.IntWritable;
|
||||||
|
@ -31,49 +28,35 @@ import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.SleepJob.SleepInputFormat;
|
import org.apache.hadoop.mapreduce.SleepJob.SleepInputFormat;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
||||||
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
import org.apache.hadoop.util.GenericOptionsParser;
|
import org.apache.hadoop.util.GenericOptionsParser;
|
||||||
import org.apache.hadoop.util.Tool;
|
import org.apache.hadoop.util.Tool;
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
|
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
|
|
||||||
|
|
||||||
public class TimelineServicePerformanceV2 extends Configured implements Tool {
|
public class TimelineServicePerformanceV2 extends Configured implements Tool {
|
||||||
private static final Log LOG =
|
|
||||||
LogFactory.getLog(TimelineServicePerformanceV2.class);
|
|
||||||
|
|
||||||
static final int NUM_MAPS_DEFAULT = 1;
|
static final int NUM_MAPS_DEFAULT = 1;
|
||||||
|
|
||||||
static final int SIMPLE_ENTITY_WRITER = 1;
|
static final int SIMPLE_ENTITY_WRITER = 1;
|
||||||
// constants for mtype = 1
|
static final int JOB_HISTORY_FILE_REPLAY_MAPPER = 2;
|
||||||
static final String KBS_SENT = "kbs sent";
|
|
||||||
static final int KBS_SENT_DEFAULT = 1;
|
|
||||||
static final String TEST_TIMES = "testtimes";
|
|
||||||
static final int TEST_TIMES_DEFAULT = 100;
|
|
||||||
static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID =
|
|
||||||
"timeline.server.performance.run.id";
|
|
||||||
|
|
||||||
static int mapperType = SIMPLE_ENTITY_WRITER;
|
static int mapperType = SIMPLE_ENTITY_WRITER;
|
||||||
|
|
||||||
protected static int printUsage() {
|
protected static int printUsage() {
|
||||||
// TODO is there a way to handle mapper-specific options more gracefully?
|
|
||||||
System.err.println(
|
System.err.println(
|
||||||
"Usage: [-m <maps>] number of mappers (default: " + NUM_MAPS_DEFAULT +
|
"Usage: [-m <maps>] number of mappers (default: " + NUM_MAPS_DEFAULT +
|
||||||
")\n" +
|
")\n" +
|
||||||
" [-mtype <mapper type in integer>] \n" +
|
" [-mtype <mapper type in integer>]\n" +
|
||||||
" 1. simple entity write mapper\n" +
|
" 1. simple entity write mapper\n" +
|
||||||
" [-s <(KBs)test>] number of KB per put (default: " +
|
" 2. job history file replay mapper\n" +
|
||||||
KBS_SENT_DEFAULT + " KB)\n" +
|
" [-s <(KBs)test>] number of KB per put (mtype=1, default: " +
|
||||||
" [-t] package sending iterations per mapper (default: " +
|
SimpleEntityWriter.KBS_SENT_DEFAULT + " KB)\n" +
|
||||||
TEST_TIMES_DEFAULT + ")\n");
|
" [-t] package sending iterations per mapper (mtype=1, default: " +
|
||||||
|
SimpleEntityWriter.TEST_TIMES_DEFAULT + ")\n" +
|
||||||
|
" [-d <path>] root path of job history files (mtype=2)\n" +
|
||||||
|
" [-r <replay mode>] (mtype=2)\n" +
|
||||||
|
" 1. write all entities for a job in one put (default)\n" +
|
||||||
|
" 2. write one entity at a time\n");
|
||||||
GenericOptionsParser.printGenericCommandUsage(System.err);
|
GenericOptionsParser.printGenericCommandUsage(System.err);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -82,11 +65,9 @@ public class TimelineServicePerformanceV2 extends Configured implements Tool {
|
||||||
* Configure a job given argv.
|
* Configure a job given argv.
|
||||||
*/
|
*/
|
||||||
public static boolean parseArgs(String[] args, Job job) throws IOException {
|
public static boolean parseArgs(String[] args, Job job) throws IOException {
|
||||||
// set the defaults
|
// set the common defaults
|
||||||
Configuration conf = job.getConfiguration();
|
Configuration conf = job.getConfiguration();
|
||||||
conf.setInt(MRJobConfig.NUM_MAPS, NUM_MAPS_DEFAULT);
|
conf.setInt(MRJobConfig.NUM_MAPS, NUM_MAPS_DEFAULT);
|
||||||
conf.setInt(KBS_SENT, KBS_SENT_DEFAULT);
|
|
||||||
conf.setInt(TEST_TIMES, TEST_TIMES_DEFAULT);
|
|
||||||
|
|
||||||
for (int i = 0; i < args.length; i++) {
|
for (int i = 0; i < args.length; i++) {
|
||||||
if (args.length == i + 1) {
|
if (args.length == i + 1) {
|
||||||
|
@ -97,25 +78,24 @@ public class TimelineServicePerformanceV2 extends Configured implements Tool {
|
||||||
if ("-m".equals(args[i])) {
|
if ("-m".equals(args[i])) {
|
||||||
if (Integer.parseInt(args[++i]) > 0) {
|
if (Integer.parseInt(args[++i]) > 0) {
|
||||||
job.getConfiguration()
|
job.getConfiguration()
|
||||||
.setInt(MRJobConfig.NUM_MAPS, (Integer.parseInt(args[i])));
|
.setInt(MRJobConfig.NUM_MAPS, Integer.parseInt(args[i]));
|
||||||
}
|
}
|
||||||
} else if ("-mtype".equals(args[i])) {
|
} else if ("-mtype".equals(args[i])) {
|
||||||
mapperType = Integer.parseInt(args[++i]);
|
mapperType = Integer.parseInt(args[++i]);
|
||||||
switch (mapperType) {
|
|
||||||
case SIMPLE_ENTITY_WRITER:
|
|
||||||
job.setMapperClass(SimpleEntityWriter.class);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
job.setMapperClass(SimpleEntityWriter.class);
|
|
||||||
}
|
|
||||||
} else if ("-s".equals(args[i])) {
|
} else if ("-s".equals(args[i])) {
|
||||||
if (Integer.parseInt(args[++i]) > 0) {
|
if (Integer.parseInt(args[++i]) > 0) {
|
||||||
conf.setInt(KBS_SENT, (Integer.parseInt(args[i])));
|
conf.setInt(SimpleEntityWriter.KBS_SENT, Integer.parseInt(args[i]));
|
||||||
}
|
}
|
||||||
} else if ("-t".equals(args[i])) {
|
} else if ("-t".equals(args[i])) {
|
||||||
if (Integer.parseInt(args[++i]) > 0) {
|
if (Integer.parseInt(args[++i]) > 0) {
|
||||||
conf.setInt(TEST_TIMES, (Integer.parseInt(args[i])));
|
conf.setInt(SimpleEntityWriter.TEST_TIMES,
|
||||||
|
Integer.parseInt(args[i]));
|
||||||
}
|
}
|
||||||
|
} else if ("-d".equals(args[i])) {
|
||||||
|
conf.set(JobHistoryFileReplayMapper.PROCESSING_PATH, args[++i]);
|
||||||
|
} else if ("-r".equals(args[i])) {
|
||||||
|
conf.setInt(JobHistoryFileReplayMapper.REPLAY_MODE,
|
||||||
|
Integer.parseInt(args[++i]));
|
||||||
} else {
|
} else {
|
||||||
System.out.println("Unexpected argument: " + args[i]);
|
System.out.println("Unexpected argument: " + args[i]);
|
||||||
return printUsage() == 0;
|
return printUsage() == 0;
|
||||||
|
@ -128,6 +108,27 @@ public class TimelineServicePerformanceV2 extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handle mapper-specific settings
|
||||||
|
switch (mapperType) {
|
||||||
|
case JOB_HISTORY_FILE_REPLAY_MAPPER:
|
||||||
|
job.setMapperClass(JobHistoryFileReplayMapper.class);
|
||||||
|
String processingPath =
|
||||||
|
conf.get(JobHistoryFileReplayMapper.PROCESSING_PATH);
|
||||||
|
if (processingPath == null || processingPath.isEmpty()) {
|
||||||
|
System.out.println("processing path is missing while mtype = 2");
|
||||||
|
return printUsage() == 0;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case SIMPLE_ENTITY_WRITER:
|
||||||
|
default:
|
||||||
|
job.setMapperClass(SimpleEntityWriter.class);
|
||||||
|
// use the current timestamp as the "run id" of the test: this will
|
||||||
|
// be used as simulating the cluster timestamp for apps
|
||||||
|
conf.setLong(SimpleEntityWriter.TIMELINE_SERVICE_PERFORMANCE_RUN_ID,
|
||||||
|
System.currentTimeMillis());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -153,13 +154,6 @@ public class TimelineServicePerformanceV2 extends Configured implements Tool {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// for mtype = 1
|
|
||||||
// use the current timestamp as the "run id" of the test: this will be used
|
|
||||||
// as simulating the cluster timestamp for apps
|
|
||||||
Configuration conf = job.getConfiguration();
|
|
||||||
conf.setLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID,
|
|
||||||
System.currentTimeMillis());
|
|
||||||
|
|
||||||
Date startTime = new Date();
|
Date startTime = new Date();
|
||||||
System.out.println("Job started: " + startTime);
|
System.out.println("Job started: " + startTime);
|
||||||
int ret = job.waitForCompletion(true) ? 0 : 1;
|
int ret = job.waitForCompletion(true) ? 0 : 1;
|
||||||
|
@ -172,7 +166,8 @@ public class TimelineServicePerformanceV2 extends Configured implements Tool {
|
||||||
counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).getValue();
|
counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).getValue();
|
||||||
double transacrate = writecounts * 1000 / (double)writetime;
|
double transacrate = writecounts * 1000 / (double)writetime;
|
||||||
double iorate = writesize * 1000 / (double)writetime;
|
double iorate = writesize * 1000 / (double)writetime;
|
||||||
int numMaps = Integer.parseInt(conf.get(MRJobConfig.NUM_MAPS));
|
int numMaps =
|
||||||
|
Integer.parseInt(job.getConfiguration().get(MRJobConfig.NUM_MAPS));
|
||||||
|
|
||||||
System.out.println("TRANSACTION RATE (per mapper): " + transacrate +
|
System.out.println("TRANSACTION RATE (per mapper): " + transacrate +
|
||||||
" ops/s");
|
" ops/s");
|
||||||
|
@ -204,95 +199,31 @@ public class TimelineServicePerformanceV2 extends Configured implements Tool {
|
||||||
'3', '4', '5', '6', '7', '8', '9', '0', ' ' };
|
'3', '4', '5', '6', '7', '8', '9', '0', ' ' };
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds simple entities with random string payload, events, metrics, and
|
* Base mapper for writing entities to the timeline service. Subclasses
|
||||||
* configuration.
|
* override {@link #writeEntities(Configuration, TimelineCollectorManager,
|
||||||
|
* org.apache.hadoop.mapreduce.Mapper.Context)} to create and write entities
|
||||||
|
* to the timeline service.
|
||||||
*/
|
*/
|
||||||
public static class SimpleEntityWriter
|
public static abstract class EntityWriter
|
||||||
extends org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> {
|
extends org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> {
|
||||||
|
@Override
|
||||||
public void map(IntWritable key, IntWritable val, Context context)
|
public void map(IntWritable key, IntWritable val, Context context)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
Configuration conf = context.getConfiguration();
|
// create the timeline collector manager wired with the writer
|
||||||
// simulate the app id with the task id
|
|
||||||
int taskId = context.getTaskAttemptID().getTaskID().getId();
|
|
||||||
long timestamp = conf.getLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID, 0);
|
|
||||||
ApplicationId appId = ApplicationId.newInstance(timestamp, taskId);
|
|
||||||
|
|
||||||
// create the app level timeline collector
|
|
||||||
Configuration tlConf = new YarnConfiguration();
|
Configuration tlConf = new YarnConfiguration();
|
||||||
AppLevelTimelineCollector collector =
|
TimelineCollectorManager manager = new TimelineCollectorManager("test");
|
||||||
new AppLevelTimelineCollector(appId);
|
manager.init(tlConf);
|
||||||
collector.init(tlConf);
|
manager.start();
|
||||||
collector.start();
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// set the context
|
// invoke the method to have the subclass write entities
|
||||||
// flow id: job name, flow run id: timestamp, user id
|
writeEntities(tlConf, manager, context);
|
||||||
TimelineCollectorContext tlContext =
|
|
||||||
collector.getTimelineEntityContext();
|
|
||||||
tlContext.setFlowName(context.getJobName());
|
|
||||||
tlContext.setFlowRunId(timestamp);
|
|
||||||
tlContext.setUserId(context.getUser());
|
|
||||||
|
|
||||||
final int kbs = Integer.parseInt(conf.get(KBS_SENT));
|
|
||||||
|
|
||||||
long totalTime = 0;
|
|
||||||
final int testtimes = Integer.parseInt(conf.get(TEST_TIMES));
|
|
||||||
final Random rand = new Random();
|
|
||||||
final TaskAttemptID taskAttemptId = context.getTaskAttemptID();
|
|
||||||
final char[] payLoad = new char[kbs * 1024];
|
|
||||||
|
|
||||||
for (int i = 0; i < testtimes; i++) {
|
|
||||||
// Generate a fixed length random payload
|
|
||||||
for (int xx = 0; xx < kbs * 1024; xx++) {
|
|
||||||
int alphaNumIdx = rand.nextInt(alphaNums.length);
|
|
||||||
payLoad[xx] = alphaNums[alphaNumIdx];
|
|
||||||
}
|
|
||||||
String entId = taskAttemptId + "_" + Integer.toString(i);
|
|
||||||
final TimelineEntity entity = new TimelineEntity();
|
|
||||||
entity.setId(entId);
|
|
||||||
entity.setType("FOO_ATTEMPT");
|
|
||||||
entity.addInfo("PERF_TEST", payLoad);
|
|
||||||
// add an event
|
|
||||||
TimelineEvent event = new TimelineEvent();
|
|
||||||
event.setTimestamp(System.currentTimeMillis());
|
|
||||||
event.addInfo("foo_event", "test");
|
|
||||||
entity.addEvent(event);
|
|
||||||
// add a metric
|
|
||||||
TimelineMetric metric = new TimelineMetric();
|
|
||||||
metric.setId("foo_metric");
|
|
||||||
metric.addValue(System.currentTimeMillis(), 123456789L);
|
|
||||||
entity.addMetric(metric);
|
|
||||||
// add a config
|
|
||||||
entity.addConfig("foo", "bar");
|
|
||||||
|
|
||||||
TimelineEntities entities = new TimelineEntities();
|
|
||||||
entities.addEntity(entity);
|
|
||||||
// use the current user for this purpose
|
|
||||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
||||||
long startWrite = System.nanoTime();
|
|
||||||
try {
|
|
||||||
collector.putEntities(entities, ugi);
|
|
||||||
} catch (Exception e) {
|
|
||||||
context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
|
|
||||||
increment(1);
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
long endWrite = System.nanoTime();
|
|
||||||
totalTime += (endWrite-startWrite)/1000000L;
|
|
||||||
}
|
|
||||||
LOG.info("wrote " + testtimes + " entities (" + kbs*testtimes +
|
|
||||||
" kB) in " + totalTime + " ms");
|
|
||||||
context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
|
|
||||||
increment(totalTime);
|
|
||||||
context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
|
|
||||||
increment(testtimes);
|
|
||||||
context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).
|
|
||||||
increment(kbs*testtimes);
|
|
||||||
} finally {
|
} finally {
|
||||||
// clean up
|
manager.close();
|
||||||
collector.close();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected abstract void writeEntities(Configuration tlConf,
|
||||||
|
TimelineCollectorManager manager, Context context) throws IOException;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,7 +48,7 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public abstract class TimelineCollectorManager extends AbstractService {
|
public class TimelineCollectorManager extends AbstractService {
|
||||||
private static final Log LOG =
|
private static final Log LOG =
|
||||||
LogFactory.getLog(TimelineCollectorManager.class);
|
LogFactory.getLog(TimelineCollectorManager.class);
|
||||||
|
|
||||||
|
@ -90,10 +90,14 @@ public abstract class TimelineCollectorManager extends AbstractService {
|
||||||
Collections.synchronizedMap(
|
Collections.synchronizedMap(
|
||||||
new HashMap<ApplicationId, TimelineCollector>());
|
new HashMap<ApplicationId, TimelineCollector>());
|
||||||
|
|
||||||
protected TimelineCollectorManager(String name) {
|
public TimelineCollectorManager(String name) {
|
||||||
super(name);
|
super(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected TimelineWriter getWriter() {
|
||||||
|
return writer;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Put the collector into the collection if an collector mapped by id does
|
* Put the collector into the collection if an collector mapped by id does
|
||||||
* not exist.
|
* not exist.
|
||||||
|
|
|
@ -47,17 +47,17 @@ public class FileSystemTimelineWriterImpl extends AbstractService
|
||||||
|
|
||||||
private String outputRoot;
|
private String outputRoot;
|
||||||
|
|
||||||
/** Config param for timeline service storage tmp root for FILE YARN-3264 */
|
/** Config param for timeline service storage tmp root for FILE YARN-3264. */
|
||||||
public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT
|
public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT
|
||||||
= YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
|
= YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
|
||||||
|
|
||||||
/** default value for storage location on local disk */
|
/** default value for storage location on local disk. */
|
||||||
public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
|
public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
|
||||||
= "/tmp/timeline_service_data";
|
= "/tmp/timeline_service_data";
|
||||||
|
|
||||||
public static final String ENTITIES_DIR = "entities";
|
public static final String ENTITIES_DIR = "entities";
|
||||||
|
|
||||||
/** Default extension for output files */
|
/** Default extension for output files. */
|
||||||
public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
|
public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
|
||||||
|
|
||||||
FileSystemTimelineWriterImpl() {
|
FileSystemTimelineWriterImpl() {
|
||||||
|
@ -81,9 +81,11 @@ public class FileSystemTimelineWriterImpl extends AbstractService
|
||||||
TimelineWriteResponse response) throws IOException {
|
TimelineWriteResponse response) throws IOException {
|
||||||
PrintWriter out = null;
|
PrintWriter out = null;
|
||||||
try {
|
try {
|
||||||
String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,flowName,
|
String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,
|
||||||
flowVersion, String.valueOf(flowRun), appId, entity.getType());
|
escape(flowName), escape(flowVersion), String.valueOf(flowRun), appId,
|
||||||
String fileName = dir + entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION;
|
entity.getType());
|
||||||
|
String fileName = dir + entity.getId() +
|
||||||
|
TIMELINE_SERVICE_STORAGE_EXTENSION;
|
||||||
out =
|
out =
|
||||||
new PrintWriter(new BufferedWriter(new OutputStreamWriter(
|
new PrintWriter(new BufferedWriter(new OutputStreamWriter(
|
||||||
new FileOutputStream(fileName, true), "UTF-8")));
|
new FileOutputStream(fileName, true), "UTF-8")));
|
||||||
|
@ -145,4 +147,9 @@ public class FileSystemTimelineWriterImpl extends AbstractService
|
||||||
}
|
}
|
||||||
return path.toString();
|
return path.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// specifically escape the separator character
|
||||||
|
private static String escape(String str) {
|
||||||
|
return str.replace(File.separatorChar, '_');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,24 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
Loading…
Reference in New Issue