MAPREDUCE-3705. ant build fails on 0.23 branch. (Thomas Graves via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1234228 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 2012-01-21 01:17:43 +00:00
parent 6b3f5b7bd2
commit fbe394d208
11 changed files with 9 additions and 3978 deletions

View File

@@ -481,6 +481,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3549. write api documentation for web service apis for RM, NM,
     mapreduce app master, and job history server (Thomas Graves via mahadev)
 
+    MAPREDUCE-3705. ant build fails on 0.23 branch. (Thomas Graves via
+    mahadev)
+
 Release 0.23.0 - 2011-11-01
 
   INCOMPATIBLE CHANGES

View File

@@ -575,8 +575,6 @@
     <copy file="${test.src.dir}/mapred/org/apache/hadoop/mapred/test.tar" todir="${test.cache.data}"/>
     <copy file="${test.src.dir}/mapred/org/apache/hadoop/mapred/test.tgz" todir="${test.cache.data}"/>
     <copy file="${test.src.dir}/mapred/org/apache/hadoop/mapred/test.tar.gz" todir="${test.cache.data}"/>
-    <copy file="${test.src.dir}/mapred/org/apache/hadoop/cli/testMRConf.xml" todir="${test.cache.data}"/>
-    <copy file="${test.src.dir}/mapred/org/apache/hadoop/cli/data60bytes" todir="${test.cache.data}"/>
     <copy file="${test.src.dir}/mapred/org/apache/hadoop/mapred/concat.bz2" todir="${test.concat.data}"/>
     <copy file="${test.src.dir}/mapred/org/apache/hadoop/mapred/concat.gz" todir="${test.concat.data}"/>
    <copy file="${test.src.dir}/mapred/org/apache/hadoop/mapred/testCompressThenConcat.txt.bz2" todir="${test.concat.data}"/>

View File

@@ -99,6 +99,8 @@
                 rev="${yarn.version}" conf="compile->default">
      <artifact name="hadoop-mapreduce-client-jobclient" type="tests" ext="jar" m:classifier="tests"/>
    </dependency>
+   <dependency org="org.apache.hadoop" name="hadoop-rumen"
+              rev="${hadoop-common.version}" conf="compile->default"/>
    <dependency org="org.apache.hadoop" name="hadoop-archives"
               rev="${hadoop-common.version}" conf="compile->default"/>

View File

@@ -82,5 +82,5 @@ xmlenc.version=0.52
 xerces.version=1.4.4
 jackson.version=1.8.2
-yarn.version=0.24.0-SNAPSHOT
-hadoop-mapreduce.version=0.24.0-SNAPSHOT
+yarn.version=0.23.1-SNAPSHOT
+hadoop-mapreduce.version=0.23.1-SNAPSHOT

View File

@@ -70,6 +70,8 @@
    </dependency>
    <dependency org="org.apache.hadoop" name="hadoop-archives"
                rev="${hadoop-common.version}" conf="common->default"/>
+   <dependency org="org.apache.hadoop" name="hadoop-rumen"
+               rev="${hadoop-common.version}" conf="common->default"/>
    <dependency org="commons-logging"
      name="commons-logging"
      rev="${commons-logging.version}"

View File

@ -1,136 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestConcurrentRead {
static final List<LoggedJob> cachedTrace = new ArrayList<LoggedJob>();
static final String traceFile =
"rumen/small-trace-test/job-tracker-logs-trace-output.gz";
static Configuration conf;
static FileSystem lfs;
static Path path;
@BeforeClass
static public void globalSetUp() throws IOException {
conf = new Configuration();
lfs = FileSystem.getLocal(conf);
Path rootInputDir = new Path(System.getProperty("test.tools.input.dir", ""))
.makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
path = new Path(rootInputDir, traceFile);
JobTraceReader reader = new JobTraceReader(path, conf);
try {
LoggedJob job;
while ((job = reader.getNext()) != null) {
cachedTrace.add(job);
}
} finally {
reader.close();
}
}
void readAndCompare() throws IOException {
JobTraceReader reader = new JobTraceReader(path, conf);
try {
for (Iterator<LoggedJob> it = cachedTrace.iterator(); it.hasNext();) {
LoggedJob jobExpected = it.next();
LoggedJob jobRead = reader.getNext();
assertNotNull(jobRead);
try {
jobRead.deepCompare(jobExpected, null);
} catch (DeepInequalityException e) {
fail(e.toString());
}
}
assertNull(reader.getNext());
} finally {
reader.close();
}
}
class TestThread extends Thread {
final int repeat;
final CountDownLatch startSignal, doneSignal;
final Map<String, Throwable> errors;
TestThread(int id, int repeat, CountDownLatch startSignal, CountDownLatch doneSignal, Map<String, Throwable> errors) {
super(String.format("TestThread-%d", id));
this.repeat = repeat;
this.startSignal = startSignal;
this.doneSignal = doneSignal;
this.errors = errors;
}
@Override
public void run() {
try {
startSignal.await();
for (int i = 0; i < repeat; ++i) {
try {
readAndCompare();
} catch (Throwable e) {
errors.put(getName(), e);
break;
}
}
doneSignal.countDown();
} catch (Throwable e) {
errors.put(getName(), e);
}
}
}
@Test
public void testConcurrentRead() throws InterruptedException {
int nThr = conf.getInt("test.rumen.concurrent-read.threads", 4);
int repeat = conf.getInt("test.rumen.concurrent-read.repeat", 10);
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(nThr);
Map<String, Throwable> errors = Collections
.synchronizedMap(new TreeMap<String, Throwable>());
for (int i = 0; i < nThr; ++i) {
new TestThread(i, repeat, startSignal, doneSignal, errors).start();
}
startSignal.countDown();
doneSignal.await();
if (!errors.isEmpty()) {
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, Throwable> e : errors.entrySet()) {
sb.append(String.format("%s:\n%s\n", e.getKey(), e.getValue().toString()));
}
fail(sb.toString());
}
}
}

