MAPREDUCE-3705. ant build fails on 0.23 branch. (Thomas Graves via mahadev)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1234227 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8a1719e999
commit
355ba01374
|
@ -526,6 +526,9 @@ Release 0.23.1 - Unreleased
|
|||
MAPREDUCE-3549. write api documentation for web service apis for RM, NM,
|
||||
mapreduce app master, and job history server (Thomas Graves via mahadev)
|
||||
|
||||
MAPREDUCE-3705. ant build fails on 0.23 branch. (Thomas Graves via
|
||||
mahadev)
|
||||
|
||||
Release 0.23.0 - 2011-11-01
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -575,8 +575,6 @@
|
|||
<copy file="${test.src.dir}/mapred/org/apache/hadoop/mapred/test.tar" todir="${test.cache.data}"/>
|
||||
<copy file="${test.src.dir}/mapred/org/apache/hadoop/mapred/test.tgz" todir="${test.cache.data}"/>
|
||||
<copy file="${test.src.dir}/mapred/org/apache/hadoop/mapred/test.tar.gz" todir="${test.cache.data}"/>
|
||||
<copy file="${test.src.dir}/mapred/org/apache/hadoop/cli/testMRConf.xml" todir="${test.cache.data}"/>
|
||||
<copy file="${test.src.dir}/mapred/org/apache/hadoop/cli/data60bytes" todir="${test.cache.data}"/>
|
||||
<copy file="${test.src.dir}/mapred/org/apache/hadoop/mapred/concat.bz2" todir="${test.concat.data}"/>
|
||||
<copy file="${test.src.dir}/mapred/org/apache/hadoop/mapred/concat.gz" todir="${test.concat.data}"/>
|
||||
<copy file="${test.src.dir}/mapred/org/apache/hadoop/mapred/testCompressThenConcat.txt.bz2" todir="${test.concat.data}"/>
|
||||
|
|
|
@ -99,6 +99,8 @@
|
|||
rev="${yarn.version}" conf="compile->default">
|
||||
<artifact name="hadoop-mapreduce-client-jobclient" type="tests" ext="jar" m:classifier="tests"/>
|
||||
</dependency>
|
||||
<dependency org="org.apache.hadoop" name="hadoop-rumen"
|
||||
rev="${hadoop-common.version}" conf="compile->default"/>
|
||||
<dependency org="org.apache.hadoop" name="hadoop-archives"
|
||||
rev="${hadoop-common.version}" conf="compile->default"/>
|
||||
|
||||
|
|
|
@ -70,6 +70,8 @@
|
|||
</dependency>
|
||||
<dependency org="org.apache.hadoop" name="hadoop-archives"
|
||||
rev="${hadoop-common.version}" conf="common->default"/>
|
||||
<dependency org="org.apache.hadoop" name="hadoop-rumen"
|
||||
rev="${hadoop-common.version}" conf="common->default"/>
|
||||
<dependency org="commons-logging"
|
||||
name="commons-logging"
|
||||
rev="${commons-logging.version}"
|
||||
|
|
|
@ -1,136 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestConcurrentRead {
|
||||
static final List<LoggedJob> cachedTrace = new ArrayList<LoggedJob>();
|
||||
static final String traceFile =
|
||||
"rumen/small-trace-test/job-tracker-logs-trace-output.gz";
|
||||
|
||||
static Configuration conf;
|
||||
static FileSystem lfs;
|
||||
static Path path;
|
||||
|
||||
@BeforeClass
|
||||
static public void globalSetUp() throws IOException {
|
||||
conf = new Configuration();
|
||||
lfs = FileSystem.getLocal(conf);
|
||||
Path rootInputDir = new Path(System.getProperty("test.tools.input.dir", ""))
|
||||
.makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
|
||||
path = new Path(rootInputDir, traceFile);
|
||||
JobTraceReader reader = new JobTraceReader(path, conf);
|
||||
try {
|
||||
LoggedJob job;
|
||||
while ((job = reader.getNext()) != null) {
|
||||
cachedTrace.add(job);
|
||||
}
|
||||
} finally {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
|
||||
void readAndCompare() throws IOException {
|
||||
JobTraceReader reader = new JobTraceReader(path, conf);
|
||||
try {
|
||||
for (Iterator<LoggedJob> it = cachedTrace.iterator(); it.hasNext();) {
|
||||
LoggedJob jobExpected = it.next();
|
||||
LoggedJob jobRead = reader.getNext();
|
||||
assertNotNull(jobRead);
|
||||
try {
|
||||
jobRead.deepCompare(jobExpected, null);
|
||||
} catch (DeepInequalityException e) {
|
||||
fail(e.toString());
|
||||
}
|
||||
}
|
||||
assertNull(reader.getNext());
|
||||
} finally {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
|
||||
class TestThread extends Thread {
|
||||
final int repeat;
|
||||
final CountDownLatch startSignal, doneSignal;
|
||||
final Map<String, Throwable> errors;
|
||||
|
||||
TestThread(int id, int repeat, CountDownLatch startSignal, CountDownLatch doneSignal, Map<String, Throwable> errors) {
|
||||
super(String.format("TestThread-%d", id));
|
||||
this.repeat = repeat;
|
||||
this.startSignal = startSignal;
|
||||
this.doneSignal = doneSignal;
|
||||
this.errors = errors;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
startSignal.await();
|
||||
for (int i = 0; i < repeat; ++i) {
|
||||
try {
|
||||
readAndCompare();
|
||||
} catch (Throwable e) {
|
||||
errors.put(getName(), e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
doneSignal.countDown();
|
||||
} catch (Throwable e) {
|
||||
errors.put(getName(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConcurrentRead() throws InterruptedException {
|
||||
int nThr = conf.getInt("test.rumen.concurrent-read.threads", 4);
|
||||
int repeat = conf.getInt("test.rumen.concurrent-read.repeat", 10);
|
||||
CountDownLatch startSignal = new CountDownLatch(1);
|
||||
CountDownLatch doneSignal = new CountDownLatch(nThr);
|
||||
Map<String, Throwable> errors = Collections
|
||||
.synchronizedMap(new TreeMap<String, Throwable>());
|
||||
for (int i = 0; i < nThr; ++i) {
|
||||
new TestThread(i, repeat, startSignal, doneSignal, errors).start();
|
||||
}
|
||||
startSignal.countDown();
|
||||
doneSignal.await();
|
||||
if (!errors.isEmpty()) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (Map.Entry<String, Throwable> e : errors.entrySet()) {
|
||||
sb.append(String.format("%s:\n%s\n", e.getKey(), e.getValue().toString()));
|
||||
}
|
||||
fail(sb.toString());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,105 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools.rumen;
|
||||
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestParsedLine {
|
||||
static final char[] CHARS_TO_ESCAPE = new char[]{'=', '"', '.'};
|
||||
|
||||
String buildLine(String type, String[] kvseq) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(type);
|
||||
for (int i=0; i<kvseq.length; ++i) {
|
||||
sb.append(" ");
|
||||
if (kvseq[i].equals(".") || kvseq[i].equals("\n")) {
|
||||
sb.append(kvseq[i]);
|
||||
continue;
|
||||
}
|
||||
if (i == kvseq.length-1) {
|
||||
fail("Incorrect input, expecting value.");
|
||||
}
|
||||
sb.append(kvseq[i++]);
|
||||
sb.append("=\"");
|
||||
sb.append(StringUtils.escapeString(kvseq[i], StringUtils.ESCAPE_CHAR,
|
||||
CHARS_TO_ESCAPE));
|
||||
sb.append("\"");
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
void testOneLine(String type, String... kvseq) {
|
||||
String line = buildLine(type, kvseq);
|
||||
ParsedLine pl = new ParsedLine(line, Hadoop20JHParser.internalVersion);
|
||||
assertEquals("Mismatching type", type, pl.getType().toString());
|
||||
for (int i = 0; i < kvseq.length; ++i) {
|
||||
if (kvseq[i].equals(".") || kvseq[i].equals("\n")) {
|
||||
continue;
|
||||
}
|
||||
|
||||
assertEquals("Key mismatching for " + kvseq[i], kvseq[i + 1], StringUtils
|
||||
.unEscapeString(pl.get(kvseq[i]), StringUtils.ESCAPE_CHAR,
|
||||
CHARS_TO_ESCAPE));
|
||||
++i;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEscapedQuote() {
|
||||
testOneLine("REC", "A", "x", "B", "abc\"de", "C", "f");
|
||||
testOneLine("REC", "B", "abcde\"", "C", "f");
|
||||
testOneLine("REC", "A", "x", "B", "\"abcde");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEqualSign() {
|
||||
testOneLine("REC1", "A", "x", "B", "abc=de", "C", "f");
|
||||
testOneLine("REC2", "B", "=abcde", "C", "f");
|
||||
testOneLine("REC3", "A", "x", "B", "abcde=");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSpace() {
|
||||
testOneLine("REC1", "A", "x", "B", "abc de", "C", "f");
|
||||
testOneLine("REC2", "B", " ab c de", "C", "f");
|
||||
testOneLine("REC3", "A", "x", "B", "abc\t de ");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBackSlash() {
|
||||
testOneLine("REC1", "A", "x", "B", "abc\\de", "C", "f");
|
||||
testOneLine("REC2", "B", "\\ab\\c\\de", "C", "f");
|
||||
testOneLine("REC3", "A", "x", "B", "abc\\\\de\\");
|
||||
testOneLine("REC4", "A", "x", "B", "abc\\\"de\\\"", "C", "f");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLineDelimiter() {
|
||||
testOneLine("REC1", "A", "x", "B", "abc.de", "C", "f");
|
||||
testOneLine("REC2", "B", ".ab.de");
|
||||
testOneLine("REC3", "A", "x", "B", "abc.de.");
|
||||
testOneLine("REC4", "A", "x", "B", "abc.de", ".");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleLines() {
|
||||
testOneLine("REC1", "A", "x", "\n", "B", "abc.de", "\n", "C", "f", "\n", ".");
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -1,196 +0,0 @@
|
|||
package org.apache.hadoop.tools.rumen;
|
||||
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestRumenFolder {
|
||||
@Test
|
||||
public void testFoldingSmallTrace() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
final FileSystem lfs = FileSystem.getLocal(conf);
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
final Path rootInputDir =
|
||||
new Path(System.getProperty("test.tools.input.dir", ""))
|
||||
.makeQualified(lfs);
|
||||
@SuppressWarnings("deprecation")
|
||||
final Path rootTempDir =
|
||||
new Path(System.getProperty("test.build.data", "/tmp"))
|
||||
.makeQualified(lfs);
|
||||
|
||||
final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test");
|
||||
final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces");
|
||||
lfs.delete(tempDir, true);
|
||||
|
||||
final Path foldedTracePath = new Path(tempDir, "folded-trace.json");
|
||||
|
||||
final Path inputFile =
|
||||
new Path(rootInputFile, "folder-input-trace.json.gz");
|
||||
|
||||
System.out.println("folded trace result path = " + foldedTracePath);
|
||||
|
||||
String[] args =
|
||||
{ "-input-cycle", "100S", "-output-duration", "300S",
|
||||
"-skew-buffer-length", "1", "-seed", "100", "-concentration", "2",
|
||||
inputFile.toString(), foldedTracePath.toString() };
|
||||
|
||||
final Path foldedGoldFile =
|
||||
new Path(rootInputFile, "goldFoldedTrace.json.gz");
|
||||
|
||||
Folder folder = new Folder();
|
||||
int result = ToolRunner.run(folder, args);
|
||||
assertEquals("Non-zero exit", 0, result);
|
||||
|
||||
TestRumenFolder.<LoggedJob> jsonFileMatchesGold(conf, lfs, foldedTracePath,
|
||||
foldedGoldFile, LoggedJob.class, "trace");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStartsAfterOption() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
final FileSystem lfs = FileSystem.getLocal(conf);
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
final Path rootInputDir =
|
||||
new Path(System.getProperty("test.tools.input.dir", ""))
|
||||
.makeQualified(lfs);
|
||||
@SuppressWarnings("deprecation")
|
||||
final Path rootTempDir =
|
||||
new Path(System.getProperty("test.build.data", "/tmp"))
|
||||
.makeQualified(lfs);
|
||||
|
||||
final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test");
|
||||
final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces");
|
||||
lfs.delete(tempDir, true);
|
||||
|
||||
final Path inputFile =
|
||||
new Path(rootInputFile, "goldFoldedTrace.json.gz");
|
||||
|
||||
final Path foldedTracePath = new Path(tempDir,
|
||||
"folded-skippedjob-trace.json");
|
||||
String[] args =
|
||||
{ "-input-cycle", "300S", "-output-duration", "300S",
|
||||
"-starts-after", "30S",
|
||||
inputFile.toString(), foldedTracePath.toString() };
|
||||
|
||||
Folder folder = new Folder();
|
||||
int result = ToolRunner.run(folder, args);
|
||||
assertEquals("Non-zero exit", 0, result);
|
||||
|
||||
TestRumenFolder.<LoggedJob> checkValidityAfterSkippingJobs(conf, lfs, foldedTracePath,
|
||||
inputFile, LoggedJob.class, "trace", 30000, 300000);
|
||||
}
|
||||
|
||||
static private <T extends DeepCompare> void
|
||||
checkValidityAfterSkippingJobs(Configuration conf,
|
||||
FileSystem lfs, Path result, Path inputFile,
|
||||
Class<? extends T> clazz, String fileDescription,
|
||||
long startsAfter, long duration) throws IOException {
|
||||
|
||||
JsonObjectMapperParser<T> inputFileParser =
|
||||
new JsonObjectMapperParser<T>(inputFile, clazz, conf);
|
||||
InputStream resultStream = lfs.open(result);
|
||||
JsonObjectMapperParser<T> resultParser =
|
||||
new JsonObjectMapperParser<T>(resultStream, clazz);
|
||||
List<Long> gpSubmitTimes = new LinkedList<Long>();
|
||||
List<Long> rpSubmitTimes = new LinkedList<Long>();
|
||||
try {
|
||||
//Get submitTime of first job
|
||||
LoggedJob firstJob = (LoggedJob)inputFileParser.getNext();
|
||||
gpSubmitTimes.add(firstJob.getSubmitTime());
|
||||
long absoluteStartsAfterTime = firstJob.getSubmitTime() + startsAfter;
|
||||
|
||||
//total duration
|
||||
long endTime = firstJob.getSubmitTime() + duration;
|
||||
|
||||
//read original trace
|
||||
LoggedJob oriJob = null;
|
||||
while((oriJob = (LoggedJob)inputFileParser.getNext()) != null) {
|
||||
gpSubmitTimes.add(oriJob.getSubmitTime());
|
||||
}
|
||||
|
||||
//check if retained jobs have submittime > starts-after
|
||||
LoggedJob job = null;
|
||||
while((job = (LoggedJob) resultParser.getNext()) != null) {
|
||||
assertTrue("job's submit time in the output trace is less " +
|
||||
"than the specified value of starts-after",
|
||||
(job.getSubmitTime() >= absoluteStartsAfterTime));
|
||||
rpSubmitTimes.add(job.getSubmitTime());
|
||||
}
|
||||
|
||||
List<Long> skippedJobs = new LinkedList<Long>();
|
||||
skippedJobs.addAll(gpSubmitTimes);
|
||||
skippedJobs.removeAll(rpSubmitTimes);
|
||||
|
||||
//check if the skipped job submittime < starts-after
|
||||
for(Long submitTime : skippedJobs) {
|
||||
assertTrue("skipped job submit time " + submitTime +
|
||||
" in the trace is greater " +
|
||||
"than the specified value of starts-after "
|
||||
+ absoluteStartsAfterTime,
|
||||
(submitTime < absoluteStartsAfterTime));
|
||||
}
|
||||
} finally {
|
||||
IOUtils.cleanup(null, inputFileParser, resultParser);
|
||||
}
|
||||
}
|
||||
|
||||
static private <T extends DeepCompare> void jsonFileMatchesGold(
|
||||
Configuration conf, FileSystem lfs, Path result, Path gold,
|
||||
Class<? extends T> clazz, String fileDescription) throws IOException {
|
||||
JsonObjectMapperParser<T> goldParser =
|
||||
new JsonObjectMapperParser<T>(gold, clazz, conf);
|
||||
InputStream resultStream = lfs.open(result);
|
||||
JsonObjectMapperParser<T> resultParser =
|
||||
new JsonObjectMapperParser<T>(resultStream, clazz);
|
||||
try {
|
||||
while (true) {
|
||||
DeepCompare goldJob = goldParser.getNext();
|
||||
DeepCompare resultJob = resultParser.getNext();
|
||||
if ((goldJob == null) || (resultJob == null)) {
|
||||
assertTrue(goldJob == resultJob);
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
resultJob.deepCompare(goldJob, new TreePath(null, "<root>"));
|
||||
} catch (DeepInequalityException e) {
|
||||
String error = e.path.toString();
|
||||
|
||||
assertFalse(fileDescription + " mismatches: " + error, true);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
IOUtils.cleanup(null, goldParser, resultParser);
|
||||
}
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -1,338 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.tools.rumen;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.TaskStatus.State;
|
||||
import org.apache.hadoop.mapreduce.TaskType;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestZombieJob {
|
||||
final double epsilon = 0.01;
|
||||
private final int[] attemptTimesPercentiles = new int[] { 10, 50, 90 };
|
||||
private long[] succeededCDF = new long[] { 5268, 5268, 5268, 5268, 5268 };
|
||||
private long[] failedCDF = new long[] { 18592, 18592, 18592, 18592, 18592 };
|
||||
private double[] expectedPs = new double[] { 0.000001, 0.18707660239708182,
|
||||
0.0013027618551328818, 2.605523710265763E-4 };
|
||||
|
||||
private final long[] mapTaskCounts = new long[] { 7838525L, 342277L, 100228L,
|
||||
1564L, 1234L };
|
||||
private final long[] reduceTaskCounts = new long[] { 4405338L, 139391L,
|
||||
1514383L, 139391, 1234L };
|
||||
|
||||
List<LoggedJob> loggedJobs = new ArrayList<LoggedJob>();
|
||||
List<JobStory> jobStories = new ArrayList<JobStory>();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
final FileSystem lfs = FileSystem.getLocal(conf);
|
||||
|
||||
final Path rootInputDir = new Path(
|
||||
System.getProperty("test.tools.input.dir", "")).makeQualified(lfs);
|
||||
final Path rootInputFile = new Path(rootInputDir, "rumen/zombie");
|
||||
|
||||
ZombieJobProducer parser = new ZombieJobProducer(new Path(rootInputFile,
|
||||
"input-trace.json"), new ZombieCluster(new Path(rootInputFile,
|
||||
"input-topology.json"), null, conf), conf);
|
||||
|
||||
JobStory job = null;
|
||||
for (int i = 0; i < 4; i++) {
|
||||
job = parser.getNextJob();
|
||||
ZombieJob zJob = (ZombieJob) job;
|
||||
LoggedJob loggedJob = zJob.getLoggedJob();
|
||||
System.out.println(i + ":" + job.getNumberMaps() + "m, "
|
||||
+ job.getNumberReduces() + "r");
|
||||
System.out
|
||||
.println(loggedJob.getOutcome() + ", " + loggedJob.getJobtype());
|
||||
|
||||
System.out.println("Input Splits -- " + job.getInputSplits().length
|
||||
+ ", " + job.getNumberMaps());
|
||||
|
||||
System.out.println("Successful Map CDF -------");
|
||||
for (LoggedDiscreteCDF cdf : loggedJob.getSuccessfulMapAttemptCDFs()) {
|
||||
System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum()
|
||||
+ "--" + cdf.getMaximum());
|
||||
for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
|
||||
System.out.println(" " + ranking.getRelativeRanking() + ":"
|
||||
+ ranking.getDatum());
|
||||
}
|
||||
}
|
||||
System.out.println("Failed Map CDF -----------");
|
||||
for (LoggedDiscreteCDF cdf : loggedJob.getFailedMapAttemptCDFs()) {
|
||||
System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum()
|
||||
+ "--" + cdf.getMaximum());
|
||||
for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
|
||||
System.out.println(" " + ranking.getRelativeRanking() + ":"
|
||||
+ ranking.getDatum());
|
||||
}
|
||||
}
|
||||
System.out.println("Successful Reduce CDF ----");
|
||||
LoggedDiscreteCDF cdf = loggedJob.getSuccessfulReduceAttemptCDF();
|
||||
System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
|
||||
+ cdf.getMaximum());
|
||||
for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
|
||||
System.out.println(" " + ranking.getRelativeRanking() + ":"
|
||||
+ ranking.getDatum());
|
||||
}
|
||||
System.out.println("Failed Reduce CDF --------");
|
||||
cdf = loggedJob.getFailedReduceAttemptCDF();
|
||||
System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
|
||||
+ cdf.getMaximum());
|
||||
for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
|
||||
System.out.println(" " + ranking.getRelativeRanking() + ":"
|
||||
+ ranking.getDatum());
|
||||
}
|
||||
System.out.print("map attempts to success -- ");
|
||||
for (double p : loggedJob.getMapperTriesToSucceed()) {
|
||||
System.out.print(p + ", ");
|
||||
}
|
||||
System.out.println();
|
||||
System.out.println("===============");
|
||||
|
||||
loggedJobs.add(loggedJob);
|
||||
jobStories.add(job);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFirstJob() {
|
||||
// 20th job seems reasonable: "totalMaps":329,"totalReduces":101
|
||||
// successful map: 80 node-local, 196 rack-local, 53 rack-remote, 2 unknown
|
||||
// failed map: 0-0-0-1
|
||||
// successful reduce: 99 failed reduce: 13
|
||||
// map attempts to success -- 0.9969879518072289, 0.0030120481927710845,
|
||||
JobStory job = jobStories.get(0);
|
||||
assertEquals(1, job.getNumberMaps());
|
||||
assertEquals(1, job.getNumberReduces());
|
||||
|
||||
// get splits
|
||||
|
||||
TaskAttemptInfo taInfo = null;
|
||||
long expectedRuntime = 2423;
|
||||
// get a succeeded map task attempt, expect the exact same task attempt
|
||||
taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 1);
|
||||
assertEquals(expectedRuntime, taInfo.getRuntime());
|
||||
assertEquals(State.SUCCEEDED, taInfo.getRunState());
|
||||
|
||||
// get a succeeded map attempt, but reschedule with different locality.
|
||||
taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 2);
|
||||
assertEquals(State.SUCCEEDED, taInfo.getRunState());
|
||||
taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 0);
|
||||
assertEquals(State.SUCCEEDED, taInfo.getRunState());
|
||||
|
||||
expectedRuntime = 97502;
|
||||
// get a succeeded reduce task attempt, expect the exact same task attempt
|
||||
taInfo = job.getTaskAttemptInfo(TaskType.REDUCE, 14, 0);
|
||||
assertEquals(State.SUCCEEDED, taInfo.getRunState());
|
||||
|
||||
// get a failed reduce task attempt, expect the exact same task attempt
|
||||
taInfo = job.getTaskAttemptInfo(TaskType.REDUCE, 14, 0);
|
||||
assertEquals(State.SUCCEEDED, taInfo.getRunState());
|
||||
|
||||
// get a non-exist reduce task attempt, expect a made-up task attempt
|
||||
// TODO fill in test case
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSecondJob() {
|
||||
// 7th job has many failed tasks.
|
||||
// 3204 m, 0 r
|
||||
// successful maps 497-586-23-1, failed maps 0-0-0-2714
|
||||
// map attempts to success -- 0.8113600833767587, 0.18707660239708182,
|
||||
// 0.0013027618551328818, 2.605523710265763E-4,
|
||||
JobStory job = jobStories.get(1);
|
||||
assertEquals(20, job.getNumberMaps());
|
||||
assertEquals(1, job.getNumberReduces());
|
||||
|
||||
TaskAttemptInfo taInfo = null;
|
||||
// get a succeeded map task attempt
|
||||
taInfo = job.getMapTaskAttemptInfoAdjusted(17, 1, 1);
|
||||
assertEquals(State.SUCCEEDED, taInfo.getRunState());
|
||||
|
||||
// get a succeeded map task attempt, with different locality
|
||||
taInfo = job.getMapTaskAttemptInfoAdjusted(17, 1, 2);
|
||||
assertEquals(State.SUCCEEDED, taInfo.getRunState());
|
||||
taInfo = job.getMapTaskAttemptInfoAdjusted(17, 1, 0);
|
||||
assertEquals(State.SUCCEEDED, taInfo.getRunState());
|
||||
|
||||
// get a failed map task attempt
|
||||
taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 1);
|
||||
assertEquals(1927, taInfo.getRuntime());
|
||||
assertEquals(State.SUCCEEDED, taInfo.getRunState());
|
||||
|
||||
// get a failed map task attempt, with different locality
|
||||
// TODO: this test does not make sense here, because I don't have
|
||||
// available data set.
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFourthJob() {
|
||||
// 7th job has many failed tasks.
|
||||
// 3204 m, 0 r
|
||||
// successful maps 497-586-23-1, failed maps 0-0-0-2714
|
||||
// map attempts to success -- 0.8113600833767587, 0.18707660239708182,
|
||||
// 0.0013027618551328818, 2.605523710265763E-4,
|
||||
JobStory job = jobStories.get(3);
|
||||
assertEquals(131, job.getNumberMaps());
|
||||
assertEquals(47, job.getNumberReduces());
|
||||
|
||||
TaskAttemptInfo taInfo = null;
|
||||
// get a succeeded map task attempt
|
||||
long runtime = 5268;
|
||||
taInfo = job.getMapTaskAttemptInfoAdjusted(113, 1, 1);
|
||||
assertEquals(State.SUCCEEDED, taInfo.getRunState());
|
||||
assertEquals(runtime, taInfo.getRuntime());
|
||||
|
||||
// get a succeeded map task attempt, with different locality
|
||||
taInfo = job.getMapTaskAttemptInfoAdjusted(113, 1, 2);
|
||||
assertEquals(State.SUCCEEDED, taInfo.getRunState());
|
||||
assertEquals(runtime, taInfo.getRuntime() / 2);
|
||||
taInfo = job.getMapTaskAttemptInfoAdjusted(113, 1, 0);
|
||||
assertEquals(State.SUCCEEDED, taInfo.getRunState());
|
||||
assertEquals((long) (runtime / 1.5), taInfo.getRuntime());
|
||||
|
||||
// get a failed map task attempt
|
||||
taInfo = job.getMapTaskAttemptInfoAdjusted(113, 0, 1);
|
||||
assertEquals(18592, taInfo.getRuntime());
|
||||
assertEquals(State.FAILED, taInfo.getRunState());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecordIOInfo() {
|
||||
JobStory job = jobStories.get(3);
|
||||
|
||||
TaskInfo mapTask = job.getTaskInfo(TaskType.MAP, 113);
|
||||
|
||||
TaskInfo reduceTask = job.getTaskInfo(TaskType.REDUCE, 0);
|
||||
|
||||
assertEquals(mapTaskCounts[0], mapTask.getInputBytes());
|
||||
assertEquals(mapTaskCounts[1], mapTask.getInputRecords());
|
||||
assertEquals(mapTaskCounts[2], mapTask.getOutputBytes());
|
||||
assertEquals(mapTaskCounts[3], mapTask.getOutputRecords());
|
||||
assertEquals(mapTaskCounts[4], mapTask.getTaskMemory());
|
||||
|
||||
assertEquals(reduceTaskCounts[0], reduceTask.getInputBytes());
|
||||
assertEquals(reduceTaskCounts[1], reduceTask.getInputRecords());
|
||||
assertEquals(reduceTaskCounts[2], reduceTask.getOutputBytes());
|
||||
assertEquals(reduceTaskCounts[3], reduceTask.getOutputRecords());
|
||||
assertEquals(reduceTaskCounts[4], reduceTask.getTaskMemory());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMakeUpInfo() {
|
||||
// get many non-exist tasks
|
||||
// total 3204 map tasks, 3300 is a non-exist task.
|
||||
checkMakeUpTask(jobStories.get(3), 113, 1);
|
||||
}
|
||||
|
||||
private void checkMakeUpTask(JobStory job, int taskNumber, int locality) {
|
||||
TaskAttemptInfo taInfo = null;
|
||||
|
||||
Histogram sampleSucceeded = new Histogram();
|
||||
Histogram sampleFailed = new Histogram();
|
||||
List<Integer> sampleAttempts = new ArrayList<Integer>();
|
||||
for (int i = 0; i < 100000; i++) {
|
||||
int attemptId = 0;
|
||||
while (true) {
|
||||
taInfo = job.getMapTaskAttemptInfoAdjusted(taskNumber, attemptId, 1);
|
||||
if (taInfo.getRunState() == State.SUCCEEDED) {
|
||||
sampleSucceeded.enter(taInfo.getRuntime());
|
||||
break;
|
||||
}
|
||||
sampleFailed.enter(taInfo.getRuntime());
|
||||
attemptId++;
|
||||
}
|
||||
sampleAttempts.add(attemptId);
|
||||
}
|
||||
|
||||
// check state distribution
|
||||
int[] countTries = new int[] { 0, 0, 0, 0 };
|
||||
for (int attempts : sampleAttempts) {
|
||||
assertTrue(attempts < 4);
|
||||
countTries[attempts]++;
|
||||
}
|
||||
/*
|
||||
* System.out.print("Generated map attempts to success -- "); for (int
|
||||
* count: countTries) { System.out.print((double)count/sampleAttempts.size()
|
||||
* + ", "); } System.out.println(); System.out.println("===============");
|
||||
*/
|
||||
for (int i = 0; i < 4; i++) {
|
||||
int count = countTries[i];
|
||||
double p = (double) count / sampleAttempts.size();
|
||||
assertTrue(expectedPs[i] - p < epsilon);
|
||||
}
|
||||
|
||||
// check succeeded attempts runtime distribution
|
||||
long[] expectedCDF = succeededCDF;
|
||||
LoggedDiscreteCDF cdf = new LoggedDiscreteCDF();
|
||||
cdf.setCDF(sampleSucceeded, attemptTimesPercentiles, 100);
|
||||
/*
|
||||
* System.out.println("generated succeeded map runtime distribution");
|
||||
* System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
|
||||
* + cdf.getMaximum()); for (LoggedSingleRelativeRanking ranking:
|
||||
* cdf.getRankings()) { System.out.println(" " +
|
||||
* ranking.getRelativeRanking() + ":" + ranking.getDatum()); }
|
||||
*/
|
||||
assertRuntimeEqual(cdf.getMinimum(), expectedCDF[0]);
|
||||
assertRuntimeEqual(cdf.getMaximum(), expectedCDF[4]);
|
||||
for (int i = 0; i < 3; i++) {
|
||||
LoggedSingleRelativeRanking ranking = cdf.getRankings().get(i);
|
||||
assertRuntimeEqual(expectedCDF[i + 1], ranking.getDatum());
|
||||
}
|
||||
|
||||
// check failed attempts runtime distribution
|
||||
expectedCDF = failedCDF;
|
||||
cdf = new LoggedDiscreteCDF();
|
||||
cdf.setCDF(sampleFailed, attemptTimesPercentiles, 100);
|
||||
|
||||
System.out.println("generated failed map runtime distribution");
|
||||
System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
|
||||
+ cdf.getMaximum());
|
||||
for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
|
||||
System.out.println(" " + ranking.getRelativeRanking() + ":"
|
||||
+ ranking.getDatum());
|
||||
}
|
||||
assertRuntimeEqual(cdf.getMinimum(), expectedCDF[0]);
|
||||
assertRuntimeEqual(cdf.getMaximum(), expectedCDF[4]);
|
||||
for (int i = 0; i < 3; i++) {
|
||||
LoggedSingleRelativeRanking ranking = cdf.getRankings().get(i);
|
||||
assertRuntimeEqual(expectedCDF[i + 1], ranking.getDatum());
|
||||
}
|
||||
}
|
||||
|
||||
private void assertRuntimeEqual(long expected, long generated) {
|
||||
if (expected == 0) {
|
||||
assertTrue(generated > -1000 && generated < 1000);
|
||||
} else {
|
||||
long epsilon = Math.max(expected / 10, 5000);
|
||||
assertTrue(expected - generated > -epsilon);
|
||||
assertTrue(expected - generated < epsilon);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue