MAPREDUCE-6546. reconcile the two versions of the timeline service performance tests. (Sangjin Lee via Naganarasimha G R)
This commit is contained in:
parent
d56dde490b
commit
4ba6354717
|
@ -1,53 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.mapred;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
|
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
|
||||||
|
|
||||||
class JobHistoryFileParser {
|
|
||||||
private static final Log LOG = LogFactory.getLog(JobHistoryFileParser.class);
|
|
||||||
|
|
||||||
private final FileSystem fs;
|
|
||||||
|
|
||||||
public JobHistoryFileParser(FileSystem fs) {
|
|
||||||
LOG.info("JobHistoryFileParser created with " + fs);
|
|
||||||
this.fs = fs;
|
|
||||||
}
|
|
||||||
|
|
||||||
public JobInfo parseHistoryFile(Path path) throws IOException {
|
|
||||||
LOG.info("parsing job history file " + path);
|
|
||||||
JobHistoryParser parser = new JobHistoryParser(fs, path);
|
|
||||||
return parser.parse();
|
|
||||||
}
|
|
||||||
|
|
||||||
public Configuration parseConfiguration(Path path) throws IOException {
|
|
||||||
LOG.info("parsing job configuration file " + path);
|
|
||||||
Configuration conf = new Configuration(false);
|
|
||||||
conf.addResource(fs.open(path));
|
|
||||||
return conf;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,229 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.mapred;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Date;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.conf.Configured;
|
|
||||||
import org.apache.hadoop.io.IntWritable;
|
|
||||||
import org.apache.hadoop.io.Writable;
|
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
||||||
import org.apache.hadoop.mapreduce.SleepJob.SleepInputFormat;
|
|
||||||
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
|
||||||
import org.apache.hadoop.util.GenericOptionsParser;
|
|
||||||
import org.apache.hadoop.util.Tool;
|
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
|
|
||||||
|
|
||||||
public class TimelineServicePerformanceV2 extends Configured implements Tool {
|
|
||||||
static final int NUM_MAPS_DEFAULT = 1;
|
|
||||||
|
|
||||||
static final int SIMPLE_ENTITY_WRITER = 1;
|
|
||||||
static final int JOB_HISTORY_FILE_REPLAY_MAPPER = 2;
|
|
||||||
static int mapperType = SIMPLE_ENTITY_WRITER;
|
|
||||||
|
|
||||||
protected static int printUsage() {
|
|
||||||
System.err.println(
|
|
||||||
"Usage: [-m <maps>] number of mappers (default: " + NUM_MAPS_DEFAULT +
|
|
||||||
")\n" +
|
|
||||||
" [-mtype <mapper type in integer>]\n" +
|
|
||||||
" 1. simple entity write mapper\n" +
|
|
||||||
" 2. job history file replay mapper\n" +
|
|
||||||
" [-s <(KBs)test>] number of KB per put (mtype=1, default: " +
|
|
||||||
SimpleEntityWriter.KBS_SENT_DEFAULT + " KB)\n" +
|
|
||||||
" [-t] package sending iterations per mapper (mtype=1, default: " +
|
|
||||||
SimpleEntityWriter.TEST_TIMES_DEFAULT + ")\n" +
|
|
||||||
" [-d <path>] root path of job history files (mtype=2)\n" +
|
|
||||||
" [-r <replay mode>] (mtype=2)\n" +
|
|
||||||
" 1. write all entities for a job in one put (default)\n" +
|
|
||||||
" 2. write one entity at a time\n");
|
|
||||||
GenericOptionsParser.printGenericCommandUsage(System.err);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Configure a job given argv.
|
|
||||||
*/
|
|
||||||
public static boolean parseArgs(String[] args, Job job) throws IOException {
|
|
||||||
// set the common defaults
|
|
||||||
Configuration conf = job.getConfiguration();
|
|
||||||
conf.setInt(MRJobConfig.NUM_MAPS, NUM_MAPS_DEFAULT);
|
|
||||||
|
|
||||||
for (int i = 0; i < args.length; i++) {
|
|
||||||
if (args.length == i + 1) {
|
|
||||||
System.out.println("ERROR: Required parameter missing from " + args[i]);
|
|
||||||
return printUsage() == 0;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
if ("-m".equals(args[i])) {
|
|
||||||
if (Integer.parseInt(args[++i]) > 0) {
|
|
||||||
job.getConfiguration()
|
|
||||||
.setInt(MRJobConfig.NUM_MAPS, Integer.parseInt(args[i]));
|
|
||||||
}
|
|
||||||
} else if ("-mtype".equals(args[i])) {
|
|
||||||
mapperType = Integer.parseInt(args[++i]);
|
|
||||||
} else if ("-s".equals(args[i])) {
|
|
||||||
if (Integer.parseInt(args[++i]) > 0) {
|
|
||||||
conf.setInt(SimpleEntityWriter.KBS_SENT, Integer.parseInt(args[i]));
|
|
||||||
}
|
|
||||||
} else if ("-t".equals(args[i])) {
|
|
||||||
if (Integer.parseInt(args[++i]) > 0) {
|
|
||||||
conf.setInt(SimpleEntityWriter.TEST_TIMES,
|
|
||||||
Integer.parseInt(args[i]));
|
|
||||||
}
|
|
||||||
} else if ("-d".equals(args[i])) {
|
|
||||||
conf.set(JobHistoryFileReplayMapper.PROCESSING_PATH, args[++i]);
|
|
||||||
} else if ("-r".equals(args[i])) {
|
|
||||||
conf.setInt(JobHistoryFileReplayMapper.REPLAY_MODE,
|
|
||||||
Integer.parseInt(args[++i]));
|
|
||||||
} else {
|
|
||||||
System.out.println("Unexpected argument: " + args[i]);
|
|
||||||
return printUsage() == 0;
|
|
||||||
}
|
|
||||||
} catch (NumberFormatException except) {
|
|
||||||
System.out.println("ERROR: Integer expected instead of " + args[i]);
|
|
||||||
return printUsage() == 0;
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw (IOException)new IOException().initCause(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handle mapper-specific settings
|
|
||||||
switch (mapperType) {
|
|
||||||
case JOB_HISTORY_FILE_REPLAY_MAPPER:
|
|
||||||
job.setMapperClass(JobHistoryFileReplayMapper.class);
|
|
||||||
String processingPath =
|
|
||||||
conf.get(JobHistoryFileReplayMapper.PROCESSING_PATH);
|
|
||||||
if (processingPath == null || processingPath.isEmpty()) {
|
|
||||||
System.out.println("processing path is missing while mtype = 2");
|
|
||||||
return printUsage() == 0;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case SIMPLE_ENTITY_WRITER:
|
|
||||||
default:
|
|
||||||
job.setMapperClass(SimpleEntityWriter.class);
|
|
||||||
// use the current timestamp as the "run id" of the test: this will
|
|
||||||
// be used as simulating the cluster timestamp for apps
|
|
||||||
conf.setLong(SimpleEntityWriter.TIMELINE_SERVICE_PERFORMANCE_RUN_ID,
|
|
||||||
System.currentTimeMillis());
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* TimelineServer Performance counters
|
|
||||||
*/
|
|
||||||
static enum PerfCounters {
|
|
||||||
TIMELINE_SERVICE_WRITE_TIME,
|
|
||||||
TIMELINE_SERVICE_WRITE_COUNTER,
|
|
||||||
TIMELINE_SERVICE_WRITE_FAILURES,
|
|
||||||
TIMELINE_SERVICE_WRITE_KBS,
|
|
||||||
}
|
|
||||||
|
|
||||||
public int run(String[] args) throws Exception {
|
|
||||||
|
|
||||||
Job job = Job.getInstance(getConf());
|
|
||||||
job.setJarByClass(TimelineServicePerformanceV2.class);
|
|
||||||
job.setMapperClass(SimpleEntityWriter.class);
|
|
||||||
job.setInputFormatClass(SleepInputFormat.class);
|
|
||||||
job.setOutputFormatClass(NullOutputFormat.class);
|
|
||||||
job.setNumReduceTasks(0);
|
|
||||||
if (!parseArgs(args, job)) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
Date startTime = new Date();
|
|
||||||
System.out.println("Job started: " + startTime);
|
|
||||||
int ret = job.waitForCompletion(true) ? 0 : 1;
|
|
||||||
org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
|
|
||||||
long writetime =
|
|
||||||
counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).getValue();
|
|
||||||
long writecounts =
|
|
||||||
counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).getValue();
|
|
||||||
long writesize =
|
|
||||||
counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).getValue();
|
|
||||||
double transacrate = writecounts * 1000 / (double)writetime;
|
|
||||||
double iorate = writesize * 1000 / (double)writetime;
|
|
||||||
int numMaps =
|
|
||||||
Integer.parseInt(job.getConfiguration().get(MRJobConfig.NUM_MAPS));
|
|
||||||
|
|
||||||
System.out.println("TRANSACTION RATE (per mapper): " + transacrate +
|
|
||||||
" ops/s");
|
|
||||||
System.out.println("IO RATE (per mapper): " + iorate + " KB/s");
|
|
||||||
|
|
||||||
System.out.println("TRANSACTION RATE (total): " + transacrate*numMaps +
|
|
||||||
" ops/s");
|
|
||||||
System.out.println("IO RATE (total): " + iorate*numMaps + " KB/s");
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
|
||||||
int res =
|
|
||||||
ToolRunner.run(new Configuration(), new TimelineServicePerformanceV2(),
|
|
||||||
args);
|
|
||||||
System.exit(res);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* To ensure that the compression really gets exercised, generate a
|
|
||||||
* random alphanumeric fixed length payload
|
|
||||||
*/
|
|
||||||
static final char[] alphaNums = new char[] { 'a', 'b', 'c', 'd', 'e', 'f',
|
|
||||||
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
|
|
||||||
's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D',
|
|
||||||
'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
|
|
||||||
'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
|
|
||||||
'3', '4', '5', '6', '7', '8', '9', '0', ' ' };
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Base mapper for writing entities to the timeline service. Subclasses
|
|
||||||
* override {@link #writeEntities(Configuration, TimelineCollectorManager,
|
|
||||||
* org.apache.hadoop.mapreduce.Mapper.Context)} to create and write entities
|
|
||||||
* to the timeline service.
|
|
||||||
*/
|
|
||||||
public static abstract class EntityWriter
|
|
||||||
extends org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> {
|
|
||||||
@Override
|
|
||||||
public void map(IntWritable key, IntWritable val, Context context)
|
|
||||||
throws IOException {
|
|
||||||
|
|
||||||
// create the timeline collector manager wired with the writer
|
|
||||||
Configuration tlConf = new YarnConfiguration();
|
|
||||||
TimelineCollectorManager manager = new TimelineCollectorManager("test");
|
|
||||||
manager.init(tlConf);
|
|
||||||
manager.start();
|
|
||||||
try {
|
|
||||||
// invoke the method to have the subclass write entities
|
|
||||||
writeEntities(tlConf, manager, context);
|
|
||||||
} finally {
|
|
||||||
manager.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected abstract void writeEntities(Configuration tlConf,
|
|
||||||
TimelineCollectorManager manager, Context context) throws IOException;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,56 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapreduce;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.io.IntWritable;
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base mapper for writing entities to the timeline service. Subclasses
|
||||||
|
* override {@link #writeEntities(Configuration, TimelineCollectorManager,
|
||||||
|
* org.apache.hadoop.mapreduce.Mapper.Context)} to create and write entities
|
||||||
|
* to the timeline service.
|
||||||
|
*/
|
||||||
|
abstract class EntityWriterV2
|
||||||
|
extends org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> {
|
||||||
|
@Override
|
||||||
|
public void map(IntWritable key, IntWritable val, Context context)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
// create the timeline collector manager wired with the writer
|
||||||
|
Configuration tlConf = new YarnConfiguration();
|
||||||
|
TimelineCollectorManager manager = new TimelineCollectorManager("test");
|
||||||
|
manager.init(tlConf);
|
||||||
|
manager.start();
|
||||||
|
try {
|
||||||
|
// invoke the method to have the subclass write entities
|
||||||
|
writeEntities(tlConf, manager, context);
|
||||||
|
} finally {
|
||||||
|
manager.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void writeEntities(Configuration tlConf,
|
||||||
|
TimelineCollectorManager manager, Context context) throws IOException;
|
||||||
|
}
|
|
@ -20,33 +20,21 @@ package org.apache.hadoop.mapreduce;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.regex.Matcher;
|
|
||||||
import java.util.regex.Pattern;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
|
||||||
import org.apache.hadoop.io.IntWritable;
|
import org.apache.hadoop.io.IntWritable;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.mapreduce.Mapper.Context;
|
|
||||||
import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
|
|
||||||
import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper;
|
|
||||||
import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper.JobFiles;
|
import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper.JobFiles;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
|
||||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||||
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
|
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
|
||||||
|
|
|
@ -16,28 +16,19 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.mapred;
|
package org.apache.hadoop.mapreduce;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.regex.Matcher;
|
|
||||||
import java.util.regex.Pattern;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.mapred.JobID;
|
||||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper.JobFiles;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
|
||||||
import org.apache.hadoop.mapred.TimelineServicePerformanceV2.EntityWriter;
|
|
||||||
import org.apache.hadoop.mapred.TimelineServicePerformanceV2.PerfCounters;
|
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
||||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
@ -49,96 +40,24 @@ import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
|
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Mapper for TimelineServicePerformanceV2 that replays job history files to the
|
* Mapper for TimelineServicePerformance that replays job history files to the
|
||||||
* timeline service.
|
* timeline service v.2.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
class JobHistoryFileReplayMapper extends EntityWriter {
|
class JobHistoryFileReplayMapperV2 extends EntityWriterV2 {
|
||||||
private static final Log LOG =
|
private static final Log LOG =
|
||||||
LogFactory.getLog(JobHistoryFileReplayMapper.class);
|
LogFactory.getLog(JobHistoryFileReplayMapperV2.class);
|
||||||
|
|
||||||
static final String PROCESSING_PATH = "processing path";
|
|
||||||
static final String REPLAY_MODE = "replay mode";
|
|
||||||
static final int WRITE_ALL_AT_ONCE = 1;
|
|
||||||
static final int WRITE_PER_ENTITY = 2;
|
|
||||||
static final int REPLAY_MODE_DEFAULT = WRITE_ALL_AT_ONCE;
|
|
||||||
|
|
||||||
private static final Pattern JOB_ID_PARSER =
|
|
||||||
Pattern.compile("^(job_[0-9]+_([0-9]+)).*");
|
|
||||||
|
|
||||||
public static class JobFiles {
|
|
||||||
private final String jobId;
|
|
||||||
private Path jobHistoryFilePath;
|
|
||||||
private Path jobConfFilePath;
|
|
||||||
|
|
||||||
public JobFiles(String jobId) {
|
|
||||||
this.jobId = jobId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getJobId() {
|
|
||||||
return jobId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Path getJobHistoryFilePath() {
|
|
||||||
return jobHistoryFilePath;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setJobHistoryFilePath(Path jobHistoryFilePath) {
|
|
||||||
this.jobHistoryFilePath = jobHistoryFilePath;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Path getJobConfFilePath() {
|
|
||||||
return jobConfFilePath;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setJobConfFilePath(Path jobConfFilePath) {
|
|
||||||
this.jobConfFilePath = jobConfFilePath;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
return jobId.hashCode();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object obj) {
|
|
||||||
if (this == obj) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (obj == null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (getClass() != obj.getClass()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
JobFiles other = (JobFiles) obj;
|
|
||||||
return jobId.equals(other.jobId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private enum FileType { JOB_HISTORY_FILE, JOB_CONF_FILE, UNKNOWN }
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void writeEntities(Configuration tlConf,
|
protected void writeEntities(Configuration tlConf,
|
||||||
TimelineCollectorManager manager, Context context) throws IOException {
|
TimelineCollectorManager manager, Context context) throws IOException {
|
||||||
// collect the apps it needs to process
|
JobHistoryFileReplayHelper helper = new JobHistoryFileReplayHelper(context);
|
||||||
Configuration conf = context.getConfiguration();
|
int replayMode = helper.getReplayMode();
|
||||||
int taskId = context.getTaskAttemptID().getTaskID().getId();
|
JobHistoryFileParser parser = helper.getParser();
|
||||||
int size = conf.getInt(MRJobConfig.NUM_MAPS,
|
TimelineEntityConverterV2 converter = new TimelineEntityConverterV2();
|
||||||
TimelineServicePerformanceV2.NUM_MAPS_DEFAULT);
|
|
||||||
String processingDir =
|
|
||||||
conf.get(JobHistoryFileReplayMapper.PROCESSING_PATH);
|
|
||||||
int replayMode =
|
|
||||||
conf.getInt(JobHistoryFileReplayMapper.REPLAY_MODE,
|
|
||||||
JobHistoryFileReplayMapper.REPLAY_MODE_DEFAULT);
|
|
||||||
Path processingPath = new Path(processingDir);
|
|
||||||
FileSystem processingFs = processingPath.getFileSystem(conf);
|
|
||||||
JobHistoryFileParser parser = new JobHistoryFileParser(processingFs);
|
|
||||||
TimelineEntityConverter converter = new TimelineEntityConverter();
|
|
||||||
|
|
||||||
Collection<JobFiles> jobs =
|
// collect the apps it needs to process
|
||||||
selectJobFiles(processingFs, processingPath, taskId, size);
|
Collection<JobFiles> jobs = helper.getJobFiles();
|
||||||
if (jobs.isEmpty()) {
|
if (jobs.isEmpty()) {
|
||||||
LOG.info(context.getTaskAttemptID().getTaskID() +
|
LOG.info(context.getTaskAttemptID().getTaskID() +
|
||||||
" will process no jobs");
|
" will process no jobs");
|
||||||
|
@ -149,6 +68,13 @@ class JobHistoryFileReplayMapper extends EntityWriter {
|
||||||
for (JobFiles job: jobs) {
|
for (JobFiles job: jobs) {
|
||||||
// process each job
|
// process each job
|
||||||
String jobIdStr = job.getJobId();
|
String jobIdStr = job.getJobId();
|
||||||
|
// skip if either of the file is missing
|
||||||
|
if (job.getJobConfFilePath() == null ||
|
||||||
|
job.getJobHistoryFilePath() == null) {
|
||||||
|
LOG.info(jobIdStr + " missing either the job history file or the " +
|
||||||
|
"configuration file. Skipping.");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
LOG.info("processing " + jobIdStr + "...");
|
LOG.info("processing " + jobIdStr + "...");
|
||||||
JobId jobId = TypeConverter.toYarn(JobID.forName(jobIdStr));
|
JobId jobId = TypeConverter.toYarn(JobID.forName(jobIdStr));
|
||||||
ApplicationId appId = jobId.getAppId();
|
ApplicationId appId = jobId.getAppId();
|
||||||
|
@ -184,10 +110,10 @@ class JobHistoryFileReplayMapper extends EntityWriter {
|
||||||
long startWrite = System.nanoTime();
|
long startWrite = System.nanoTime();
|
||||||
try {
|
try {
|
||||||
switch (replayMode) {
|
switch (replayMode) {
|
||||||
case JobHistoryFileReplayMapper.WRITE_ALL_AT_ONCE:
|
case JobHistoryFileReplayHelper.WRITE_ALL_AT_ONCE:
|
||||||
writeAllEntities(collector, entitySet, ugi);
|
writeAllEntities(collector, entitySet, ugi);
|
||||||
break;
|
break;
|
||||||
case JobHistoryFileReplayMapper.WRITE_PER_ENTITY:
|
case JobHistoryFileReplayHelper.WRITE_PER_ENTITY:
|
||||||
writePerEntity(collector, entitySet, ugi);
|
writePerEntity(collector, entitySet, ugi);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
@ -232,70 +158,4 @@ class JobHistoryFileReplayMapper extends EntityWriter {
|
||||||
LOG.info("wrote entity " + entity.getId());
|
LOG.info("wrote entity " + entity.getId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Collection<JobFiles> selectJobFiles(FileSystem fs,
|
|
||||||
Path processingRoot, int i, int size) throws IOException {
|
|
||||||
Map<String,JobFiles> jobs = new HashMap<>();
|
|
||||||
RemoteIterator<LocatedFileStatus> it = fs.listFiles(processingRoot, true);
|
|
||||||
while (it.hasNext()) {
|
|
||||||
LocatedFileStatus status = it.next();
|
|
||||||
Path path = status.getPath();
|
|
||||||
String fileName = path.getName();
|
|
||||||
Matcher m = JOB_ID_PARSER.matcher(fileName);
|
|
||||||
if (!m.matches()) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
String jobId = m.group(1);
|
|
||||||
int lastId = Integer.parseInt(m.group(2));
|
|
||||||
int mod = lastId % size;
|
|
||||||
if (mod != i) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
LOG.info("this mapper will process file " + fileName);
|
|
||||||
// it's mine
|
|
||||||
JobFiles jobFiles = jobs.get(jobId);
|
|
||||||
if (jobFiles == null) {
|
|
||||||
jobFiles = new JobFiles(jobId);
|
|
||||||
jobs.put(jobId, jobFiles);
|
|
||||||
}
|
|
||||||
setFilePath(fileName, path, jobFiles);
|
|
||||||
}
|
|
||||||
return jobs.values();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setFilePath(String fileName, Path path,
|
|
||||||
JobFiles jobFiles) {
|
|
||||||
// determine if we're dealing with a job history file or a job conf file
|
|
||||||
FileType type = getFileType(fileName);
|
|
||||||
switch (type) {
|
|
||||||
case JOB_HISTORY_FILE:
|
|
||||||
if (jobFiles.getJobHistoryFilePath() == null) {
|
|
||||||
jobFiles.setJobHistoryFilePath(path);
|
|
||||||
} else {
|
|
||||||
LOG.warn("we already have the job history file " +
|
|
||||||
jobFiles.getJobHistoryFilePath() + ": skipping " + path);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case JOB_CONF_FILE:
|
|
||||||
if (jobFiles.getJobConfFilePath() == null) {
|
|
||||||
jobFiles.setJobConfFilePath(path);
|
|
||||||
} else {
|
|
||||||
LOG.warn("we already have the job conf file " +
|
|
||||||
jobFiles.getJobConfFilePath() + ": skipping " + path);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case UNKNOWN:
|
|
||||||
LOG.warn("unknown type: " + path);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private FileType getFileType(String fileName) {
|
|
||||||
if (fileName.endsWith(".jhist")) {
|
|
||||||
return FileType.JOB_HISTORY_FILE;
|
|
||||||
}
|
|
||||||
if (fileName.endsWith("_conf.xml")) {
|
|
||||||
return FileType.JOB_CONF_FILE;
|
|
||||||
}
|
|
||||||
return FileType.UNKNOWN;
|
|
||||||
}
|
|
||||||
}
|
}
|
|
@ -0,0 +1,43 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapreduce;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constants for simple entity writers.
|
||||||
|
*/
|
||||||
|
interface SimpleEntityWriterConstants {
|
||||||
|
// constants for mtype = 1
|
||||||
|
String KBS_SENT = "kbs sent";
|
||||||
|
int KBS_SENT_DEFAULT = 1;
|
||||||
|
String TEST_TIMES = "testtimes";
|
||||||
|
int TEST_TIMES_DEFAULT = 100;
|
||||||
|
String TIMELINE_SERVICE_PERFORMANCE_RUN_ID =
|
||||||
|
"timeline.server.performance.run.id";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* To ensure that the compression really gets exercised, generate a
|
||||||
|
* random alphanumeric fixed length payload
|
||||||
|
*/
|
||||||
|
char[] ALPHA_NUMS = new char[] { 'a', 'b', 'c', 'd', 'e', 'f',
|
||||||
|
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
|
||||||
|
's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D',
|
||||||
|
'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
|
||||||
|
'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
|
||||||
|
'3', '4', '5', '6', '7', '8', '9', '0', ' ' };
|
||||||
|
}
|
|
@ -27,44 +27,22 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.IntWritable;
|
import org.apache.hadoop.io.IntWritable;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.mapreduce.Mapper.Context;
|
|
||||||
import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
|
import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
||||||
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||||
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
|
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds simple entities with random string payload, events, metrics, and
|
* Adds simple entities with random string payload, events, metrics, and
|
||||||
* configuration.
|
* configuration.
|
||||||
*/
|
*/
|
||||||
class SimpleEntityWriterV1 extends
|
class SimpleEntityWriterV1
|
||||||
org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> {
|
extends org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable>
|
||||||
|
implements SimpleEntityWriterConstants {
|
||||||
private static final Log LOG = LogFactory.getLog(SimpleEntityWriterV1.class);
|
private static final Log LOG = LogFactory.getLog(SimpleEntityWriterV1.class);
|
||||||
|
|
||||||
// constants for mtype = 1
|
|
||||||
static final String KBS_SENT = "kbs sent";
|
|
||||||
static final int KBS_SENT_DEFAULT = 1;
|
|
||||||
static final String TEST_TIMES = "testtimes";
|
|
||||||
static final int TEST_TIMES_DEFAULT = 100;
|
|
||||||
static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID =
|
|
||||||
"timeline.server.performance.run.id";
|
|
||||||
/**
|
|
||||||
* To ensure that the compression really gets exercised, generate a
|
|
||||||
* random alphanumeric fixed length payload
|
|
||||||
*/
|
|
||||||
private static char[] ALPHA_NUMS = new char[] { 'a', 'b', 'c', 'd', 'e', 'f',
|
|
||||||
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
|
|
||||||
's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D',
|
|
||||||
'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
|
|
||||||
'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
|
|
||||||
'3', '4', '5', '6', '7', '8', '9', '0', ' ' };
|
|
||||||
|
|
||||||
public void map(IntWritable key, IntWritable val, Context context) throws IOException {
|
public void map(IntWritable key, IntWritable val, Context context) throws IOException {
|
||||||
TimelineClient tlc = new TimelineClientImpl();
|
TimelineClient tlc = new TimelineClientImpl();
|
||||||
Configuration conf = context.getConfiguration();
|
Configuration conf = context.getConfiguration();
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.mapred;
|
package org.apache.hadoop.mapreduce;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
@ -25,9 +25,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.mapred.TimelineServicePerformanceV2.EntityWriter;
|
import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
|
||||||
import org.apache.hadoop.mapred.TimelineServicePerformanceV2.PerfCounters;
|
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||||
|
@ -39,19 +37,12 @@ import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
|
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds simple entities with random string payload, events, metrics, and
|
* Adds simple entities with random string payload, events, metrics, and
|
||||||
* configuration.
|
* configuration.
|
||||||
*/
|
*/
|
||||||
class SimpleEntityWriter extends EntityWriter {
|
class SimpleEntityWriterV2 extends EntityWriterV2
|
||||||
private static final Log LOG = LogFactory.getLog(SimpleEntityWriter.class);
|
implements SimpleEntityWriterConstants {
|
||||||
|
private static final Log LOG = LogFactory.getLog(SimpleEntityWriterV2.class);
|
||||||
// constants for mtype = 1
|
|
||||||
static final String KBS_SENT = "kbs sent";
|
|
||||||
static final int KBS_SENT_DEFAULT = 1;
|
|
||||||
static final String TEST_TIMES = "testtimes";
|
|
||||||
static final int TEST_TIMES_DEFAULT = 100;
|
|
||||||
static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID =
|
|
||||||
"timeline.server.performance.run.id";
|
|
||||||
|
|
||||||
protected void writeEntities(Configuration tlConf,
|
protected void writeEntities(Configuration tlConf,
|
||||||
TimelineCollectorManager manager, Context context) throws IOException {
|
TimelineCollectorManager manager, Context context) throws IOException {
|
||||||
|
@ -87,8 +78,8 @@ class SimpleEntityWriter extends EntityWriter {
|
||||||
// Generate a fixed length random payload
|
// Generate a fixed length random payload
|
||||||
for (int xx = 0; xx < kbs * 1024; xx++) {
|
for (int xx = 0; xx < kbs * 1024; xx++) {
|
||||||
int alphaNumIdx =
|
int alphaNumIdx =
|
||||||
rand.nextInt(TimelineServicePerformanceV2.alphaNums.length);
|
rand.nextInt(ALPHA_NUMS.length);
|
||||||
payLoad[xx] = TimelineServicePerformanceV2.alphaNums[alphaNumIdx];
|
payLoad[xx] = ALPHA_NUMS[alphaNumIdx];
|
||||||
}
|
}
|
||||||
String entId = taskAttemptId + "_" + Integer.toString(i);
|
String entId = taskAttemptId + "_" + Integer.toString(i);
|
||||||
final TimelineEntity entity = new TimelineEntity();
|
final TimelineEntity entity = new TimelineEntity();
|
|
@ -25,11 +25,6 @@ import java.util.Set;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.mapreduce.Counter;
|
|
||||||
import org.apache.hadoop.mapreduce.CounterGroup;
|
|
||||||
import org.apache.hadoop.mapreduce.Counters;
|
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
||||||
import org.apache.hadoop.mapreduce.TaskID;
|
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.mapred;
|
package org.apache.hadoop.mapreduce;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -38,9 +38,9 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||||
|
|
||||||
class TimelineEntityConverter {
|
class TimelineEntityConverterV2 {
|
||||||
private static final Log LOG =
|
private static final Log LOG =
|
||||||
LogFactory.getLog(TimelineEntityConverter.class);
|
LogFactory.getLog(TimelineEntityConverterV2.class);
|
||||||
|
|
||||||
static final String JOB = "MAPREDUCE_JOB";
|
static final String JOB = "MAPREDUCE_JOB";
|
||||||
static final String TASK = "MAPREDUCE_TASK";
|
static final String TASK = "MAPREDUCE_TASK";
|
|
@ -23,8 +23,6 @@ import java.util.Date;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.Configured;
|
import org.apache.hadoop.conf.Configured;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
||||||
import org.apache.hadoop.mapreduce.SleepJob.SleepInputFormat;
|
import org.apache.hadoop.mapreduce.SleepJob.SleepInputFormat;
|
||||||
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
||||||
import org.apache.hadoop.util.GenericOptionsParser;
|
import org.apache.hadoop.util.GenericOptionsParser;
|
||||||
|
@ -46,15 +44,19 @@ public class TimelineServicePerformance extends Configured implements Tool {
|
||||||
System.err.println(
|
System.err.println(
|
||||||
"Usage: [-m <maps>] number of mappers (default: " + NUM_MAPS_DEFAULT +
|
"Usage: [-m <maps>] number of mappers (default: " + NUM_MAPS_DEFAULT +
|
||||||
")\n" +
|
")\n" +
|
||||||
" [-v] timeline service version\n" +
|
" [-v] timeline service version (default: " +
|
||||||
" [-mtype <mapper type in integer>]\n" +
|
TIMELINE_SERVICE_VERSION_1 + ")\n" +
|
||||||
" 1. simple entity write mapper (default)\n" +
|
" 1. version 1.x\n" +
|
||||||
|
" 2. version 2.x\n" +
|
||||||
|
" [-mtype <mapper type in integer>] (default: " +
|
||||||
|
SIMPLE_ENTITY_WRITER + ")\n" +
|
||||||
|
" 1. simple entity write mapper\n" +
|
||||||
" 2. jobhistory files replay mapper\n" +
|
" 2. jobhistory files replay mapper\n" +
|
||||||
" [-s <(KBs)test>] number of KB per put (mtype=1, default: " +
|
" [-s <(KBs)test>] number of KB per put (mtype=1, default: " +
|
||||||
SimpleEntityWriterV1.KBS_SENT_DEFAULT + " KB)\n" +
|
SimpleEntityWriterConstants.KBS_SENT_DEFAULT + " KB)\n" +
|
||||||
" [-t] package sending iterations per mapper (mtype=1, default: " +
|
" [-t] package sending iterations per mapper (mtype=1, default: " +
|
||||||
SimpleEntityWriterV1.TEST_TIMES_DEFAULT + ")\n" +
|
SimpleEntityWriterConstants.TEST_TIMES_DEFAULT + ")\n" +
|
||||||
" [-d <path>] root path of job history files (mtype=2)\n" +
|
" [-d <path>] hdfs root path of job history files (mtype=2)\n" +
|
||||||
" [-r <replay mode>] (mtype=2)\n" +
|
" [-r <replay mode>] (mtype=2)\n" +
|
||||||
" 1. write all entities for a job in one put (default)\n" +
|
" 1. write all entities for a job in one put (default)\n" +
|
||||||
" 2. write one entity at a time\n");
|
" 2. write one entity at a time\n");
|
||||||
|
@ -78,8 +80,7 @@ public class TimelineServicePerformance extends Configured implements Tool {
|
||||||
try {
|
try {
|
||||||
if ("-v".equals(args[i])) {
|
if ("-v".equals(args[i])) {
|
||||||
timeline_service_version = Integer.parseInt(args[++i]);
|
timeline_service_version = Integer.parseInt(args[++i]);
|
||||||
}
|
} else if ("-m".equals(args[i])) {
|
||||||
if ("-m".equals(args[i])) {
|
|
||||||
if (Integer.parseInt(args[++i]) > 0) {
|
if (Integer.parseInt(args[++i]) > 0) {
|
||||||
job.getConfiguration()
|
job.getConfiguration()
|
||||||
.setInt(MRJobConfig.NUM_MAPS, Integer.parseInt(args[i]));
|
.setInt(MRJobConfig.NUM_MAPS, Integer.parseInt(args[i]));
|
||||||
|
@ -88,11 +89,12 @@ public class TimelineServicePerformance extends Configured implements Tool {
|
||||||
mapperType = Integer.parseInt(args[++i]);
|
mapperType = Integer.parseInt(args[++i]);
|
||||||
} else if ("-s".equals(args[i])) {
|
} else if ("-s".equals(args[i])) {
|
||||||
if (Integer.parseInt(args[++i]) > 0) {
|
if (Integer.parseInt(args[++i]) > 0) {
|
||||||
conf.setInt(SimpleEntityWriterV1.KBS_SENT, Integer.parseInt(args[i]));
|
conf.setInt(SimpleEntityWriterConstants.KBS_SENT,
|
||||||
|
Integer.parseInt(args[i]));
|
||||||
}
|
}
|
||||||
} else if ("-t".equals(args[i])) {
|
} else if ("-t".equals(args[i])) {
|
||||||
if (Integer.parseInt(args[++i]) > 0) {
|
if (Integer.parseInt(args[++i]) > 0) {
|
||||||
conf.setInt(SimpleEntityWriterV1.TEST_TIMES,
|
conf.setInt(SimpleEntityWriterConstants.TEST_TIMES,
|
||||||
Integer.parseInt(args[i]));
|
Integer.parseInt(args[i]));
|
||||||
}
|
}
|
||||||
} else if ("-d".equals(args[i])) {
|
} else if ("-d".equals(args[i])) {
|
||||||
|
@ -113,28 +115,40 @@ public class TimelineServicePerformance extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// handle mapper-specific settings
|
// handle mapper-specific settings
|
||||||
switch (timeline_service_version) {
|
switch (mapperType) {
|
||||||
case TIMELINE_SERVICE_VERSION_1:
|
case JOB_HISTORY_FILE_REPLAY_MAPPER:
|
||||||
default:
|
String processingPath =
|
||||||
switch (mapperType) {
|
conf.get(JobHistoryFileReplayHelper.PROCESSING_PATH);
|
||||||
case JOB_HISTORY_FILE_REPLAY_MAPPER:
|
if (processingPath == null || processingPath.isEmpty()) {
|
||||||
job.setMapperClass(JobHistoryFileReplayMapperV1.class);
|
System.out.println("processing path is missing while mtype = 2");
|
||||||
String processingPath =
|
return printUsage() == 0;
|
||||||
conf.get(JobHistoryFileReplayHelper.PROCESSING_PATH);
|
}
|
||||||
if (processingPath == null || processingPath.isEmpty()) {
|
switch (timeline_service_version) {
|
||||||
System.out.println("processing path is missing while mtype = 2");
|
case TIMELINE_SERVICE_VERSION_2:
|
||||||
return printUsage() == 0;
|
job.setMapperClass(JobHistoryFileReplayMapperV2.class);
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case SIMPLE_ENTITY_WRITER:
|
case TIMELINE_SERVICE_VERSION_1:
|
||||||
default:
|
default:
|
||||||
job.setMapperClass(SimpleEntityWriterV1.class);
|
job.setMapperClass(JobHistoryFileReplayMapperV1.class);
|
||||||
// use the current timestamp as the "run id" of the test: this will
|
|
||||||
// be used as simulating the cluster timestamp for apps
|
|
||||||
conf.setLong(SimpleEntityWriterV1.TIMELINE_SERVICE_PERFORMANCE_RUN_ID,
|
|
||||||
System.currentTimeMillis());
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
|
case SIMPLE_ENTITY_WRITER:
|
||||||
|
default:
|
||||||
|
// use the current timestamp as the "run id" of the test: this will
|
||||||
|
// be used as simulating the cluster timestamp for apps
|
||||||
|
conf.setLong(SimpleEntityWriterConstants.TIMELINE_SERVICE_PERFORMANCE_RUN_ID,
|
||||||
|
System.currentTimeMillis());
|
||||||
|
switch (timeline_service_version) {
|
||||||
|
case TIMELINE_SERVICE_VERSION_2:
|
||||||
|
job.setMapperClass(SimpleEntityWriterV2.class);
|
||||||
|
break;
|
||||||
|
case TIMELINE_SERVICE_VERSION_1:
|
||||||
|
default:
|
||||||
|
job.setMapperClass(SimpleEntityWriterV1.class);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -164,25 +178,46 @@ public class TimelineServicePerformance extends Configured implements Tool {
|
||||||
Date startTime = new Date();
|
Date startTime = new Date();
|
||||||
System.out.println("Job started: " + startTime);
|
System.out.println("Job started: " + startTime);
|
||||||
int ret = job.waitForCompletion(true) ? 0 : 1;
|
int ret = job.waitForCompletion(true) ? 0 : 1;
|
||||||
org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
|
if (job.isSuccessful()) {
|
||||||
long writetime =
|
org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
|
||||||
counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).getValue();
|
long writecounts =
|
||||||
long writecounts =
|
counters.findCounter(
|
||||||
counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).getValue();
|
PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).getValue();
|
||||||
long writesize =
|
long writefailures =
|
||||||
counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).getValue();
|
counters.findCounter(
|
||||||
double transacrate = writecounts * 1000 / (double)writetime;
|
PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).getValue();
|
||||||
double iorate = writesize * 1000 / (double)writetime;
|
if (writefailures > 0 && writefailures == writecounts) {
|
||||||
int numMaps =
|
// see if we have a complete failure to write
|
||||||
Integer.parseInt(job.getConfiguration().get(MRJobConfig.NUM_MAPS));
|
System.out.println("Job failed: all writes failed!");
|
||||||
|
} else {
|
||||||
|
long writetime =
|
||||||
|
counters.findCounter(
|
||||||
|
PerfCounters.TIMELINE_SERVICE_WRITE_TIME).getValue();
|
||||||
|
long writesize =
|
||||||
|
counters.findCounter(
|
||||||
|
PerfCounters.TIMELINE_SERVICE_WRITE_KBS).getValue();
|
||||||
|
if (writetime == 0L) {
|
||||||
|
// see if write time is zero (normally shouldn't happen)
|
||||||
|
System.out.println("Job failed: write time is 0!");
|
||||||
|
} else {
|
||||||
|
double transacrate = writecounts * 1000 / (double)writetime;
|
||||||
|
double iorate = writesize * 1000 / (double)writetime;
|
||||||
|
int numMaps =
|
||||||
|
Integer.parseInt(
|
||||||
|
job.getConfiguration().get(MRJobConfig.NUM_MAPS));
|
||||||
|
|
||||||
System.out.println("TRANSACTION RATE (per mapper): " + transacrate +
|
System.out.println("TRANSACTION RATE (per mapper): " + transacrate +
|
||||||
" ops/s");
|
" ops/s");
|
||||||
System.out.println("IO RATE (per mapper): " + iorate + " KB/s");
|
System.out.println("IO RATE (per mapper): " + iorate + " KB/s");
|
||||||
|
|
||||||
System.out.println("TRANSACTION RATE (total): " + transacrate*numMaps +
|
System.out.println("TRANSACTION RATE (total): " +
|
||||||
" ops/s");
|
transacrate*numMaps + " ops/s");
|
||||||
System.out.println("IO RATE (total): " + iorate*numMaps + " KB/s");
|
System.out.println("IO RATE (total): " + iorate*numMaps + " KB/s");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
System.out.println("Job failed: " + job.getStatus().getFailureInfo());
|
||||||
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,19 @@
|
||||||
|
|
||||||
package org.apache.hadoop.test;
|
package org.apache.hadoop.test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.DFSCIOTest;
|
||||||
|
import org.apache.hadoop.fs.DistributedFSCheck;
|
||||||
|
import org.apache.hadoop.fs.JHLogAnalyzer;
|
||||||
|
import org.apache.hadoop.fs.TestDFSIO;
|
||||||
|
import org.apache.hadoop.fs.TestFileSystem;
|
||||||
|
import org.apache.hadoop.fs.loadGenerator.DataGenerator;
|
||||||
|
import org.apache.hadoop.fs.loadGenerator.LoadGenerator;
|
||||||
|
import org.apache.hadoop.fs.loadGenerator.LoadGeneratorMR;
|
||||||
|
import org.apache.hadoop.fs.loadGenerator.StructureGenerator;
|
||||||
|
import org.apache.hadoop.fs.slive.SliveTest;
|
||||||
|
import org.apache.hadoop.hdfs.NNBench;
|
||||||
|
import org.apache.hadoop.hdfs.NNBenchWithoutMR;
|
||||||
|
import org.apache.hadoop.io.FileBench;
|
||||||
import org.apache.hadoop.io.TestSequenceFile;
|
import org.apache.hadoop.io.TestSequenceFile;
|
||||||
import org.apache.hadoop.mapred.BigMapOutput;
|
import org.apache.hadoop.mapred.BigMapOutput;
|
||||||
import org.apache.hadoop.mapred.GenericMRLoadGenerator;
|
import org.apache.hadoop.mapred.GenericMRLoadGenerator;
|
||||||
|
@ -28,28 +41,13 @@ import org.apache.hadoop.mapred.TestMapRed;
|
||||||
import org.apache.hadoop.mapred.TestSequenceFileInputFormat;
|
import org.apache.hadoop.mapred.TestSequenceFileInputFormat;
|
||||||
import org.apache.hadoop.mapred.TestTextInputFormat;
|
import org.apache.hadoop.mapred.TestTextInputFormat;
|
||||||
import org.apache.hadoop.mapred.ThreadedMapBenchmark;
|
import org.apache.hadoop.mapred.ThreadedMapBenchmark;
|
||||||
import org.apache.hadoop.mapreduce.TimelineServicePerformance;
|
|
||||||
import org.apache.hadoop.mapred.TimelineServicePerformanceV2;
|
|
||||||
import org.apache.hadoop.mapreduce.FailJob;
|
import org.apache.hadoop.mapreduce.FailJob;
|
||||||
import org.apache.hadoop.mapreduce.LargeSorter;
|
import org.apache.hadoop.mapreduce.LargeSorter;
|
||||||
import org.apache.hadoop.mapreduce.MiniHadoopClusterManager;
|
import org.apache.hadoop.mapreduce.MiniHadoopClusterManager;
|
||||||
import org.apache.hadoop.mapreduce.SleepJob;
|
import org.apache.hadoop.mapreduce.SleepJob;
|
||||||
|
import org.apache.hadoop.mapreduce.TimelineServicePerformance;
|
||||||
import org.apache.hadoop.util.ProgramDriver;
|
import org.apache.hadoop.util.ProgramDriver;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.NNBench;
|
|
||||||
import org.apache.hadoop.hdfs.NNBenchWithoutMR;
|
|
||||||
import org.apache.hadoop.fs.TestFileSystem;
|
|
||||||
import org.apache.hadoop.fs.TestDFSIO;
|
|
||||||
import org.apache.hadoop.fs.DFSCIOTest;
|
|
||||||
import org.apache.hadoop.fs.DistributedFSCheck;
|
|
||||||
import org.apache.hadoop.io.FileBench;
|
|
||||||
import org.apache.hadoop.fs.JHLogAnalyzer;
|
|
||||||
import org.apache.hadoop.fs.loadGenerator.DataGenerator;
|
|
||||||
import org.apache.hadoop.fs.loadGenerator.LoadGenerator;
|
|
||||||
import org.apache.hadoop.fs.loadGenerator.LoadGeneratorMR;
|
|
||||||
import org.apache.hadoop.fs.loadGenerator.StructureGenerator;
|
|
||||||
import org.apache.hadoop.fs.slive.SliveTest;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Driver for Map-reduce tests.
|
* Driver for Map-reduce tests.
|
||||||
*
|
*
|
||||||
|
@ -93,9 +91,8 @@ public class MapredTestDriver {
|
||||||
pgd.addClass("sleep", SleepJob.class,
|
pgd.addClass("sleep", SleepJob.class,
|
||||||
"A job that sleeps at each map and reduce task.");
|
"A job that sleeps at each map and reduce task.");
|
||||||
pgd.addClass("timelineperformance", TimelineServicePerformance.class,
|
pgd.addClass("timelineperformance", TimelineServicePerformance.class,
|
||||||
"A job that launches mappers to test timlineserver performance.");
|
"A job that launches mappers to test timline service " +
|
||||||
pgd.addClass("timelineperformance", TimelineServicePerformanceV2.class,
|
"performance.");
|
||||||
"A job that launch mappers to test timline service v.2 performance.");
|
|
||||||
pgd.addClass("nnbench", NNBench.class,
|
pgd.addClass("nnbench", NNBench.class,
|
||||||
"A benchmark that stresses the namenode w/ MR.");
|
"A benchmark that stresses the namenode w/ MR.");
|
||||||
pgd.addClass("nnbenchWithoutMR", NNBenchWithoutMR.class,
|
pgd.addClass("nnbenchWithoutMR", NNBenchWithoutMR.class,
|
||||||
|
|
Loading…
Reference in New Issue