View File

@ -1,105 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestParsedLine {
static final char[] CHARS_TO_ESCAPE = new char[]{'=', '"', '.'};
String buildLine(String type, String[] kvseq) {
StringBuilder sb = new StringBuilder();
sb.append(type);
for (int i=0; i<kvseq.length; ++i) {
sb.append(" ");
if (kvseq[i].equals(".") || kvseq[i].equals("\n")) {
sb.append(kvseq[i]);
continue;
}
if (i == kvseq.length-1) {
fail("Incorrect input, expecting value.");
}
sb.append(kvseq[i++]);
sb.append("=\"");
sb.append(StringUtils.escapeString(kvseq[i], StringUtils.ESCAPE_CHAR,
CHARS_TO_ESCAPE));
sb.append("\"");
}
return sb.toString();
}
void testOneLine(String type, String... kvseq) {
String line = buildLine(type, kvseq);
ParsedLine pl = new ParsedLine(line, Hadoop20JHParser.internalVersion);
assertEquals("Mismatching type", type, pl.getType().toString());
for (int i = 0; i < kvseq.length; ++i) {
if (kvseq[i].equals(".") || kvseq[i].equals("\n")) {
continue;
}
assertEquals("Key mismatching for " + kvseq[i], kvseq[i + 1], StringUtils
.unEscapeString(pl.get(kvseq[i]), StringUtils.ESCAPE_CHAR,
CHARS_TO_ESCAPE));
++i;
}
}
@Test
public void testEscapedQuote() {
testOneLine("REC", "A", "x", "B", "abc\"de", "C", "f");
testOneLine("REC", "B", "abcde\"", "C", "f");
testOneLine("REC", "A", "x", "B", "\"abcde");
}
@Test
public void testEqualSign() {
testOneLine("REC1", "A", "x", "B", "abc=de", "C", "f");
testOneLine("REC2", "B", "=abcde", "C", "f");
testOneLine("REC3", "A", "x", "B", "abcde=");
}
@Test
public void testSpace() {
testOneLine("REC1", "A", "x", "B", "abc de", "C", "f");
testOneLine("REC2", "B", " ab c de", "C", "f");
testOneLine("REC3", "A", "x", "B", "abc\t de ");
}
@Test
public void testBackSlash() {
testOneLine("REC1", "A", "x", "B", "abc\\de", "C", "f");
testOneLine("REC2", "B", "\\ab\\c\\de", "C", "f");
testOneLine("REC3", "A", "x", "B", "abc\\\\de\\");
testOneLine("REC4", "A", "x", "B", "abc\\\"de\\\"", "C", "f");
}
@Test
public void testLineDelimiter() {
testOneLine("REC1", "A", "x", "B", "abc.de", "C", "f");
testOneLine("REC2", "B", ".ab.de");
testOneLine("REC3", "A", "x", "B", "abc.de.");
testOneLine("REC4", "A", "x", "B", "abc.de", ".");
}
@Test
public void testMultipleLines() {
testOneLine("REC1", "A", "x", "\n", "B", "abc.de", "\n", "C", "f", "\n", ".");
}
}

View File

@ -1,196 +0,0 @@
package org.apache.hadoop.tools.rumen;
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestRumenFolder {
@Test
public void testFoldingSmallTrace() throws Exception {
final Configuration conf = new Configuration();
final FileSystem lfs = FileSystem.getLocal(conf);
@SuppressWarnings("deprecation")
final Path rootInputDir =
new Path(System.getProperty("test.tools.input.dir", ""))
.makeQualified(lfs);
@SuppressWarnings("deprecation")
final Path rootTempDir =
new Path(System.getProperty("test.build.data", "/tmp"))
.makeQualified(lfs);
final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test");
final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces");
lfs.delete(tempDir, true);
final Path foldedTracePath = new Path(tempDir, "folded-trace.json");
final Path inputFile =
new Path(rootInputFile, "folder-input-trace.json.gz");
System.out.println("folded trace result path = " + foldedTracePath);
String[] args =
{ "-input-cycle", "100S", "-output-duration", "300S",
"-skew-buffer-length", "1", "-seed", "100", "-concentration", "2",
inputFile.toString(), foldedTracePath.toString() };
final Path foldedGoldFile =
new Path(rootInputFile, "goldFoldedTrace.json.gz");
Folder folder = new Folder();
int result = ToolRunner.run(folder, args);
assertEquals("Non-zero exit", 0, result);
TestRumenFolder.<LoggedJob> jsonFileMatchesGold(conf, lfs, foldedTracePath,
foldedGoldFile, LoggedJob.class, "trace");
}
@Test
public void testStartsAfterOption() throws Exception {
final Configuration conf = new Configuration();
final FileSystem lfs = FileSystem.getLocal(conf);
@SuppressWarnings("deprecation")
final Path rootInputDir =
new Path(System.getProperty("test.tools.input.dir", ""))
.makeQualified(lfs);
@SuppressWarnings("deprecation")
final Path rootTempDir =
new Path(System.getProperty("test.build.data", "/tmp"))
.makeQualified(lfs);
final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test");
final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces");
lfs.delete(tempDir, true);
final Path inputFile =
new Path(rootInputFile, "goldFoldedTrace.json.gz");
final Path foldedTracePath = new Path(tempDir,
"folded-skippedjob-trace.json");
String[] args =
{ "-input-cycle", "300S", "-output-duration", "300S",
"-starts-after", "30S",
inputFile.toString(), foldedTracePath.toString() };
Folder folder = new Folder();
int result = ToolRunner.run(folder, args);
assertEquals("Non-zero exit", 0, result);
TestRumenFolder.<LoggedJob> checkValidityAfterSkippingJobs(conf, lfs, foldedTracePath,
inputFile, LoggedJob.class, "trace", 30000, 300000);
}
static private <T extends DeepCompare> void
checkValidityAfterSkippingJobs(Configuration conf,
FileSystem lfs, Path result, Path inputFile,
Class<? extends T> clazz, String fileDescription,
long startsAfter, long duration) throws IOException {
JsonObjectMapperParser<T> inputFileParser =
new JsonObjectMapperParser<T>(inputFile, clazz, conf);
InputStream resultStream = lfs.open(result);
JsonObjectMapperParser<T> resultParser =
new JsonObjectMapperParser<T>(resultStream, clazz);
List<Long> gpSubmitTimes = new LinkedList<Long>();
List<Long> rpSubmitTimes = new LinkedList<Long>();
try {
//Get submitTime of first job
LoggedJob firstJob = (LoggedJob)inputFileParser.getNext();
gpSubmitTimes.add(firstJob.getSubmitTime());
long absoluteStartsAfterTime = firstJob.getSubmitTime() + startsAfter;
//total duration
long endTime = firstJob.getSubmitTime() + duration;
//read original trace
LoggedJob oriJob = null;
while((oriJob = (LoggedJob)inputFileParser.getNext()) != null) {
gpSubmitTimes.add(oriJob.getSubmitTime());
}
//check if retained jobs have submittime > starts-after
LoggedJob job = null;
while((job = (LoggedJob) resultParser.getNext()) != null) {
assertTrue("job's submit time in the output trace is less " +
"than the specified value of starts-after",
(job.getSubmitTime() >= absoluteStartsAfterTime));
rpSubmitTimes.add(job.getSubmitTime());
}
List<Long> skippedJobs = new LinkedList<Long>();
skippedJobs.addAll(gpSubmitTimes);
skippedJobs.removeAll(rpSubmitTimes);
//check if the skipped job submittime < starts-after
for(Long submitTime : skippedJobs) {
assertTrue("skipped job submit time " + submitTime +
" in the trace is greater " +
"than the specified value of starts-after "
+ absoluteStartsAfterTime,
(submitTime < absoluteStartsAfterTime));
}
} finally {
IOUtils.cleanup(null, inputFileParser, resultParser);
}
}
static private <T extends DeepCompare> void jsonFileMatchesGold(
Configuration conf, FileSystem lfs, Path result, Path gold,
Class<? extends T> clazz, String fileDescription) throws IOException {
JsonObjectMapperParser<T> goldParser =
new JsonObjectMapperParser<T>(gold, clazz, conf);
InputStream resultStream = lfs.open(result);
JsonObjectMapperParser<T> resultParser =
new JsonObjectMapperParser<T>(resultStream, clazz);
try {
while (true) {
DeepCompare goldJob = goldParser.getNext();
DeepCompare resultJob = resultParser.getNext();
if ((goldJob == null) || (resultJob == null)) {
assertTrue(goldJob == resultJob);
break;
}
try {
resultJob.deepCompare(goldJob, new TreePath(null, "<root>"));
} catch (DeepInequalityException e) {
String error = e.path.toString();
assertFalse(fileDescription + " mismatches: " + error, true);
}
}
} finally {
IOUtils.cleanup(null, goldParser, resultParser);
}
}
}

View File

@ -1,338 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen;
import java.util.List;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskStatus.State;
import org.apache.hadoop.mapreduce.TaskType;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestZombieJob {
final double epsilon = 0.01;
private final int[] attemptTimesPercentiles = new int[] { 10, 50, 90 };
private long[] succeededCDF = new long[] { 5268, 5268, 5268, 5268, 5268 };
private long[] failedCDF = new long[] { 18592, 18592, 18592, 18592, 18592 };
private double[] expectedPs = new double[] { 0.000001, 0.18707660239708182,
0.0013027618551328818, 2.605523710265763E-4 };
private final long[] mapTaskCounts = new long[] { 7838525L, 342277L, 100228L,
1564L, 1234L };
private final long[] reduceTaskCounts = new long[] { 4405338L, 139391L,
1514383L, 139391, 1234L };
List<LoggedJob> loggedJobs = new ArrayList<LoggedJob>();
List<JobStory> jobStories = new ArrayList<JobStory>();
@Before
public void setUp() throws Exception {
final Configuration conf = new Configuration();
final FileSystem lfs = FileSystem.getLocal(conf);
final Path rootInputDir = new Path(
System.getProperty("test.tools.input.dir", "")).makeQualified(lfs);
final Path rootInputFile = new Path(rootInputDir, "rumen/zombie");
ZombieJobProducer parser = new ZombieJobProducer(new Path(rootInputFile,
"input-trace.json"), new ZombieCluster(new Path(rootInputFile,
"input-topology.json"), null, conf), conf);
JobStory job = null;
for (int i = 0; i < 4; i++) {
job = parser.getNextJob();
ZombieJob zJob = (ZombieJob) job;
LoggedJob loggedJob = zJob.getLoggedJob();
System.out.println(i + ":" + job.getNumberMaps() + "m, "
+ job.getNumberReduces() + "r");
System.out
.println(loggedJob.getOutcome() + ", " + loggedJob.getJobtype());
System.out.println("Input Splits -- " + job.getInputSplits().length
+ ", " + job.getNumberMaps());
System.out.println("Successful Map CDF -------");
for (LoggedDiscreteCDF cdf : loggedJob.getSuccessfulMapAttemptCDFs()) {
System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum()
+ "--" + cdf.getMaximum());
for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
System.out.println(" " + ranking.getRelativeRanking() + ":"
+ ranking.getDatum());
}
}
System.out.println("Failed Map CDF -----------");
for (LoggedDiscreteCDF cdf : loggedJob.getFailedMapAttemptCDFs()) {
System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum()
+ "--" + cdf.getMaximum());
for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
System.out.println(" " + ranking.getRelativeRanking() + ":"
+ ranking.getDatum());
}
}
System.out.println("Successful Reduce CDF ----");
LoggedDiscreteCDF cdf = loggedJob.getSuccessfulReduceAttemptCDF();
System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
+ cdf.getMaximum());
for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
System.out.println(" " + ranking.getRelativeRanking() + ":"
+ ranking.getDatum());
}
System.out.println("Failed Reduce CDF --------");
cdf = loggedJob.getFailedReduceAttemptCDF();
System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
+ cdf.getMaximum());
for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
System.out.println(" " + ranking.getRelativeRanking() + ":"
+ ranking.getDatum());
}
System.out.print("map attempts to success -- ");
for (double p : loggedJob.getMapperTriesToSucceed()) {
System.out.print(p + ", ");
}
System.out.println();
System.out.println("===============");
loggedJobs.add(loggedJob);
jobStories.add(job);
}
}
@Test
public void testFirstJob() {
// 20th job seems reasonable: "totalMaps":329,"totalReduces":101
// successful map: 80 node-local, 196 rack-local, 53 rack-remote, 2 unknown
// failed map: 0-0-0-1
// successful reduce: 99 failed reduce: 13
// map attempts to success -- 0.9969879518072289, 0.0030120481927710845,
JobStory job = jobStories.get(0);
assertEquals(1, job.getNumberMaps());
assertEquals(1, job.getNumberReduces());
// get splits
TaskAttemptInfo taInfo = null;
long expectedRuntime = 2423;
// get a succeeded map task attempt, expect the exact same task attempt
taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 1);
assertEquals(expectedRuntime, taInfo.getRuntime());
assertEquals(State.SUCCEEDED, taInfo.getRunState());
// get a succeeded map attempt, but reschedule with different locality.
taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 2);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 0);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
expectedRuntime = 97502;
// get a succeeded reduce task attempt, expect the exact same task attempt
taInfo = job.getTaskAttemptInfo(TaskType.REDUCE, 14, 0);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
// get a failed reduce task attempt, expect the exact same task attempt
taInfo = job.getTaskAttemptInfo(TaskType.REDUCE, 14, 0);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
// get a non-exist reduce task attempt, expect a made-up task attempt
// TODO fill in test case
}
@Test
public void testSecondJob() {
// 7th job has many failed tasks.
// 3204 m, 0 r
// successful maps 497-586-23-1, failed maps 0-0-0-2714
// map attempts to success -- 0.8113600833767587, 0.18707660239708182,
// 0.0013027618551328818, 2.605523710265763E-4,
JobStory job = jobStories.get(1);
assertEquals(20, job.getNumberMaps());
assertEquals(1, job.getNumberReduces());
TaskAttemptInfo taInfo = null;
// get a succeeded map task attempt
taInfo = job.getMapTaskAttemptInfoAdjusted(17, 1, 1);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
// get a succeeded map task attempt, with different locality
taInfo = job.getMapTaskAttemptInfoAdjusted(17, 1, 2);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
taInfo = job.getMapTaskAttemptInfoAdjusted(17, 1, 0);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
// get a failed map task attempt
taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 1);
assertEquals(1927, taInfo.getRuntime());
assertEquals(State.SUCCEEDED, taInfo.getRunState());
// get a failed map task attempt, with different locality
// TODO: this test does not make sense here, because I don't have
// available data set.
}
@Test
public void testFourthJob() {
// 7th job has many failed tasks.
// 3204 m, 0 r
// successful maps 497-586-23-1, failed maps 0-0-0-2714
// map attempts to success -- 0.8113600833767587, 0.18707660239708182,
// 0.0013027618551328818, 2.605523710265763E-4,
JobStory job = jobStories.get(3);
assertEquals(131, job.getNumberMaps());
assertEquals(47, job.getNumberReduces());
TaskAttemptInfo taInfo = null;
// get a succeeded map task attempt
long runtime = 5268;
taInfo = job.getMapTaskAttemptInfoAdjusted(113, 1, 1);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
assertEquals(runtime, taInfo.getRuntime());
// get a succeeded map task attempt, with different locality
taInfo = job.getMapTaskAttemptInfoAdjusted(113, 1, 2);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
assertEquals(runtime, taInfo.getRuntime() / 2);
taInfo = job.getMapTaskAttemptInfoAdjusted(113, 1, 0);
assertEquals(State.SUCCEEDED, taInfo.getRunState());
assertEquals((long) (runtime / 1.5), taInfo.getRuntime());
// get a failed map task attempt
taInfo = job.getMapTaskAttemptInfoAdjusted(113, 0, 1);
assertEquals(18592, taInfo.getRuntime());
assertEquals(State.FAILED, taInfo.getRunState());
}
@Test
public void testRecordIOInfo() {
JobStory job = jobStories.get(3);
TaskInfo mapTask = job.getTaskInfo(TaskType.MAP, 113);
TaskInfo reduceTask = job.getTaskInfo(TaskType.REDUCE, 0);
assertEquals(mapTaskCounts[0], mapTask.getInputBytes());
assertEquals(mapTaskCounts[1], mapTask.getInputRecords());
assertEquals(mapTaskCounts[2], mapTask.getOutputBytes());
assertEquals(mapTaskCounts[3], mapTask.getOutputRecords());
assertEquals(mapTaskCounts[4], mapTask.getTaskMemory());
assertEquals(reduceTaskCounts[0], reduceTask.getInputBytes());
assertEquals(reduceTaskCounts[1], reduceTask.getInputRecords());
assertEquals(reduceTaskCounts[2], reduceTask.getOutputBytes());
assertEquals(reduceTaskCounts[3], reduceTask.getOutputRecords());
assertEquals(reduceTaskCounts[4], reduceTask.getTaskMemory());
}
@Test
public void testMakeUpInfo() {
// get many non-exist tasks
// total 3204 map tasks, 3300 is a non-exist task.
checkMakeUpTask(jobStories.get(3), 113, 1);
}
private void checkMakeUpTask(JobStory job, int taskNumber, int locality) {
TaskAttemptInfo taInfo = null;
Histogram sampleSucceeded = new Histogram();
Histogram sampleFailed = new Histogram();
List<Integer> sampleAttempts = new ArrayList<Integer>();
for (int i = 0; i < 100000; i++) {
int attemptId = 0;
while (true) {
taInfo = job.getMapTaskAttemptInfoAdjusted(taskNumber, attemptId, 1);
if (taInfo.getRunState() == State.SUCCEEDED) {
sampleSucceeded.enter(taInfo.getRuntime());
break;
}
sampleFailed.enter(taInfo.getRuntime());
attemptId++;
}
sampleAttempts.add(attemptId);
}
// check state distribution
int[] countTries = new int[] { 0, 0, 0, 0 };
for (int attempts : sampleAttempts) {
assertTrue(attempts < 4);
countTries[attempts]++;
}
/*
* System.out.print("Generated map attempts to success -- "); for (int
* count: countTries) { System.out.print((double)count/sampleAttempts.size()
* + ", "); } System.out.println(); System.out.println("===============");
*/
for (int i = 0; i < 4; i++) {
int count = countTries[i];
double p = (double) count / sampleAttempts.size();
assertTrue(expectedPs[i] - p < epsilon);
}
// check succeeded attempts runtime distribution
long[] expectedCDF = succeededCDF;
LoggedDiscreteCDF cdf = new LoggedDiscreteCDF();
cdf.setCDF(sampleSucceeded, attemptTimesPercentiles, 100);
/*
* System.out.println("generated succeeded map runtime distribution");
* System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
* + cdf.getMaximum()); for (LoggedSingleRelativeRanking ranking:
* cdf.getRankings()) { System.out.println(" " +
* ranking.getRelativeRanking() + ":" + ranking.getDatum()); }
*/
assertRuntimeEqual(cdf.getMinimum(), expectedCDF[0]);
assertRuntimeEqual(cdf.getMaximum(), expectedCDF[4]);
for (int i = 0; i < 3; i++) {
LoggedSingleRelativeRanking ranking = cdf.getRankings().get(i);
assertRuntimeEqual(expectedCDF[i + 1], ranking.getDatum());
}
// check failed attempts runtime distribution
expectedCDF = failedCDF;
cdf = new LoggedDiscreteCDF();
cdf.setCDF(sampleFailed, attemptTimesPercentiles, 100);
System.out.println("generated failed map runtime distribution");
System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
+ cdf.getMaximum());
for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
System.out.println(" " + ranking.getRelativeRanking() + ":"
+ ranking.getDatum());
}
assertRuntimeEqual(cdf.getMinimum(), expectedCDF[0]);
assertRuntimeEqual(cdf.getMaximum(), expectedCDF[4]);
for (int i = 0; i < 3; i++) {
LoggedSingleRelativeRanking ranking = cdf.getRankings().get(i);
assertRuntimeEqual(expectedCDF[i + 1], ranking.getDatum());
}
}
private void assertRuntimeEqual(long expected, long generated) {
if (expected == 0) {
assertTrue(generated > -1000 && generated < 1000);
} else {
long epsilon = Math.max(expected / 10, 5000);
assertTrue(expected - generated > -epsilon);
assertTrue(expected - generated < epsilon);
}
}
}