diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 9ebe603027c..8b6bb0e5cd5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -708,6 +708,8 @@ Release 0.23.7 - UNRELEASED MAPREDUCE-5053. java.lang.InternalError from decompression codec cause reducer to fail (Robert Parker via jeagles) + MAPREDUCE-4991. coverage for gridmix (Aleksey Gorshkov via tgraves) + Release 0.23.6 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-tools/hadoop-gridmix/pom.xml b/hadoop-tools/hadoop-gridmix/pom.xml index aea47e2b14f..db0b664c17f 100644 --- a/hadoop-tools/hadoop-gridmix/pom.xml +++ b/hadoop-tools/hadoop-gridmix/pom.xml @@ -91,6 +91,11 @@ <type>test-jar</type> <scope>test</scope> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> @@ -115,6 +120,15 @@ + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/test/resources/data/*</exclude> + </excludes> + </configuration> + </plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> diff --git a/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/Gridmix.java b/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/Gridmix.java index 4bdc001c550..dc52e997fd1 100644 --- a/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/Gridmix.java +++ b/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/Gridmix.java @@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics; import org.apache.hadoop.mapred.gridmix.Statistics.JobStats; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; @@ -136,6 +137,7 @@ public class Gridmix extends Configured implements Tool { private DistributedCacheEmulator distCacheEmulator; // Submit data structures + @SuppressWarnings("rawtypes") private JobFactory factory; private JobSubmitter submitter; private JobMonitor monitor; @@ -176,6 +178,7 @@ public class Gridmix extends Configured implements Tool { * @see org.apache.hadoop.mapred.gridmix.GenerateData * @return exit status */ + @SuppressWarnings("deprecation") protected int writeInputData(long genbytes, Path inputDir) throws IOException, InterruptedException { if (genbytes > 0) { @@ -287,6 +290,7 @@ public class Gridmix extends Configured implements Tool { * @param scratchDir Path into which job output is written * @param startFlag Semaphore for starting job trace pipeline */ + @SuppressWarnings("unchecked") private void startThreads(Configuration conf, String traceIn, Path ioPath, Path scratchDir, CountDownLatch startFlag, UserResolver userResolver) throws IOException { @@ -345,6 +349,7 @@ public class Gridmix extends Configured implements Tool { return new JobSubmitter(monitor, threads, queueDepth, pool, statistics); } + @SuppressWarnings("rawtypes") protected JobFactory createJobFactory(JobSubmitter submitter, String traceIn, Path scratchDir, Configuration conf, CountDownLatch startFlag, UserResolver resolver) @@ -383,6 +388,7 @@ public class Gridmix extends Configured implements Tool { return val; } + @SuppressWarnings("deprecation") private int runJob(Configuration conf, String[] argv) throws IOException, InterruptedException { if (argv.length < 2) { @@ -693,7 +699,7 @@ public class Gridmix extends Configured implements Tool { try { res = ToolRunner.run(new Configuration(), new Gridmix(argv), argv); } finally { - System.exit(res); + ExitUtil.terminate(res); } } @@ -800,6
+806,10 @@ public class Gridmix extends Configured implements Tool { */ void abort(); } - + // needed for tests + protected Summarizer getSummarizer() { + return summarizer; + } + } diff --git a/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java b/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java index 3301cbdf888..af554ff0b1f 100644 --- a/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java +++ b/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java @@ -175,4 +175,8 @@ public class SerialJobFactory extends JobFactory { LOG.info(" Starting Serial submission "); this.rThread.start(); } + // needed for tests + void setDistCacheEmulator(DistributedCacheEmulator e) { + jobCreator.setDistCacheEmulator(e); + } } diff --git a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/CommonJobTest.java b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/CommonJobTest.java new file mode 100644 index 00000000000..77d7f0743b1 --- /dev/null +++ b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/CommonJobTest.java @@ -0,0 +1,384 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.gridmix; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.TaskReport; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskCounter; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.tools.rumen.TaskInfo; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.tools.rumen.JobStory; +import org.apache.hadoop.util.ToolRunner; + +public class CommonJobTest { + public static final Log LOG = LogFactory.getLog(Gridmix.class); + + protected static int NJOBS = 2; + protected static final long GENDATA = 1; // in megabytes + protected static GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.REPLAY; + private static File workspace = new File("target" + File.separator + + TestGridmixSubmission.class.getName() + "-test"); + + static class DebugGridmix extends Gridmix { + + private JobFactory factory; + private TestMonitor monitor; + + @Override + protected JobMonitor createJobMonitor(Statistics stats, Configuration conf) + throws IOException { + monitor = new TestMonitor(3, stats); + return monitor; + } + + @Override + protected JobFactory createJobFactory(JobSubmitter submitter, + String traceIn, Path scratchDir, Configuration conf, + CountDownLatch startFlag, UserResolver userResolver) throws IOException { + factory = DebugJobFactory.getFactory(submitter, scratchDir, NJOBS, conf, + startFlag, userResolver); + return factory; + } + + public void checkMonitor() throws Exception { + monitor.verify(((DebugJobFactory.Debuggable) factory).getSubmitted()); + } + } + + static class TestMonitor extends JobMonitor { + private final BlockingQueue retiredJobs; + private final int expected; + static final long SLOPBYTES = 1024; + + public TestMonitor(int expected, Statistics stats) { + super(3, TimeUnit.SECONDS, stats, 1); + this.expected = expected; + retiredJobs = new LinkedBlockingQueue(); + } + + @Override + protected void onSuccess(Job job) { + LOG.info(" Job Success " + job); + retiredJobs.add(job); + } + + @Override + protected void onFailure(Job job) 
{ + fail("Job failure: " + job); + } + + public void verify(ArrayList submitted) throws Exception { + assertEquals("Bad job count", expected, retiredJobs.size()); + + final ArrayList succeeded = new ArrayList(); + assertEquals("Bad job count", expected, retiredJobs.drainTo(succeeded)); + final HashMap sub = new HashMap(); + for (JobStory spec : submitted) { + sub.put(spec.getJobID().toString(), spec); + } + for (Job job : succeeded) { + final String jobName = job.getJobName(); + Configuration configuration = job.getConfiguration(); + if (GenerateData.JOB_NAME.equals(jobName)) { + RemoteIterator rit = GridmixTestUtils.dfs + .listFiles(new Path("/"), true); + while (rit.hasNext()) { + System.out.println(rit.next().toString()); + } + final Path in = new Path("foo").makeQualified( + GridmixTestUtils.dfs.getUri(), + GridmixTestUtils.dfs.getWorkingDirectory()); + // data was compressed. All files = compressed test size+ logs= 1000000/2 + logs + final ContentSummary generated = GridmixTestUtils.dfs + .getContentSummary(in); + assertEquals(550000, generated.getLength(), 10000); + + Counter counter = job.getCounters() + .getGroup("org.apache.hadoop.mapreduce.FileSystemCounter") + .findCounter("HDFS_BYTES_WRITTEN"); + + assertEquals(generated.getLength(), counter.getValue()); + + continue; + } else if (GenerateDistCacheData.JOB_NAME.equals(jobName)) { + continue; + } + + final String originalJobId = configuration.get(Gridmix.ORIGINAL_JOB_ID); + final JobStory spec = sub.get(originalJobId); + assertNotNull("No spec for " + jobName, spec); + assertNotNull("No counters for " + jobName, job.getCounters()); + final String originalJobName = spec.getName(); + System.out.println("originalJobName=" + originalJobName + + ";GridmixJobName=" + jobName + ";originalJobID=" + originalJobId); + assertTrue("Original job name is wrong.", + originalJobName.equals(configuration.get(Gridmix.ORIGINAL_JOB_NAME))); + + // Gridmix job seqNum contains 6 digits + int seqNumLength = 6; + String jobSeqNum = new DecimalFormat("000000").format(configuration.getInt( + GridmixJob.GRIDMIX_JOB_SEQ, -1)); + // Original job name is of the format MOCKJOB<6 digit sequence number> + // because MockJob jobNames are of this format. 
+ assertTrue(originalJobName.substring( + originalJobName.length() - seqNumLength).equals(jobSeqNum)); + + assertTrue("Gridmix job name is not in the expected format.", + jobName.equals(GridmixJob.JOB_NAME_PREFIX + jobSeqNum)); + final FileStatus stat = GridmixTestUtils.dfs.getFileStatus(new Path( + GridmixTestUtils.DEST, "" + Integer.valueOf(jobSeqNum))); + assertEquals("Wrong owner for " + jobName, spec.getUser(), + stat.getOwner()); + final int nMaps = spec.getNumberMaps(); + final int nReds = spec.getNumberReduces(); + + final JobClient client = new JobClient( + GridmixTestUtils.mrvl.getConfig()); + final TaskReport[] mReports = client.getMapTaskReports(JobID + .downgrade(job.getJobID())); + assertEquals("Mismatched map count", nMaps, mReports.length); + check(TaskType.MAP, spec, mReports, 0, 0, SLOPBYTES, nReds); + + final TaskReport[] rReports = client.getReduceTaskReports(JobID + .downgrade(job.getJobID())); + assertEquals("Mismatched reduce count", nReds, rReports.length); + check(TaskType.REDUCE, spec, rReports, nMaps * SLOPBYTES, 2 * nMaps, 0, + 0); + + } + + } + // Verify if correct job queue is used + private void check(final TaskType type, JobStory spec, + final TaskReport[] runTasks, long extraInputBytes, + int extraInputRecords, long extraOutputBytes, int extraOutputRecords) + throws Exception { + + long[] runInputRecords = new long[runTasks.length]; + long[] runInputBytes = new long[runTasks.length]; + long[] runOutputRecords = new long[runTasks.length]; + long[] runOutputBytes = new long[runTasks.length]; + long[] specInputRecords = new long[runTasks.length]; + long[] specInputBytes = new long[runTasks.length]; + long[] specOutputRecords = new long[runTasks.length]; + long[] specOutputBytes = new long[runTasks.length]; + + for (int i = 0; i < runTasks.length; ++i) { + final TaskInfo specInfo; + final Counters counters = runTasks[i].getCounters(); + switch (type) { + case MAP: + runInputBytes[i] = counters.findCounter("FileSystemCounters", + "HDFS_BYTES_READ").getValue() + - counters.findCounter(TaskCounter.SPLIT_RAW_BYTES).getValue(); + runInputRecords[i] = (int) counters.findCounter( + TaskCounter.MAP_INPUT_RECORDS).getValue(); + runOutputBytes[i] = counters + .findCounter(TaskCounter.MAP_OUTPUT_BYTES).getValue(); + runOutputRecords[i] = (int) counters.findCounter( + TaskCounter.MAP_OUTPUT_RECORDS).getValue(); + + specInfo = spec.getTaskInfo(TaskType.MAP, i); + specInputRecords[i] = specInfo.getInputRecords(); + specInputBytes[i] = specInfo.getInputBytes(); + specOutputRecords[i] = specInfo.getOutputRecords(); + specOutputBytes[i] = specInfo.getOutputBytes(); + + LOG.info(String.format(type + " SPEC: %9d -> %9d :: %5d -> %5d\n", + specInputBytes[i], specOutputBytes[i], specInputRecords[i], + specOutputRecords[i])); + LOG.info(String.format(type + " RUN: %9d -> %9d :: %5d -> %5d\n", + runInputBytes[i], runOutputBytes[i], runInputRecords[i], + runOutputRecords[i])); + break; + case REDUCE: + runInputBytes[i] = 0; + runInputRecords[i] = (int) counters.findCounter( + TaskCounter.REDUCE_INPUT_RECORDS).getValue(); + runOutputBytes[i] = counters.findCounter("FileSystemCounters", + "HDFS_BYTES_WRITTEN").getValue(); + runOutputRecords[i] = (int) counters.findCounter( + TaskCounter.REDUCE_OUTPUT_RECORDS).getValue(); + + specInfo = spec.getTaskInfo(TaskType.REDUCE, i); + // There is no reliable counter for reduce input bytes. The + // variable-length encoding of intermediate records and other noise + // make this quantity difficult to estimate. 
The shuffle and spec + // input bytes are included in debug output for reference, but are + // not checked + specInputBytes[i] = 0; + specInputRecords[i] = specInfo.getInputRecords(); + specOutputRecords[i] = specInfo.getOutputRecords(); + specOutputBytes[i] = specInfo.getOutputBytes(); + LOG.info(String.format(type + " SPEC: (%9d) -> %9d :: %5d -> %5d\n", + specInfo.getInputBytes(), specOutputBytes[i], + specInputRecords[i], specOutputRecords[i])); + LOG.info(String + .format(type + " RUN: (%9d) -> %9d :: %5d -> %5d\n", counters + .findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES).getValue(), + runOutputBytes[i], runInputRecords[i], runOutputRecords[i])); + break; + default: + fail("Unexpected type: " + type); + } + } + + // Check input bytes + Arrays.sort(specInputBytes); + Arrays.sort(runInputBytes); + for (int i = 0; i < runTasks.length; ++i) { + assertTrue("Mismatched " + type + " input bytes " + specInputBytes[i] + + "/" + runInputBytes[i], + eqPlusMinus(runInputBytes[i], specInputBytes[i], extraInputBytes)); + } + + // Check input records + Arrays.sort(specInputRecords); + Arrays.sort(runInputRecords); + for (int i = 0; i < runTasks.length; ++i) { + assertTrue( + "Mismatched " + type + " input records " + specInputRecords[i] + + "/" + runInputRecords[i], + eqPlusMinus(runInputRecords[i], specInputRecords[i], + extraInputRecords)); + } + + // Check output bytes + Arrays.sort(specOutputBytes); + Arrays.sort(runOutputBytes); + for (int i = 0; i < runTasks.length; ++i) { + assertTrue( + "Mismatched " + type + " output bytes " + specOutputBytes[i] + "/" + + runOutputBytes[i], + eqPlusMinus(runOutputBytes[i], specOutputBytes[i], extraOutputBytes)); + } + + // Check output records + Arrays.sort(specOutputRecords); + Arrays.sort(runOutputRecords); + for (int i = 0; i < runTasks.length; ++i) { + assertTrue( + "Mismatched " + type + " output records " + specOutputRecords[i] + + "/" + runOutputRecords[i], + eqPlusMinus(runOutputRecords[i], specOutputRecords[i], + extraOutputRecords)); + } + + } + + private static boolean eqPlusMinus(long a, long b, long x) { + final long diff = Math.abs(a - b); + return diff <= x; + } + + } + + protected void doSubmission(String jobCreatorName, boolean defaultOutputPath) + throws Exception { + final Path in = new Path("foo").makeQualified( + GridmixTestUtils.dfs.getUri(), + GridmixTestUtils.dfs.getWorkingDirectory()); + final Path out = GridmixTestUtils.DEST.makeQualified( + GridmixTestUtils.dfs.getUri(), + GridmixTestUtils.dfs.getWorkingDirectory()); + final Path root = new Path(workspace.getAbsolutePath()); + if (!workspace.exists()) { + assertTrue(workspace.mkdirs()); + } + Configuration conf = null; + + try { + ArrayList argsList = new ArrayList(); + + argsList.add("-D" + FilePool.GRIDMIX_MIN_FILE + "=0"); + argsList.add("-D" + Gridmix.GRIDMIX_USR_RSV + "=" + + EchoUserResolver.class.getName()); + if (jobCreatorName != null) { + argsList.add("-D" + JobCreator.GRIDMIX_JOB_TYPE + "=" + jobCreatorName); + } + + // Set the config property gridmix.output.directory only if + // defaultOutputPath is false. If defaultOutputPath is true, then + // let us allow gridmix to use the path foo/gridmix/ as output dir. 
+ if (!defaultOutputPath) { + argsList.add("-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out); + } + argsList.add("-generate"); + argsList.add(String.valueOf(GENDATA) + "m"); + argsList.add(in.toString()); + argsList.add("-"); // ignored by DebugGridmix + + String[] argv = argsList.toArray(new String[argsList.size()]); + + DebugGridmix client = new DebugGridmix(); + conf = GridmixTestUtils.mrvl.getConfig(); + + CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true); + conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy); + + conf.setBoolean(GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true); + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + conf.set(MRJobConfig.USER_NAME, ugi.getUserName()); + + // allow synthetic users to create home directories + GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short) 777)); + GridmixTestUtils.dfs.setPermission(root, new FsPermission((short) 777)); + + int res = ToolRunner.run(conf, client, argv); + assertEquals("Client exited with nonzero status", 0, res); + client.checkMonitor(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + in.getFileSystem(conf).delete(in, true); + out.getFileSystem(conf).delete(out, true); + root.getFileSystem(conf).delete(root, true); + } + } +} diff --git a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java index 413dfd99074..99b4571b7e0 100644 --- a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java +++ b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java @@ -29,13 +29,13 @@ import java.util.concurrent.CountDownLatch; /** * Component generating random job traces for testing on a single node. 
*/ -class DebugJobFactory { +public class DebugJobFactory { interface Debuggable { ArrayList getSubmitted(); } - public static JobFactory getFactory( + public static JobFactory getFactory( JobSubmitter submitter, Path scratch, int numJobs, Configuration conf, CountDownLatch startFlag, UserResolver resolver) throws IOException { GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy( diff --git a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java index fca29afce23..9a39d1d1d1a 100644 --- a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java +++ b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java @@ -216,7 +216,7 @@ public class DebugJobProducer implements JobStoryProducer { if (user == null) { user = String.format("foobar%d", id); } - GridmixTestUtils.createHomeAndStagingDirectory(user, (JobConf)conf); + GridmixTestUtils.createHomeAndStagingDirectory(user, conf); return user; } @@ -264,6 +264,7 @@ public class DebugJobProducer implements JobStoryProducer { throw new UnsupportedOperationException(); } + @SuppressWarnings({ "deprecation", "incomplete-switch" }) @Override public TaskAttemptInfo getTaskAttemptInfo( TaskType taskType, int taskNumber, int taskAttemptNumber) { @@ -300,7 +301,7 @@ public class DebugJobProducer implements JobStoryProducer { @Override public String getQueueName() { - String qName = "q" + ((id % 2) + 1); + String qName = "default"; return qName; } diff --git a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java index 49f17097fed..50865b53da9 100644 --- a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java +++ b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java @@ -4,55 +4,76 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.mapred.MiniMRCluster; -import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MiniMRClientCluster; +import org.apache.hadoop.mapred.MiniMRClientClusterFactory; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.ShellBasedUnixGroupsMapping; -import org.apache.hadoop.security.Groups; import java.io.IOException; /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ public class GridmixTestUtils { private static final Log LOG = LogFactory.getLog(GridmixTestUtils.class); static final Path DEST = new Path("/gridmix"); static FileSystem dfs = null; static MiniDFSCluster dfsCluster = null; - static MiniMRCluster mrCluster = null; + static MiniMRClientCluster mrvl = null; + protected static final String GRIDMIX_USE_QUEUE_IN_TRACE = + "gridmix.job-submission.use-queue-in-trace"; + protected static final String GRIDMIX_DEFAULT_QUEUE = + "gridmix.job-submission.default-queue"; - public static void initCluster() throws IOException { + public static void initCluster(Class caller) throws IOException { Configuration conf = new Configuration(); - conf.set("mapred.queue.names", "default,q1,q2"); - dfsCluster = new MiniDFSCluster(conf, 3, true, null); +// conf.set("mapred.queue.names", "default,q1,q2"); + conf.set("mapred.queue.names", "default"); + conf.set("yarn.scheduler.capacity.root.queues", "default"); + conf.set("yarn.scheduler.capacity.root.default.capacity", "100.0"); + + + conf.setBoolean(GRIDMIX_USE_QUEUE_IN_TRACE, false); + conf.set(GRIDMIX_DEFAULT_QUEUE, "default"); + + + dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true) + .build();// MiniDFSCluster(conf, 3, true, null); dfs = dfsCluster.getFileSystem(); conf.set(JTConfig.JT_RETIREJOBS, "false"); - mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1, null, null, - new JobConf(conf)); + mrvl = MiniMRClientClusterFactory.create(caller, 2, conf); + + conf = mrvl.getConfig(); + String[] files = conf.getStrings(MRJobConfig.CACHE_FILES); + if (files != null) { + String[] timestamps = new String[files.length]; + for (int i = 0; i < files.length; i++) { + timestamps[i] = Long.toString(System.currentTimeMillis()); + } + conf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, timestamps); + } + } public static void shutdownCluster() throws IOException { - if (mrCluster != null) { - mrCluster.shutdown(); + if (mrvl != null) { + mrvl.stop(); } if (dfsCluster != null) { dfsCluster.shutdown(); @@ -61,23 +82,25 @@ public class GridmixTestUtils { /** * Methods to generate the home directory for dummy users. 
- * + * * @param conf */ - public static void createHomeAndStagingDirectory(String user, JobConf conf) { + public static void createHomeAndStagingDirectory(String user, + Configuration conf) { try { FileSystem fs = dfsCluster.getFileSystem(); String path = "/user/" + user; Path homeDirectory = new Path(path); - if(fs.exists(homeDirectory)) { - fs.delete(homeDirectory,true); - } - LOG.info("Creating Home directory : " + homeDirectory); - fs.mkdirs(homeDirectory); - changePermission(user,homeDirectory, fs); - Path stagingArea = - new Path(conf.get("mapreduce.jobtracker.staging.root.dir", - "/tmp/hadoop/mapred/staging")); + if (!fs.exists(homeDirectory)) { + LOG.info("Creating Home directory : " + homeDirectory); + fs.mkdirs(homeDirectory); + changePermission(user, homeDirectory, fs); + + } + changePermission(user, homeDirectory, fs); + Path stagingArea = new Path( + conf.get("mapreduce.jobtracker.staging.root.dir", + "/tmp/hadoop/mapred/staging")); LOG.info("Creating Staging root directory : " + stagingArea); fs.mkdirs(stagingArea); fs.setPermission(stagingArea, new FsPermission((short) 0777)); @@ -87,7 +110,7 @@ public class GridmixTestUtils { } static void changePermission(String user, Path homeDirectory, FileSystem fs) - throws IOException { + throws IOException { fs.setOwner(homeDirectory, user, ""); } } diff --git a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java new file mode 100644 index 00000000000..597bea2f10f --- /dev/null +++ b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java @@ -0,0 +1,430 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.gridmix; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.MapContext; +import org.apache.hadoop.mapreduce.MapReduceTestUtil; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.task.MapContextImpl; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Validate emulation of distributed cache load in gridmix simulated jobs. + * + */ +public class TestDistCacheEmulation { + + private DistributedCacheEmulator dce = null; + + @BeforeClass + public static void init() throws IOException { + GridmixTestUtils.initCluster(TestDistCacheEmulation.class); + File target=new File("target"+File.separator+TestDistCacheEmulation.class.getName()); + if(!target.exists()){ + assertTrue(target.mkdirs()); + } + + } + + @AfterClass + public static void shutDown() throws IOException { + GridmixTestUtils.shutdownCluster(); + } + + /** + * Validate the dist cache files generated by GenerateDistCacheData job. + * + * @param jobConf + * configuration of GenerateDistCacheData job. 
+ * @param sortedFileSizes + * array of sorted distributed cache file sizes + * @throws IOException + * @throws FileNotFoundException + */ + private void validateDistCacheData(Configuration jobConf, + long[] sortedFileSizes) throws FileNotFoundException, IOException { + Path distCachePath = dce.getDistributedCacheDir(); + String filesListFile = jobConf + .get(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST); + FileSystem fs = FileSystem.get(jobConf); + + // Validate the existence of Distributed Cache files list file directly + // under distributed cache directory + Path listFile = new Path(filesListFile); + assertTrue("Path of Distributed Cache files list file is wrong.", + distCachePath.equals(listFile.getParent().makeQualified(fs.getUri(), fs.getWorkingDirectory()))); + + // Delete the dist cache files list file + assertTrue( + "Failed to delete distributed Cache files list file " + listFile, + fs.delete(listFile,true)); + + List fileSizes = new ArrayList(); + for (long size : sortedFileSizes) { + fileSizes.add(size); + } + // validate dist cache files after deleting the 'files list file' + validateDistCacheFiles(fileSizes, distCachePath); + } + + /** + * Validate private/public distributed cache files. + * + * @param filesSizesExpected + * list of sizes of expected dist cache files + * @param distCacheDir + * the distributed cache dir to be validated + * @throws IOException + * @throws FileNotFoundException + */ + private void validateDistCacheFiles(List filesSizesExpected, Path distCacheDir) + throws FileNotFoundException, IOException { + // RemoteIterator iter = + FileStatus[] statuses = GridmixTestUtils.dfs.listStatus(distCacheDir); + int numFiles = filesSizesExpected.size(); + assertEquals("Number of files under distributed cache dir is wrong.", + numFiles, statuses.length); + for (int i = 0; i < numFiles; i++) { + FileStatus stat = statuses[i]; + assertTrue("File size of distributed cache file " + + stat.getPath().toUri().getPath() + " is wrong.", + filesSizesExpected.remove(stat.getLen())); + + FsPermission perm = stat.getPermission(); + assertEquals("Wrong permissions for distributed cache file " + + stat.getPath().toUri().getPath(), new FsPermission((short) 0644), + perm); + } + } + + /** + * Configures 5 HDFS-based dist cache files and 1 local-FS-based dist cache + * file in the given Configuration object conf. + * + * @param conf + * configuration where dist cache config properties are to be set + * @return array of sorted HDFS-based distributed cache file sizes + * @throws IOException + */ + private long[] configureDummyDistCacheFiles(Configuration conf) + throws IOException { + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + conf.set("user.name", user); + + // Set some dummy dist cache files in gridmix configuration so that they go + // into the configuration of JobStory objects. 
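+    // distCacheFiles, fileSizes, visibilities and timeStamps below are
+    // parallel arrays: entry i of each array describes the same cache file.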
+ String[] distCacheFiles = { "hdfs:///tmp/file1.txt", + "/tmp/" + user + "/.staging/job_1/file2.txt", + "hdfs:///user/user1/file3.txt", "/home/user2/file4.txt", + "subdir1/file5.txt", "subdir2/file6.gz" }; + + String[] fileSizes = { "400", "2500", "700", "1200", "1500", "500" }; + + String[] visibilities = { "true", "false", "false", "true", "true", "false" }; + String[] timeStamps = { "1234", "2345", "34567", "5434", "125", "134" }; + + // DistributedCache.setCacheFiles(fileCaches, conf); + conf.setStrings(MRJobConfig.CACHE_FILES, distCacheFiles); + conf.setStrings(MRJobConfig.CACHE_FILES_SIZES, fileSizes); + conf.setStrings(JobContext.CACHE_FILE_VISIBILITIES, visibilities); + conf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, timeStamps); + + // local FS based dist cache file whose path contains /.staging is + // not created on HDFS. So file size 2500 is not added to sortedFileSizes. + long[] sortedFileSizes = new long[] { 1500, 1200, 700, 500, 400 }; + return sortedFileSizes; + } + + /** + * Runs setupGenerateDistCacheData() on a new DistributedCacheEmulator and + * returns the jobConf. Fills the array sortedFileSizes for use in + * validation, and validates the exit code returned by + * setupGenerateDistCacheData(). + * + * @param generate + * true if -generate option is specified + * @param sortedFileSizes + * sorted HDFS-based distributed cache file sizes + * @throws IOException + * @throws InterruptedException + */ + private Configuration runSetupGenerateDistCacheData(boolean generate, + long[] sortedFileSizes) throws IOException, InterruptedException { + Configuration conf = new Configuration(); + long[] fileSizes = configureDummyDistCacheFiles(conf); + System.arraycopy(fileSizes, 0, sortedFileSizes, 0, fileSizes.length); + + // Job stories of all 3 jobs will have the same dist cache files in their + // configurations + final int numJobs = 3; + DebugJobProducer jobProducer = new DebugJobProducer(numJobs, conf); + + Configuration jobConf = GridmixTestUtils.mrvl.getConfig(); + Path ioPath = new Path("testSetupGenerateDistCacheData") + .makeQualified(GridmixTestUtils.dfs.getUri(),GridmixTestUtils.dfs.getWorkingDirectory()); + FileSystem fs = FileSystem.get(jobConf); + if (fs.exists(ioPath)) { + fs.delete(ioPath, true); + } + FileSystem.mkdirs(fs, ioPath, new FsPermission((short) 0777)); + + dce = createDistributedCacheEmulator(jobConf, ioPath, generate); + int exitCode = dce.setupGenerateDistCacheData(jobProducer); + int expectedExitCode = generate ? 0 + : Gridmix.MISSING_DIST_CACHE_FILES_ERROR; + assertEquals("setupGenerateDistCacheData failed.", expectedExitCode, + exitCode); + + // reset back + resetDistCacheConfigProperties(jobConf); + return jobConf; + } + + /** + * Reset the config properties related to Distributed Cache in the given job + * configuration jobConf.
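+   * Both the current MRJobConfig property names and the deprecated
+   * mapred.cache.* names are cleared.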
+ * + * @param jobConf + * job configuration + */ + private void resetDistCacheConfigProperties(Configuration jobConf) { + // reset current/latest property names + jobConf.setStrings(MRJobConfig.CACHE_FILES, ""); + jobConf.setStrings(MRJobConfig.CACHE_FILES_SIZES, ""); + jobConf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, ""); + jobConf.setStrings(JobContext.CACHE_FILE_VISIBILITIES, ""); + // reset old property names + jobConf.setStrings("mapred.cache.files", ""); + jobConf.setStrings("mapred.cache.files.filesizes", ""); + jobConf.setStrings("mapred.cache.files.visibilities", ""); + jobConf.setStrings("mapred.cache.files.timestamps", ""); + } + + /** + * Validate that the GenerateDistCacheData job creates the dist cache files + * properly. + * + * @throws Exception + */ + @Test (timeout=200000) + public void testGenerateDistCacheData() throws Exception { + long[] sortedFileSizes = new long[5]; + Configuration jobConf = runSetupGenerateDistCacheData(true, sortedFileSizes); + GridmixJob gridmixJob = new GenerateDistCacheData(jobConf); + Job job = gridmixJob.call(); + assertEquals("Number of reduce tasks in GenerateDistCacheData is not 0.", + 0, job.getNumReduceTasks()); + assertTrue("GenerateDistCacheData job failed.", + job.waitForCompletion(false)); + validateDistCacheData(jobConf, sortedFileSizes); + } + + /** + * Validate setupGenerateDistCacheData by validating

  • permissions of the + * distributed cache directories and
  • content of the generated sequence + * file. This includes validation of dist cache file paths and their file + * sizes. + */ + private void validateSetupGenDC(Configuration jobConf, long[] sortedFileSizes) + throws IOException, InterruptedException { + // build things needed for validation + long sumOfFileSizes = 0; + for (int i = 0; i < sortedFileSizes.length; i++) { + sumOfFileSizes += sortedFileSizes[i]; + } + + FileSystem fs = FileSystem.get(jobConf); + assertEquals("Number of distributed cache files to be generated is wrong.", + sortedFileSizes.length, + jobConf.getInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, -1)); + assertEquals("Total size of dist cache files to be generated is wrong.", + sumOfFileSizes, + jobConf.getLong(GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, -1)); + Path filesListFile = new Path( + jobConf.get(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST)); + FileStatus stat = fs.getFileStatus(filesListFile); + assertEquals("Wrong permissions of dist Cache files list file " + + filesListFile, new FsPermission((short) 0644), stat.getPermission()); + + InputSplit split = new FileSplit(filesListFile, 0, stat.getLen(), + (String[]) null); + TaskAttemptContext taskContext = MapReduceTestUtil + .createDummyMapTaskAttemptContext(jobConf); + RecordReader reader = new GenerateDistCacheData.GenDCDataFormat() + .createRecordReader(split, taskContext); + MapContext mapContext = new MapContextImpl( + jobConf, taskContext.getTaskAttemptID(), reader, null, null, + MapReduceTestUtil.createDummyReporter(), split); + reader.initialize(split, mapContext); + + // start validating setupGenerateDistCacheData + doValidateSetupGenDC(reader, fs, sortedFileSizes); + } + + /** + * Validate setupGenerateDistCacheData by validating
  • permissions of the + * distributed cache directory and
  • content of the generated sequence file. + * This includes validation of dist cache file paths and their file sizes. + */ + private void doValidateSetupGenDC( + RecordReader reader, FileSystem fs, + long[] sortedFileSizes) throws IOException, InterruptedException { + + // Validate permissions of dist cache directory + Path distCacheDir = dce.getDistributedCacheDir(); + assertEquals( + "Wrong permissions for distributed cache dir " + distCacheDir, + fs.getFileStatus(distCacheDir).getPermission().getOtherAction() + .and(FsAction.EXECUTE), FsAction.EXECUTE); + + // Validate the content of the sequence file generated by + // dce.setupGenerateDistCacheData(). + LongWritable key = new LongWritable(); + BytesWritable val = new BytesWritable(); + for (int i = 0; i < sortedFileSizes.length; i++) { + assertTrue("Number of files written to the sequence file by " + + "setupGenerateDistCacheData is less than the expected.", + reader.nextKeyValue()); + key = reader.getCurrentKey(); + val = reader.getCurrentValue(); + long fileSize = key.get(); + String file = new String(val.getBytes(), 0, val.getLength()); + + // Dist Cache files should be sorted based on file size. + assertEquals("Dist cache file size is wrong.", sortedFileSizes[i], + fileSize); + + // Validate dist cache file path. + + // parent dir of dist cache file + Path parent = new Path(file).getParent().makeQualified(fs.getUri(),fs.getWorkingDirectory()); + // should exist in dist cache dir + assertTrue("Public dist cache file path is wrong.", + distCacheDir.equals(parent)); + } + } + + /** + * Test if DistributedCacheEmulator's setup of GenerateDistCacheData is + * working as expected. + * + * @throws IOException + * @throws InterruptedException + */ + @Test (timeout=20000) + public void testSetupGenerateDistCacheData() throws IOException, + InterruptedException { + long[] sortedFileSizes = new long[5]; + Configuration jobConf = runSetupGenerateDistCacheData(true, sortedFileSizes); + validateSetupGenDC(jobConf, sortedFileSizes); + + // Verify if correct exit code is seen when -generate option is missing and + // distributed cache files are missing in the expected path. + runSetupGenerateDistCacheData(false, sortedFileSizes); + } + + /** + * Create DistributedCacheEmulator object and do the initialization by calling + * init() on it with dummy trace. Also configure the pseudo local FS. + */ + private DistributedCacheEmulator createDistributedCacheEmulator( + Configuration conf, Path ioPath, boolean generate) throws IOException { + DistributedCacheEmulator dce = new DistributedCacheEmulator(conf, ioPath); + JobCreator jobCreator = JobCreator.getPolicy(conf, JobCreator.LOADJOB); + jobCreator.setDistCacheEmulator(dce); + dce.init("dummytrace", jobCreator, generate); + return dce; + } + + /** + * Test the configuration property for disabling/enabling emulation of + * distributed cache load. 
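+   * With no explicit setting, emulation is expected to be enabled; setting
+   * DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE to false must
+   * disable it.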
+ */ + @Test (timeout=2000) + public void testDistCacheEmulationConfigurability() throws IOException { + Configuration jobConf = GridmixTestUtils.mrvl.getConfig(); + Path ioPath = new Path("testDistCacheEmulationConfigurability") + .makeQualified(GridmixTestUtils.dfs.getUri(),GridmixTestUtils.dfs.getWorkingDirectory()); + FileSystem fs = FileSystem.get(jobConf); + FileSystem.mkdirs(fs, ioPath, new FsPermission((short) 0777)); + + // default config + dce = createDistributedCacheEmulator(jobConf, ioPath, false); + assertTrue("Default configuration of " + + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE + + " is wrong.", dce.shouldEmulateDistCacheLoad()); + + // config property set to false + jobConf.setBoolean( + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE, false); + dce = createDistributedCacheEmulator(jobConf, ioPath, false); + assertFalse("Disabling of emulation of distributed cache load by setting " + + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE + + " to false is not working.", dce.shouldEmulateDistCacheLoad()); + } +/** + * test method configureDistCacheFiles + * + */ + @Test (timeout=2000) + public void testDistCacheEmulator() throws Exception { + + Configuration conf = new Configuration(); + configureDummyDistCacheFiles(conf); + File ws = new File("target" + File.separator + this.getClass().getName()); + Path ioPath = new Path(ws.getAbsolutePath()); + + DistributedCacheEmulator dce = new DistributedCacheEmulator(conf, ioPath); + JobConf jobConf = new JobConf(conf); + jobConf.setUser(UserGroupInformation.getCurrentUser().getShortUserName()); + File fin=new File("src"+File.separator+"test"+File.separator+"resources"+File.separator+"data"+File.separator+"wordcount.json"); + dce.init(fin.getAbsolutePath(), JobCreator.LOADJOB, true); + dce.configureDistCacheFiles(conf, jobConf); + + String[] caches=conf.getStrings(MRJobConfig.CACHE_FILES); + String[] tmpfiles=conf.getStrings("tmpfiles"); + // this method should fill caches AND tmpfiles from MRJobConfig.CACHE_FILES property + assertEquals(6, ((caches==null?0:caches.length)+(tmpfiles==null?0:tmpfiles.length))); + } +} diff --git a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridMixClasses.java b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridMixClasses.java new file mode 100644 index 00000000000..ef1265b903f --- /dev/null +++ b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridMixClasses.java @@ -0,0 +1,989 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred.gridmix; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.CustomOutputCommitter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.RawKeyValueIterator; +import org.apache.hadoop.mapred.gridmix.GridmixKey.Spec; +import org.apache.hadoop.mapred.gridmix.SleepJob.SleepReducer; +import org.apache.hadoop.mapred.gridmix.SleepJob.SleepSplit; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.MapContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.ReduceContext; +import org.apache.hadoop.mapreduce.StatusReporter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.Mapper.Context; +import org.apache.hadoop.mapreduce.counters.GenericCounter; +import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; +import org.apache.hadoop.mapreduce.lib.map.WrappedMapper; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer; +import org.apache.hadoop.mapreduce.task.MapContextImpl; +import org.apache.hadoop.mapreduce.task.ReduceContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.DummyReporter; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.tools.rumen.JobStory; +import org.apache.hadoop.tools.rumen.JobStoryProducer; +import org.apache.hadoop.tools.rumen.ResourceUsageMetrics; +import org.apache.hadoop.tools.rumen.ZombieJobProducer; +import org.apache.hadoop.util.Progress; +import org.junit.Assert; +import org.junit.Test; + +import static org.mockito.Mockito.*; + +import static org.junit.Assert.*; + +public class TestGridMixClasses { + private static final Log LOG = LogFactory.getLog(TestGridMixClasses.class); + + /* + * simple test LoadSplit (getters,copy, write, read...) 
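+   * The split is serialized with write() and deserialized into a copy with
+   * readFields(); every field of the copy must match the original.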
*/ + @Test (timeout=1000) + public void testLoadSplit() throws Exception { + + LoadSplit test = getLoadSplit(); + + ByteArrayOutputStream data = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(data); + test.write(out); + LoadSplit copy = new LoadSplit(); + copy.readFields(new DataInputStream(new ByteArrayInputStream(data + .toByteArray()))); + + // data should be the same + assertEquals(test.getId(), copy.getId()); + assertEquals(test.getMapCount(), copy.getMapCount()); + assertEquals(test.getInputRecords(), copy.getInputRecords()); + + assertEquals(test.getOutputBytes()[0], copy.getOutputBytes()[0]); + assertEquals(test.getOutputRecords()[0], copy.getOutputRecords()[0]); + assertEquals(test.getReduceBytes(0), copy.getReduceBytes(0)); + assertEquals(test.getReduceRecords(0), copy.getReduceRecords(0)); + assertEquals(test.getMapResourceUsageMetrics().getCumulativeCpuUsage(), + copy.getMapResourceUsageMetrics().getCumulativeCpuUsage()); + assertEquals(test.getReduceResourceUsageMetrics(0).getCumulativeCpuUsage(), + copy.getReduceResourceUsageMetrics(0).getCumulativeCpuUsage()); + + } + + /* + * simple test of GridmixSplit (copy, getters, write, read...) + */ + @Test (timeout=1000) + public void testGridmixSplit() throws Exception { + Path[] files = {new Path("one"), new Path("two")}; + long[] start = {1, 2}; + long[] lengths = {100, 200}; + String[] locations = {"locOne", "loctwo"}; + + CombineFileSplit cfSplit = new CombineFileSplit(files, start, lengths, + locations); + ResourceUsageMetrics metrics = new ResourceUsageMetrics(); + metrics.setCumulativeCpuUsage(200); + + double[] reduceBytes = {8.1d, 8.2d}; + double[] reduceRecords = {9.1d, 9.2d}; + long[] reduceOutputBytes = {101L, 102L}; + long[] reduceOutputRecords = {111L, 112L}; + + GridmixSplit test = new GridmixSplit(cfSplit, 2, 3, 4L, 5L, 6L, 7L, + reduceBytes, reduceRecords, reduceOutputBytes, reduceOutputRecords); + + ByteArrayOutputStream data = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(data); + test.write(out); + GridmixSplit copy = new GridmixSplit(); + copy.readFields(new DataInputStream(new ByteArrayInputStream(data + .toByteArray()))); + + // data should be the same + assertEquals(test.getId(), copy.getId()); + assertEquals(test.getMapCount(), copy.getMapCount()); + assertEquals(test.getInputRecords(), copy.getInputRecords()); + + assertEquals(test.getOutputBytes()[0], copy.getOutputBytes()[0]); + assertEquals(test.getOutputRecords()[0], copy.getOutputRecords()[0]); + assertEquals(test.getReduceBytes(0), copy.getReduceBytes(0)); + assertEquals(test.getReduceRecords(0), copy.getReduceRecords(0)); + + } + + /* + * test LoadMapper: LoadMapper should write one record to the writer for + * each reduce. + */ + @SuppressWarnings({"rawtypes", "unchecked"}) + @Test (timeout=10000) + public void testLoadMapper() throws Exception { + + Configuration conf = new Configuration(); + conf.setInt(JobContext.NUM_REDUCES, 2); + + CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true); + conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true); + + TaskAttemptID taskId = new TaskAttemptID(); + RecordReader reader = new FakeRecordReader(); + + LoadRecordGkGrWriter writer = new LoadRecordGkGrWriter(); + + OutputCommitter committer = new CustomOutputCommitter(); + StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter(); + LoadSplit split = getLoadSplit(); + + MapContext mapContext = new MapContextImpl( + conf, taskId, reader, writer, committer, reporter, split); + // context +
Context ctx = new WrappedMapper() + .getMapContext(mapContext); + + reader.initialize(split, ctx); + ctx.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true); + CompressionEmulationUtil.setCompressionEmulationEnabled( + ctx.getConfiguration(), true); + + LoadJob.LoadMapper mapper = new LoadJob.LoadMapper(); + // setup, map, clean + mapper.run(ctx); + + Map data = writer.getData(); + // check result + assertEquals(2, data.size()); + + } + + private LoadSplit getLoadSplit() throws Exception { + + Path[] files = {new Path("one"), new Path("two")}; + long[] start = {1, 2}; + long[] lengths = {100, 200}; + String[] locations = {"locOne", "loctwo"}; + + CombineFileSplit cfSplit = new CombineFileSplit(files, start, lengths, + locations); + ResourceUsageMetrics metrics = new ResourceUsageMetrics(); + metrics.setCumulativeCpuUsage(200); + ResourceUsageMetrics[] rMetrics = {metrics}; + + double[] reduceBytes = {8.1d, 8.2d}; + double[] reduceRecords = {9.1d, 9.2d}; + long[] reduceOutputBytes = {101L, 102L}; + long[] reduceOutputRecords = {111L, 112L}; + + return new LoadSplit(cfSplit, 2, 1, 4L, 5L, 6L, 7L, + reduceBytes, reduceRecords, reduceOutputBytes, reduceOutputRecords, + metrics, rMetrics); + } + + private class FakeRecordLLReader extends + RecordReader { + + int counter = 10; + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + counter--; + return counter > 0; + } + + @Override + public LongWritable getCurrentKey() throws IOException, + InterruptedException { + + return new LongWritable(counter); + } + + @Override + public LongWritable getCurrentValue() throws IOException, + InterruptedException { + return new LongWritable(counter * 10); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return counter / 10.0f; + } + + @Override + public void close() throws IOException { + // restore data + counter = 10; + } + } + + private class FakeRecordReader extends + RecordReader { + + int counter = 10; + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + counter--; + return counter > 0; + } + + @Override + public NullWritable getCurrentKey() throws IOException, + InterruptedException { + + return NullWritable.get(); + } + + @Override + public GridmixRecord getCurrentValue() throws IOException, + InterruptedException { + return new GridmixRecord(100, 100L); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return counter / 10.0f; + } + + @Override + public void close() throws IOException { + // restore data + counter = 10; + } + } + + private class LoadRecordGkGrWriter extends + RecordWriter { + private Map data = new HashMap(); + + @Override + public void write(GridmixKey key, GridmixRecord value) throws IOException, + InterruptedException { + data.put(key, value); + } + + @Override + public void close(TaskAttemptContext context) throws IOException, + InterruptedException { + } + + public Map getData() { + return data; + } + + } + + private class LoadRecordGkNullWriter extends + RecordWriter { + private Map data = new HashMap(); + + @Override + public void write(GridmixKey key, NullWritable value) throws IOException, + InterruptedException { + data.put(key, value); 
+ } + + @Override + public void close(TaskAttemptContext context) throws IOException, + InterruptedException { + } + + public Map getData() { + return data; + } + + } + + private class LoadRecordWriter extends + RecordWriter { + private Map data = new HashMap(); + + @Override + public void write(NullWritable key, GridmixRecord value) + throws IOException, InterruptedException { + data.put(key, value); + } + + @Override + public void close(TaskAttemptContext context) throws IOException, + InterruptedException { + } + + public Map getData() { + return data; + } + + } + + /* + * test LoadSortComparator + */ + @Test (timeout=1000) + public void testLoadJobLoadSortComparator() throws Exception { + LoadJob.LoadSortComparator test = new LoadJob.LoadSortComparator(); + + ByteArrayOutputStream data = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(data); + WritableUtils.writeVInt(dos, 2); + WritableUtils.writeVInt(dos, 1); + WritableUtils.writeVInt(dos, 4); + WritableUtils.writeVInt(dos, 7); + WritableUtils.writeVInt(dos, 4); + + byte[] b1 = data.toByteArray(); + + byte[] b2 = data.toByteArray(); + + // the same data should be equal + assertEquals(0, test.compare(b1, 0, 1, b2, 0, 1)); + b2[2] = 5; + // compare like GridMixKey first byte: shift count -1=4-5 + assertEquals(-1, test.compare(b1, 0, 1, b2, 0, 1)); + b2[2] = 2; + // compare like GridMixKey first byte: shift count 2=4-2 + assertEquals(2, test.compare(b1, 0, 1, b2, 0, 1)); + // compare arrays by first byte with offset (2-1) because 4==4 + b2[2] = 4; + assertEquals(1, test.compare(b1, 0, 1, b2, 1, 1)); + + } + + /* + * test SpecGroupingComparator + */ + @Test (timeout=1000) + public void testGridmixJobSpecGroupingComparator() throws Exception { + GridmixJob.SpecGroupingComparator test = new GridmixJob.SpecGroupingComparator(); + + ByteArrayOutputStream data = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(data); + WritableUtils.writeVInt(dos, 2); + WritableUtils.writeVInt(dos, 1); + // 0: REDUCE SPEC + WritableUtils.writeVInt(dos, 0); + WritableUtils.writeVInt(dos, 7); + WritableUtils.writeVInt(dos, 4); + + byte[] b1 = data.toByteArray(); + + byte[] b2 = data.toByteArray(); + + // the same object should be equal + assertEquals(0, test.compare(b1, 0, 1, b2, 0, 1)); + b2[2] = 1; + // for Reduce + assertEquals(-1, test.compare(b1, 0, 1, b2, 0, 1)); + // by Reduce spec + b2[2] = 1; // 1: DATA SPEC + assertEquals(-1, test.compare(b1, 0, 1, b2, 0, 1)); + // compare GridmixKey: the same objects should be equal + assertEquals(0, test.compare(new GridmixKey(GridmixKey.DATA, 100, 2), + new GridmixKey(GridmixKey.DATA, 100, 2))); + // REDUCE SPEC + assertEquals(-1, test.compare( + new GridmixKey(GridmixKey.REDUCE_SPEC, 100, 2), new GridmixKey( + GridmixKey.DATA, 100, 2))); + assertEquals(1, test.compare(new GridmixKey(GridmixKey.DATA, 100, 2), + new GridmixKey(GridmixKey.REDUCE_SPEC, 100, 2))); + // only DATA + assertEquals(2, test.compare(new GridmixKey(GridmixKey.DATA, 102, 2), + new GridmixKey(GridmixKey.DATA, 100, 2))); + + } + + /* + * test CompareGridmixJob (equals and compareTo only) + */ + @Test (timeout=10000) + public void testCompareGridmixJob() throws Exception { + Configuration conf = new Configuration(); + Path outRoot = new Path("target"); + JobStory jobDesc = mock(JobStory.class); + when(jobDesc.getName()).thenReturn("JobName"); + when(jobDesc.getJobConf()).thenReturn(new JobConf(conf)); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + GridmixJob j1 = new
+  /*
+   * test CompareGridmixJob only equals and compare
+   */
+  @Test (timeout=10000)
+  public void testCompareGridmixJob() throws Exception {
+    Configuration conf = new Configuration();
+    Path outRoot = new Path("target");
+    JobStory jobDesc = mock(JobStory.class);
+    when(jobDesc.getName()).thenReturn("JobName");
+    when(jobDesc.getJobConf()).thenReturn(new JobConf(conf));
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    GridmixJob j1 = new LoadJob(conf, 1000L, jobDesc, outRoot, ugi, 0);
+    GridmixJob j2 = new LoadJob(conf, 1000L, jobDesc, outRoot, ugi, 0);
+    GridmixJob j3 = new LoadJob(conf, 1000L, jobDesc, outRoot, ugi, 1);
+    GridmixJob j4 = new LoadJob(conf, 1000L, jobDesc, outRoot, ugi, 1);
+
+    assertTrue(j1.equals(j2));
+    assertEquals(0, j1.compareTo(j2));
+    // only the sequence number differs, which breaks equality
+    assertFalse(j1.equals(j3));
+    // compare id and submissionMillis
+    assertEquals(-1, j1.compareTo(j3));
+    assertEquals(-1, j1.compareTo(j4));
+
+  }
+
+  /*
+   * test ReadRecordFactory: it should drain all data from the input stream
+   */
+  @Test (timeout=1000)
+  public void testReadRecordFactory() throws Exception {
+
+    // RecordFactory factory, InputStream src, Configuration conf
+    RecordFactory rf = new FakeRecordFactory();
+    FakeInputStream input = new FakeInputStream();
+    ReadRecordFactory test = new ReadRecordFactory(rf, input,
+        new Configuration());
+    GridmixKey key = new GridmixKey(GridmixKey.DATA, 100, 2);
+    GridmixRecord val = new GridmixRecord(200, 2);
+    while (test.next(key, val)) {
+
+    }
+    // 10 records x (100-byte key size + 200-byte value size) = 3000 bytes
+    assertEquals(3000, input.getCounter());
+    // -1 because all records have been read
+    assertEquals(-1, rf.getProgress(), 0.01);
+
+    test.close();
+  }
+
+  private class FakeRecordFactory extends RecordFactory {
+
+    private int counter = 10;
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
+      counter--;
+      return counter >= 0;
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return counter;
+    }
+
+  }
+
+  private class FakeInputStream extends InputStream implements Seekable,
+      PositionedReadable {
+    private long counter;
+
+    @Override
+    public int read() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      int realLen = len - off;
+      counter += realLen;
+      for (int i = 0; i < b.length; i++) {
+        b[i] = 0;
+      }
+      return realLen;
+    }
+
+    public long getCounter() {
+      return counter;
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return counter;
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      return false;
+    }
+
+    @Override
+    public int read(long position, byte[] buffer, int offset, int length)
+        throws IOException {
+      return 0;
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer, int offset, int length)
+        throws IOException {
+
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer) throws IOException {
+
+    }
+  }
+
+  private class FakeFSDataInputStream extends FSDataInputStream {
+
+    public FakeFSDataInputStream(InputStream in) throws IOException {
+      super(in);
+
+    }
+
+  }
+
+  /*
+   * test LoadRecordReader: the reader pulls its data from a set of files
+   */
+  @Test (timeout=1000)
+  public void testLoadJobLoadRecordReader() throws Exception {
+    LoadJob.LoadRecordReader test = new LoadJob.LoadRecordReader();
+    Configuration conf = new Configuration();
+
+    FileSystem fs1 = mock(FileSystem.class);
+    when(fs1.open((Path) anyObject())).thenReturn(
+        new FakeFSDataInputStream(new FakeInputStream()));
+    Path p1 = mock(Path.class);
+    when(p1.getFileSystem((JobConf) anyObject())).thenReturn(fs1);
+
+    FileSystem fs2 = mock(FileSystem.class);
+    when(fs2.open((Path) anyObject())).thenReturn(
+        new FakeFSDataInputStream(new FakeInputStream()));
+    Path p2 = mock(Path.class);
+    when(p2.getFileSystem((JobConf) anyObject())).thenReturn(fs2);
+
+    Path[] paths = {p1, p2};
+
+    long[] start = {0, 0};
+    long[] lengths = {1000, 1000};
+    String[] locations = {"temp1", "temp2"};
+    CombineFileSplit cfsplit = new CombineFileSplit(paths, start, lengths,
+        locations);
+    double[] reduceBytes = {100, 100};
+    double[] reduceRecords = {2, 2};
+    long[] reduceOutputBytes = {500, 500};
+    long[] reduceOutputRecords = {2, 2};
+    ResourceUsageMetrics metrics = new ResourceUsageMetrics();
+    ResourceUsageMetrics[] rMetrics = {new ResourceUsageMetrics(),
+        new ResourceUsageMetrics()};
+    LoadSplit input = new LoadSplit(cfsplit, 2, 3, 1500L, 2L, 3000L, 2L,
+        reduceBytes, reduceRecords, reduceOutputBytes, reduceOutputRecords,
+        metrics, rMetrics);
+    TaskAttemptID taskId = new TaskAttemptID();
+    TaskAttemptContext ctx = new TaskAttemptContextImpl(conf, taskId);
+    test.initialize(input, ctx);
+    GridmixRecord gr = test.getCurrentValue();
+    int counter = 0;
+    while (test.nextKeyValue()) {
+      gr = test.getCurrentValue();
+      if (counter == 0) {
+        // first file read: 1000 of 2000 bytes consumed
+        assertEquals(0.5, test.getProgress(), 0.001);
+      } else if (counter == 1) {
+        // second file read: all bytes consumed
+        assertEquals(1.0, test.getProgress(), 0.001);
+      }
+      assertEquals(1000, gr.getSize());
+      counter++;
+    }
+    assertEquals(1000, gr.getSize());
+    // two files have been read
+    assertEquals(2, counter);
+
+    test.close();
+  }
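The reducer tests that follow reuse the harness idea from the mapper tests: build a real ReduceContextImpl over fake iterators, counters, and writers, wrap it, and drive the reducer directly, so no mini-cluster is needed. Condensed to its skeleton, the pattern looks roughly like the sketch below (illustrative only, reusing the Hadoop classes already imported by this test; the concrete type parameters and fakes vary per test):

    // Illustrative sketch of the no-cluster reducer harness used below.
    static <K, V, KO, VO> void runReducerLocally(Reducer<K, V, KO, VO> reducer,
        Configuration conf, RawKeyValueIterator input,
        RecordWriter<KO, VO> output, RawComparator<K> comparator,
        Class<K> keyClass, Class<V> valueClass) throws Exception {
      // a real context fed entirely by fakes
      ReduceContext<K, V, KO, VO> rc = new ReduceContextImpl<K, V, KO, VO>(
          conf, new TaskAttemptID(), input, new GenericCounter(),
          new GenericCounter(), output, new CustomOutputCommitter(),
          new DummyReporter(), comparator, keyClass, valueClass);
      reducer.run(new WrappedReducer<K, V, KO, VO>().getReducerContext(rc));
    }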
+  /*
+   * test LoadReducer
+   */
+  @Test (timeout=1000)
+  public void testLoadJobLoadReducer() throws Exception {
+    LoadJob.LoadReducer test = new LoadJob.LoadReducer();
+
+    Configuration conf = new Configuration();
+    conf.setInt(JobContext.NUM_REDUCES, 2);
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    conf.setBoolean(FileOutputFormat.COMPRESS, true);
+
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
+    TaskAttemptID taskid = new TaskAttemptID();
+
+    RawKeyValueIterator input = new FakeRawKeyValueIterator();
+
+    Counter counter = new GenericCounter();
+    Counter inputValueCounter = new GenericCounter();
+    LoadRecordWriter output = new LoadRecordWriter();
+
+    OutputCommitter committer = new CustomOutputCommitter();
+
+    StatusReporter reporter = new DummyReporter();
+    RawComparator<GridmixKey> comparator = new FakeRawComparator();
+
+    ReduceContext<GridmixKey, GridmixRecord, NullWritable, GridmixRecord> reduceContext =
+        new ReduceContextImpl<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>(
+            conf, taskid, input, counter, inputValueCounter, output, committer,
+            reporter, comparator, GridmixKey.class, GridmixRecord.class);
+    // advance to the first key/value before handing the context to run()
+    reduceContext.nextKeyValue();
+    org.apache.hadoop.mapreduce.Reducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>.Context context =
+        new WrappedReducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>()
+            .getReducerContext(reduceContext);
+
+    // test.setup(context);
+    test.run(context);
+    // run() counts 9 records; the first was already consumed above
+    assertEquals(9, counter.getValue());
+    assertEquals(10, inputValueCounter.getValue());
+    assertEquals(1, output.getData().size());
+    GridmixRecord record = output.getData().values().iterator()
+        .next();
+
+    assertEquals(1593, record.getSize());
+  }
+
+  protected class FakeRawKeyValueIterator implements RawKeyValueIterator {
+
+    int counter = 10;
+
+    @Override
+    public DataInputBuffer getKey() throws IOException {
+      ByteArrayOutputStream dt = new ByteArrayOutputStream();
+      GridmixKey key = new GridmixKey(GridmixKey.REDUCE_SPEC, 10 * counter, 1L);
+      Spec spec = new Spec();
+      spec.rec_in = counter;
+      spec.rec_out = counter;
+      spec.bytes_out = counter * 100;
+
+      key.setSpec(spec);
+      key.write(new DataOutputStream(dt));
+      DataInputBuffer result = new DataInputBuffer();
+      byte[] b = dt.toByteArray();
+      result.reset(b, 0, b.length);
+      return result;
+    }
+
+    @Override
+    public DataInputBuffer getValue() throws IOException {
+      ByteArrayOutputStream dt = new ByteArrayOutputStream();
+      GridmixRecord key = new GridmixRecord(100, 1);
+      key.write(new DataOutputStream(dt));
+      DataInputBuffer result = new DataInputBuffer();
+      byte[] b = dt.toByteArray();
+      result.reset(b, 0, b.length);
+      return result;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      counter--;
+      return counter >= 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public Progress getProgress() {
+      return null;
+    }
+
+  }
+
+  private class FakeRawComparator implements RawComparator<GridmixKey> {
+
+    @Override
+    public int compare(GridmixKey o1, GridmixKey o2) {
+      return o1.compareTo(o2);
+    }
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      if ((l1 - s1) != (l2 - s2)) {
+        return (l1 - s1) - (l2 - s2);
+      }
+      int len = l1 - s1;
+      for (int i = 0; i < len; i++) {
+        if (b1[s1 + i] != b2[s2 + i]) {
+          return b1[s1 + i] - b2[s2 + i];
+        }
+      }
+      return 0;
+    }
+
+  }
+
+  /*
+   * test SerialJobFactory
+   */
+  @Test (timeout=40000)
+  public void testSerialReaderThread() throws Exception {
+
+    Configuration conf = new Configuration();
+    File fin = new File("src" + File.separator + "test" + File.separator
+        + "resources" + File.separator + "data" + File.separator
+        + "wordcount2.json");
+    // read the two jobs from wordcount2.json
+    JobStoryProducer jobProducer = new ZombieJobProducer(new Path(
+        fin.getAbsolutePath()), null, conf);
+    CountDownLatch startFlag = new CountDownLatch(1);
+    UserResolver resolver = new SubmitterUserResolver();
+    FakeJobSubmitter submitter = new FakeJobSubmitter();
+    File ws = new File("target" + File.separator + this.getClass().getName());
+    if (!ws.exists()) {
+      Assert.assertTrue(ws.mkdirs());
+    }
+
+    SerialJobFactory jobFactory = new SerialJobFactory(submitter, jobProducer,
+        new Path(ws.getAbsolutePath()), conf, startFlag, resolver);
+
+    Path ioPath = new Path(ws.getAbsolutePath());
+    jobFactory.setDistCacheEmulator(new DistributedCacheEmulator(conf, ioPath));
+    Thread test = jobFactory.createReaderThread();
+    test.start();
+    Thread.sleep(1000);
+    // the reader thread blocks on startFlag before submitting anything
+    assertEquals(0, submitter.getJobs().size());
+    // start!
+    startFlag.countDown();
+    while (test.isAlive()) {
+      Thread.sleep(1000);
+      jobFactory.update(null);
+    }
+    // one submission per job in the trace
+    assertEquals(2, submitter.getJobs().size());
+  }
+
+  private class FakeJobSubmitter extends JobSubmitter {
+    // counter for submitted jobs
+    private List<GridmixJob> jobs = new ArrayList<GridmixJob>();
+
+    public FakeJobSubmitter() {
+      super(null, 1, 1, null, null);
+
+    }
+
+    @Override
+    public void add(GridmixJob job) throws InterruptedException {
+      jobs.add(job);
+    }
+
+    public List<GridmixJob> getJobs() {
+      return jobs;
+    }
+  }
+
+  /*
+   * test SleepMapper
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  @Test (timeout=10000)
+  public void testSleepMapper() throws Exception {
+    SleepJob.SleepMapper test = new SleepJob.SleepMapper();
+
+    Configuration conf = new Configuration();
+    conf.setInt(JobContext.NUM_REDUCES, 2);
+
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
+    TaskAttemptID taskId = new TaskAttemptID();
+    FakeRecordLLReader reader = new FakeRecordLLReader();
+    LoadRecordGkNullWriter writer = new LoadRecordGkNullWriter();
+    OutputCommitter committer = new CustomOutputCommitter();
+    StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
+    SleepSplit split = getSleepSplit();
+    MapContext<LongWritable, LongWritable, GridmixKey, NullWritable> mapcontext =
+        new MapContextImpl<LongWritable, LongWritable, GridmixKey, NullWritable>(
+            conf, taskId, reader, writer, committer, reporter, split);
+    Context context =
+        new WrappedMapper<LongWritable, LongWritable, GridmixKey, NullWritable>()
+            .getMapContext(mapcontext);
+
+    long start = System.currentTimeMillis();
+    LOG.info("start:" + start);
+    LongWritable key = new LongWritable(start + 2000);
+    LongWritable value = new LongWritable(start + 2000);
+    // the mapper should sleep for 2 seconds
+    test.map(key, value, context);
+    LOG.info("finish:" + System.currentTimeMillis());
+    assertTrue(System.currentTimeMillis() >= (start + 2000));
+
+    test.cleanup(context);
+    assertEquals(1, writer.getData().size());
+  }
+
+  private SleepSplit getSleepSplit() throws Exception {
+
+    String[] locations = {"locOne", "loctwo"};
+
+    long[] reduceDurations = {101L, 102L};
+
+    return new SleepSplit(0, 2000L, reduceDurations, 2, locations);
+  }
+
+  /*
+   * test SleepReducer
+   */
+  @Test (timeout=1000)
+  public void testSleepReducer() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(JobContext.NUM_REDUCES, 2);
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    conf.setBoolean(FileOutputFormat.COMPRESS, true);
+
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
+    TaskAttemptID taskId = new TaskAttemptID();
+
+    RawKeyValueIterator input = new FakeRawKeyValueReducerIterator();
+
+    Counter counter = new GenericCounter();
+    Counter inputValueCounter = new GenericCounter();
+    RecordWriter<NullWritable, NullWritable> output = new LoadRecordReduceWriter();
+
+    OutputCommitter committer = new CustomOutputCommitter();
+
+    StatusReporter reporter = new DummyReporter();
+    RawComparator<GridmixKey> comparator = new FakeRawComparator();
+
+    ReduceContext<GridmixKey, NullWritable, NullWritable, NullWritable> reducecontext =
+        new ReduceContextImpl<GridmixKey, NullWritable, NullWritable, NullWritable>(
+            conf, taskId, input, counter, inputValueCounter, output, committer,
+            reporter, comparator, GridmixKey.class, NullWritable.class);
+    org.apache.hadoop.mapreduce.Reducer<GridmixKey, NullWritable, NullWritable, NullWritable>.Context context =
+        new WrappedReducer<GridmixKey, NullWritable, NullWritable, NullWritable>()
+            .getReducerContext(reducecontext);
+
+    SleepReducer test = new SleepReducer();
+    long start = System.currentTimeMillis();
+    test.setup(context);
+    long sleeper = context.getCurrentKey().getReduceOutputBytes();
+    // setup() changes the task status
+    assertEquals("Sleeping... " + sleeper + " ms left", context.getStatus());
+    // setup() should have slept for the full sleeper duration (900 ms)
+    assertTrue(System.currentTimeMillis() >= (start + sleeper));
+    test.cleanup(context);
+    // cleanup() changes the status again
+    assertEquals("Slept for " + sleeper, context.getStatus());
+
+  }
+
+  private class LoadRecordReduceWriter extends
+      RecordWriter<NullWritable, NullWritable> {
+
+    @Override
+    public void write(NullWritable key, NullWritable value) throws IOException,
+        InterruptedException {
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+    }
+
+  }
+
+  protected class FakeRawKeyValueReducerIterator implements RawKeyValueIterator {
+
+    int counter = 10;
+
+    @Override
+    public DataInputBuffer getKey() throws IOException {
+      ByteArrayOutputStream dt = new ByteArrayOutputStream();
+      GridmixKey key = new GridmixKey(GridmixKey.REDUCE_SPEC, 10 * counter, 1L);
+      Spec spec = new Spec();
+      spec.rec_in = counter;
+      spec.rec_out = counter;
+      spec.bytes_out = counter * 100;
+
+      key.setSpec(spec);
+      key.write(new DataOutputStream(dt));
+      DataInputBuffer result = new DataInputBuffer();
+      byte[] b = dt.toByteArray();
+      result.reset(b, 0, b.length);
+      return result;
+    }
+
+    @Override
+    public DataInputBuffer getValue() throws IOException {
+      ByteArrayOutputStream dt = new ByteArrayOutputStream();
+      NullWritable key = NullWritable.get();
+      key.write(new DataOutputStream(dt));
+      DataInputBuffer result = new DataInputBuffer();
+      byte[] b = dt.toByteArray();
+      result.reset(b, 0, b.length);
+      return result;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      counter--;
+      return counter >= 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public Progress getProgress() {
+      return null;
+    }
+
+  }
+}
diff --git a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
new file mode 100644
index 00000000000..f1800c177aa
--- /dev/null
+++ b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
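The class above exercises Gridmix's task classes entirely through local fakes and Mockito (the mockito-all test dependency added in the pom earlier), so no cluster or HDFS instance is required. With a standard Maven/surefire setup, running it in isolation from hadoop-tools/hadoop-gridmix via surefire's -Dtest=TestGridMixClasses selector should be sufficient.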
+ */ +package org.apache.hadoop.mapred.gridmix; + +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.tools.rumen.JobStory; +import org.apache.hadoop.tools.rumen.JobStoryProducer; +import org.apache.hadoop.util.ExitUtil; +import org.apache.log4j.Level; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.InputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.zip.GZIPInputStream; + +import static org.junit.Assert.*; + +public class TestGridmixSubmission extends CommonJobTest { + private static File inSpace = new File("src" + File.separator + "test" + + File.separator + "resources" + File.separator + "data"); + + + static { + ((Log4JLogger) LogFactory.getLog("org.apache.hadoop.mapred.gridmix")) + .getLogger().setLevel(Level.DEBUG); + } + + + @BeforeClass + public static void init() throws IOException { + GridmixTestUtils.initCluster(TestGridmixSubmission.class); + + System.setProperty("src.test.data", inSpace.getAbsolutePath()); + } + + @AfterClass + public static void shutDown() throws IOException { + GridmixTestUtils.shutdownCluster(); + } + + /** + * Verifies that the given {@code JobStory} corresponds to the checked-in + * WordCount {@code JobStory}. The verification is effected via JUnit + * assertions. + * + * @param js the candidate JobStory. + */ + private void verifyWordCountJobStory(JobStory js) { + assertNotNull("Null JobStory", js); + String expectedJobStory = "WordCount:johndoe:default:1285322645148:3:1"; + String actualJobStory = js.getName() + ":" + js.getUser() + ":" + + js.getQueueName() + ":" + js.getSubmissionTime() + ":" + + js.getNumberMaps() + ":" + js.getNumberReduces(); + assertEquals("Unexpected JobStory", expectedJobStory, actualJobStory); + } + + /** + * Expands a file compressed using {@code gzip}. + * + * @param fs the {@code FileSystem} corresponding to the given file. + * @param in the path to the compressed file. + * @param out the path to the uncompressed output. + * @throws Exception if there was an error during the operation. + */ + private void expandGzippedTrace(FileSystem fs, Path in, Path out) + throws Exception { + byte[] buff = new byte[4096]; + GZIPInputStream gis = new GZIPInputStream(fs.open(in)); + FSDataOutputStream fsdOs = fs.create(out); + int numRead; + while ((numRead = gis.read(buff, 0, buff.length)) != -1) { + fsdOs.write(buff, 0, numRead); + } + gis.close(); + fsdOs.close(); + } + + /** + * Tests the reading of traces in GridMix3. These traces are generated by + * Rumen and are in the JSON format. The traces can optionally be compressed + * and uncompressed traces can also be passed to GridMix3 via its standard + * input stream. The testing is effected via JUnit assertions. + * + * @throws Exception if there was an error. 
+ */
+  @Test (timeout=20000)
+  public void testTraceReader() throws Exception {
+    Configuration conf = new Configuration();
+    FileSystem lfs = FileSystem.getLocal(conf);
+    Path rootInputDir = new Path(System.getProperty("src.test.data"));
+    rootInputDir = rootInputDir.makeQualified(lfs.getUri(),
+        lfs.getWorkingDirectory());
+    Path rootTempDir = new Path(System.getProperty("test.build.data",
+        System.getProperty("java.io.tmpdir")), "testTraceReader");
+    rootTempDir = rootTempDir.makeQualified(lfs.getUri(),
+        lfs.getWorkingDirectory());
+    Path inputFile = new Path(rootInputDir, "wordcount.json.gz");
+    Path tempFile = new Path(rootTempDir, "gridmix3-wc.json");
+
+    InputStream origStdIn = System.in;
+    InputStream tmpIs = null;
+    try {
+      DebugGridmix dgm = new DebugGridmix();
+      JobStoryProducer jsp = dgm.createJobStoryProducer(inputFile.toString(),
+          conf);
+
+      LOG.info("Verifying JobStory from compressed trace...");
+      verifyWordCountJobStory(jsp.getNextJob());
+
+      expandGzippedTrace(lfs, inputFile, tempFile);
+      jsp = dgm.createJobStoryProducer(tempFile.toString(), conf);
+      LOG.info("Verifying JobStory from uncompressed trace...");
+      verifyWordCountJobStory(jsp.getNextJob());
+
+      tmpIs = lfs.open(tempFile);
+      System.setIn(tmpIs);
+      LOG.info("Verifying JobStory from trace in standard input...");
+      jsp = dgm.createJobStoryProducer("-", conf);
+      verifyWordCountJobStory(jsp.getNextJob());
+    } finally {
+      System.setIn(origStdIn);
+      if (tmpIs != null) {
+        tmpIs.close();
+      }
+      lfs.delete(rootTempDir, true);
+    }
+  }
+
+  @Test (timeout=500000)
+  public void testReplaySubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.REPLAY;
+    LOG.info(" Replay started at " + System.currentTimeMillis());
+    doSubmission(null, false);
+    LOG.info(" Replay ended at " + System.currentTimeMillis());
+
+  }
+
+  @Test (timeout=500000)
+  public void testStressSubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.STRESS;
+    LOG.info(" Stress started at " + System.currentTimeMillis());
+    doSubmission(null, false);
+    LOG.info(" Stress ended at " + System.currentTimeMillis());
+  }
+
+  // invoking gridmix with no arguments should print a usage hint
+  @Test (timeout=100000)
+  public void testMain() throws Exception {
+
+    SecurityManager securityManager = System.getSecurityManager();
+
+    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    final PrintStream out = new PrintStream(bytes);
+    final PrintStream oldErr = System.err;
+    System.setErr(out);
+    ExitUtil.disableSystemExit();
+    try {
+      String[] argv = new String[0];
+      DebugGridmix.main(argv);
+
+    } catch (ExitUtil.ExitException e) {
+      assertEquals("ExitException", e.getMessage());
+      ExitUtil.resetFirstExitException();
+    } finally {
+      System.setErr(oldErr);
+      System.setSecurityManager(securityManager);
+    }
+    String print = bytes.toString();
+    // the usage tip should have been printed to the standard error stream
+    assertTrue(print
+        .contains("Usage: gridmix [-generate <MiB>] [-users URI] [-Dname=value ...] <iopath> <trace>"));
+    assertTrue(print.contains("e.g.
gridmix -generate 100m foo -")); + } + + +} diff --git a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java index 61e5ea05777..156e4383539 100644 --- a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java +++ b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java @@ -45,7 +45,7 @@ public class TestGridmixSummary { /** * Test {@link DataStatistics}. */ - @Test + @Test (timeout=20000) public void testDataStatistics() throws Exception { // test data-statistics getters with compression enabled DataStatistics stats = new DataStatistics(10, 2, true); @@ -133,7 +133,7 @@ public class TestGridmixSummary { /** * A fake {@link JobFactory}. */ - @SuppressWarnings("unchecked") + @SuppressWarnings("rawtypes") private static class FakeJobFactory extends JobFactory { /** * A fake {@link JobStoryProducer} for {@link FakeJobFactory}. @@ -166,8 +166,8 @@ public class TestGridmixSummary { /** * Test {@link ExecutionSummarizer}. */ - @Test - @SuppressWarnings("unchecked") + @Test (timeout=20000) + @SuppressWarnings({ "unchecked", "rawtypes" }) public void testExecutionSummarizer() throws IOException { Configuration conf = new Configuration(); @@ -359,8 +359,7 @@ public class TestGridmixSummary { /** * Test {@link ClusterSummarizer}. */ - @Test - @SuppressWarnings("deprecation") + @Test (timeout=20000) public void testClusterSummarizer() throws IOException { ClusterSummarizer cs = new ClusterSummarizer(); Configuration conf = new Configuration(); @@ -374,13 +373,13 @@ public class TestGridmixSummary { assertEquals("JT name mismatch", jt, cs.getJobTrackerInfo()); assertEquals("NN name mismatch", nn, cs.getNamenodeInfo()); - ClusterStats cstats = ClusterStats.getClusterStats(); + ClusterStats cStats = ClusterStats.getClusterStats(); conf.set(JTConfig.JT_IPC_ADDRESS, "local"); conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "local"); JobClient jc = new JobClient(conf); - cstats.setClusterMetric(jc.getClusterStatus()); + cStats.setClusterMetric(jc.getClusterStatus()); - cs.update(cstats); + cs.update(cStats); // test assertEquals("Cluster summary test failed!", 1, cs.getMaxMapTasks()); diff --git a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestLoadJob.java b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestLoadJob.java new file mode 100644 index 00000000000..69c3a793736 --- /dev/null +++ b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestLoadJob.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

    + * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/*
+ * Test LoadJob: Gridmix generates input data and submits the trace's jobs
+ * against it as LoadJobs.
+ */
+public class TestLoadJob extends CommonJobTest {
+
+  public static final Log LOG = LogFactory.getLog(Gridmix.class);
+
+  static {
+    ((Log4JLogger) LogFactory.getLog("org.apache.hadoop.mapred.gridmix"))
+        .getLogger().setLevel(Level.DEBUG);
+    ((Log4JLogger) LogFactory.getLog(StressJobFactory.class)).getLogger()
+        .setLevel(Level.DEBUG);
+  }
+
+
+  @BeforeClass
+  public static void init() throws IOException {
+    GridmixTestUtils.initCluster(TestLoadJob.class);
+  }
+
+  @AfterClass
+  public static void shutDown() throws IOException {
+    GridmixTestUtils.shutdownCluster();
+  }
+
+
+  /*
+   * test serial policy with LoadJob. Tasks should execute without exceptions
+   */
+  @Test (timeout=500000)
+  public void testSerialSubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.SERIAL;
+    LOG.info("Serial started at " + System.currentTimeMillis());
+    doSubmission(JobCreator.LOADJOB.name(), false);
+
+    LOG.info("Serial ended at " + System.currentTimeMillis());
+  }
+
+  /*
+   * test replay policy with LoadJob
+   */
+  @Test (timeout=500000)
+  public void testReplaySubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.REPLAY;
+    LOG.info(" Replay started at " + System.currentTimeMillis());
+    doSubmission(JobCreator.LOADJOB.name(), false);
+
+    LOG.info(" Replay ended at " + System.currentTimeMillis());
+  }
+
+
+}
diff --git a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestSleepJob.java b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestSleepJob.java
new file mode 100644
index 00000000000..95b88258c0a
--- /dev/null
+++ b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestSleepJob.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *

    + * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class TestSleepJob extends CommonJobTest {
+
+  public static final Log LOG = LogFactory.getLog(Gridmix.class);
+
+  static {
+    ((Log4JLogger) LogFactory.getLog("org.apache.hadoop.mapred.gridmix"))
+        .getLogger().setLevel(Level.DEBUG);
+  }
+
+  static GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.REPLAY;
+
+  @BeforeClass
+  public static void init() throws IOException {
+    GridmixTestUtils.initCluster(TestSleepJob.class);
+  }
+
+  @AfterClass
+  public static void shutDown() throws IOException {
+    GridmixTestUtils.shutdownCluster();
+  }
+
+
+  /*
+   * test RandomLocation
+   */
+  @Test (timeout=600000)
+  public void testRandomLocation() throws Exception {
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+
+    testRandomLocation(1, 10, ugi);
+    testRandomLocation(2, 10, ugi);
+  }
+
+  @Test (timeout=600000)
+  public void testMapTasksOnlySleepJobs() throws Exception {
+    Configuration configuration = GridmixTestUtils.mrvl.getConfig();
+
+    DebugJobProducer jobProducer = new DebugJobProducer(5, configuration);
+    configuration.setBoolean(SleepJob.SLEEPJOB_MAPTASK_ONLY, true);
+
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+    JobStory story;
+    int seq = 1;
+    while ((story = jobProducer.getNextJob()) != null) {
+      GridmixJob gridmixJob = JobCreator.SLEEPJOB.createGridmixJob(
+          configuration, 0, story, new Path("ignored"), ugi, seq++);
+      gridmixJob.buildSplits(null);
+      Job job = gridmixJob.call();
+      assertEquals(0, job.getNumReduceTasks());
+    }
+    jobProducer.close();
+    assertEquals(6, seq);
+  }
+
+  // test Serial submit
+  @Test (timeout=600000)
+  public void testSerialSubmit() throws Exception {
+    // set policy
+    policy = GridmixJobSubmissionPolicy.SERIAL;
+    LOG.info("Serial started at " + System.currentTimeMillis());
+    doSubmission(JobCreator.SLEEPJOB.name(), false);
+    LOG.info("Serial ended at " + System.currentTimeMillis());
+  }
+
+  @Test (timeout=600000)
+  public void testReplaySubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.REPLAY;
+    LOG.info(" Replay started at " + System.currentTimeMillis());
+    doSubmission(JobCreator.SLEEPJOB.name(), false);
+    LOG.info(" Replay ended at " + System.currentTimeMillis());
+  }
+
+  @Test (timeout=600000)
+  public void testStressSubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.STRESS;
+    LOG.info(" Stress started at " + System.currentTimeMillis());
+    doSubmission(JobCreator.SLEEPJOB.name(), false);
+    LOG.info(" Stress ended at " + System.currentTimeMillis());
+  }
+
+  private void
testRandomLocation(int locations, int njobs, + UserGroupInformation ugi) throws Exception { + Configuration configuration = new Configuration(); + + DebugJobProducer jobProducer = new DebugJobProducer(njobs, configuration); + Configuration jconf = GridmixTestUtils.mrvl.getConfig(); + jconf.setInt(JobCreator.SLEEPJOB_RANDOM_LOCATIONS, locations); + + JobStory story; + int seq = 1; + while ((story = jobProducer.getNextJob()) != null) { + GridmixJob gridmixJob = JobCreator.SLEEPJOB.createGridmixJob(jconf, 0, + story, new Path("ignored"), ugi, seq++); + gridmixJob.buildSplits(null); + List splits = new SleepJob.SleepInputFormat() + .getSplits(gridmixJob.getJob()); + for (InputSplit split : splits) { + assertEquals(locations, split.getLocations().length); + } + } + jobProducer.close(); + } + +} diff --git a/hadoop-tools/hadoop-gridmix/src/test/resources/data/wordcount.json b/hadoop-tools/hadoop-gridmix/src/test/resources/data/wordcount.json new file mode 100644 index 00000000000..1b7ccf860b6 --- /dev/null +++ b/hadoop-tools/hadoop-gridmix/src/test/resources/data/wordcount.json @@ -0,0 +1,414 @@ +{ + "priority" : "NORMAL", + "jobID" : "job_201009241532_0001", + "user" : "johndoe", + "jobName" : "WordCount", + "mapTasks" : [ { + "startTime" : 1285322651360, + "taskID" : "task_201009241532_0001_m_000000", + "taskType" : "MAP", + "attempts" : [ { + "location" : null, + "hostName" : "/default-rack/foo.example.com", + "startTime" : 1285322651366, + "finishTime" : 1285322658262, + "result" : "SUCCESS", + "attemptID" : "attempt_201009241532_0001_m_000000_0", + "shuffleFinished" : -1, + "sortFinished" : -1, + "hdfsBytesRead" : 704270, + "hdfsBytesWritten" : -1, + "fileBytesRead" : -1, + "fileBytesWritten" : 48266, + "mapInputRecords" : 13427, + "mapOutputBytes" : 1182333, + "mapOutputRecords" : 126063, + "combineInputRecords" : 126063, + "reduceInputGroups" : -1, + "reduceInputRecords" : -1, + "reduceShuffleBytes" : -1, + "reduceOutputRecords" : -1, + "spilledRecords" : 6612, + "mapInputBytes" : -1 + } ], + "finishTime" : 1285322660778, + "preferredLocations" : [ { + "layers" : [ "default-rack", "foo.example.com" ] + } ], + "taskStatus" : "SUCCESS", + "inputBytes" : 704270, + "inputRecords" : 13427, + "outputBytes" : 48266, + "outputRecords" : 126063 + }, { + "startTime" : 1285322651361, + "taskID" : "task_201009241532_0001_m_000001", + "taskType" : "MAP", + "attempts" : [ { + "location" : null, + "hostName" : "/default-rack/foo.example.com", + "startTime" : 1285322651378, + "finishTime" : 1285322657906, + "result" : "SUCCESS", + "attemptID" : "attempt_201009241532_0001_m_000001_0", + "shuffleFinished" : -1, + "sortFinished" : -1, + "hdfsBytesRead" : 577214, + "hdfsBytesWritten" : -1, + "fileBytesRead" : -1, + "fileBytesWritten" : 58143, + "mapInputRecords" : 13015, + "mapOutputBytes" : 985534, + "mapOutputRecords" : 108400, + "combineInputRecords" : 108400, + "reduceInputGroups" : -1, + "reduceInputRecords" : -1, + "reduceShuffleBytes" : -1, + "reduceOutputRecords" : -1, + "spilledRecords" : 8214, + "mapInputBytes" : -1 + } ], + "finishTime" : 1285322660781, + "preferredLocations" : [ { + "layers" : [ "default-rack", "foo.example.com" ] + } ], + "taskStatus" : "SUCCESS", + "inputBytes" : 577214, + "inputRecords" : 13015, + "outputBytes" : 58143, + "outputRecords" : 108400 + }, { + "startTime" : 1285322660789, + "taskID" : "task_201009241532_0001_m_000002", + "taskType" : "MAP", + "attempts" : [ { + "location" : null, + "hostName" : "/default-rack/foo.example.com", + "startTime" : 1285322660807, + 
"finishTime" : 1285322664865, + "result" : "SUCCESS", + "attemptID" : "attempt_201009241532_0001_m_000002_0", + "shuffleFinished" : -1, + "sortFinished" : -1, + "hdfsBytesRead" : 163907, + "hdfsBytesWritten" : -1, + "fileBytesRead" : -1, + "fileBytesWritten" : 21510, + "mapInputRecords" : 3736, + "mapOutputBytes" : 275796, + "mapOutputRecords" : 30528, + "combineInputRecords" : 30528, + "reduceInputGroups" : -1, + "reduceInputRecords" : -1, + "reduceShuffleBytes" : -1, + "reduceOutputRecords" : -1, + "spilledRecords" : 3040, + "mapInputBytes" : -1 + } ], + "finishTime" : 1285322666805, + "preferredLocations" : [ { + "layers" : [ "default-rack", "foo.example.com" ] + } ], + "taskStatus" : "SUCCESS", + "inputBytes" : 163907, + "inputRecords" : 3736, + "outputBytes" : 21510, + "outputRecords" : 30528 + } ], + "finishTime" : 1285322675837, + "reduceTasks" : [ { + "startTime" : 1285322660790, + "taskID" : "task_201009241532_0001_r_000000", + "taskType" : "REDUCE", + "attempts" : [ { + "location" : null, + "hostName" : "/default-rack/foo.example.com", + "startTime" : 1285322660807, + "finishTime" : 1285322670759, + "result" : "SUCCESS", + "attemptID" : "attempt_201009241532_0001_r_000000_0", + "shuffleFinished" : 1285322667962, + "sortFinished" : 1285322668146, + "hdfsBytesRead" : -1, + "hdfsBytesWritten" : 122793, + "fileBytesRead" : 111026, + "fileBytesWritten" : 111026, + "mapInputRecords" : -1, + "mapOutputBytes" : -1, + "mapOutputRecords" : -1, + "combineInputRecords" : 0, + "reduceInputGroups" : 11713, + "reduceInputRecords" : 17866, + "reduceShuffleBytes" : 127823, + "reduceOutputRecords" : 11713, + "spilledRecords" : 17866, + "mapInputBytes" : -1 + } ], + "finishTime" : 1285322672821, + "preferredLocations" : [ ], + "taskStatus" : "SUCCESS", + "inputBytes" : 127823, + "inputRecords" : 17866, + "outputBytes" : 122793, + "outputRecords" : 11713 + } ], + "submitTime" : 1285322645148, + "launchTime" : 1285322645614, + "totalMaps" : 3, + "totalReduces" : 1, + "otherTasks" : [ { + "startTime" : 1285322648294, + "taskID" : "task_201009241532_0001_m_000004", + "taskType" : "SETUP", + "attempts" : [ { + "location" : null, + "hostName" : "/default-rack/foo.example.com", + "startTime" : 1285322648482, + "finishTime" : 1285322649588, + "result" : "SUCCESS", + "attemptID" : "attempt_201009241532_0001_m_000004_0", + "shuffleFinished" : -1, + "sortFinished" : -1, + "hdfsBytesRead" : -1, + "hdfsBytesWritten" : -1, + "fileBytesRead" : -1, + "fileBytesWritten" : -1, + "mapInputRecords" : -1, + "mapOutputBytes" : -1, + "mapOutputRecords" : -1, + "combineInputRecords" : -1, + "reduceInputGroups" : -1, + "reduceInputRecords" : -1, + "reduceShuffleBytes" : -1, + "reduceOutputRecords" : -1, + "spilledRecords" : 0, + "mapInputBytes" : -1 + } ], + "finishTime" : 1285322651351, + "preferredLocations" : [ ], + "taskStatus" : "SUCCESS", + "inputBytes" : -1, + "inputRecords" : -1, + "outputBytes" : -1, + "outputRecords" : -1 + }, { + "startTime" : 1285322672829, + "taskID" : "task_201009241532_0001_m_000003", + "taskType" : "CLEANUP", + "attempts" : [ { + "location" : null, + "hostName" : "/default-rack/foo.example.com", + "startTime" : 1285322672838, + "finishTime" : 1285322673971, + "result" : "SUCCESS", + "attemptID" : "attempt_201009241532_0001_m_000003_0", + "shuffleFinished" : -1, + "sortFinished" : -1, + "hdfsBytesRead" : -1, + "hdfsBytesWritten" : -1, + "fileBytesRead" : -1, + "fileBytesWritten" : -1, + "mapInputRecords" : -1, + "mapOutputBytes" : -1, + "mapOutputRecords" : -1, + "combineInputRecords" : -1, 
+ "reduceInputGroups" : -1, + "reduceInputRecords" : -1, + "reduceShuffleBytes" : -1, + "reduceOutputRecords" : -1, + "spilledRecords" : 0, + "mapInputBytes" : -1 + } ], + "finishTime" : 1285322675835, + "preferredLocations" : [ ], + "taskStatus" : "SUCCESS", + "inputBytes" : -1, + "inputRecords" : -1, + "outputBytes" : -1, + "outputRecords" : -1 + } ], + "computonsPerMapInputByte" : -1, + "computonsPerMapOutputByte" : -1, + "computonsPerReduceInputByte" : -1, + "computonsPerReduceOutputByte" : -1, + "heapMegabytes" : 1024, + "outcome" : "SUCCESS", + "jobtype" : "JAVA", + "directDependantJobs" : [ ], + "successfulMapAttemptCDFs" : [ { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, { + "maximum" : 6896, + "minimum" : 4058, + "rankings" : [ { + "datum" : 4058, + "relativeRanking" : 0.05 + }, { + "datum" : 4058, + "relativeRanking" : 0.1 + }, { + "datum" : 4058, + "relativeRanking" : 0.15 + }, { + "datum" : 4058, + "relativeRanking" : 0.2 + }, { + "datum" : 4058, + "relativeRanking" : 0.25 + }, { + "datum" : 4058, + "relativeRanking" : 0.3 + }, { + "datum" : 4058, + "relativeRanking" : 0.35 + }, { + "datum" : 4058, + "relativeRanking" : 0.4 + }, { + "datum" : 4058, + "relativeRanking" : 0.45 + }, { + "datum" : 4058, + "relativeRanking" : 0.5 + }, { + "datum" : 4058, + "relativeRanking" : 0.55 + }, { + "datum" : 4058, + "relativeRanking" : 0.6 + }, { + "datum" : 4058, + "relativeRanking" : 0.65 + }, { + "datum" : 6528, + "relativeRanking" : 0.7 + }, { + "datum" : 6528, + "relativeRanking" : 0.75 + }, { + "datum" : 6528, + "relativeRanking" : 0.8 + }, { + "datum" : 6528, + "relativeRanking" : 0.85 + }, { + "datum" : 6528, + "relativeRanking" : 0.9 + }, { + "datum" : 6528, + "relativeRanking" : 0.95 + } ], + "numberValues" : 3 + } ], + "failedMapAttemptCDFs" : [ { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + } ], + "successfulReduceAttemptCDF" : { + "maximum" : 9952, + "minimum" : 9952, + "rankings" : [ { + "datum" : 9952, + "relativeRanking" : 0.05 + }, { + "datum" : 9952, + "relativeRanking" : 0.1 + }, { + "datum" : 9952, + "relativeRanking" : 0.15 + }, { + "datum" : 9952, + "relativeRanking" : 0.2 + }, { + "datum" : 9952, + "relativeRanking" : 0.25 + }, { + "datum" : 9952, + "relativeRanking" : 0.3 + }, { + "datum" : 9952, + "relativeRanking" : 0.35 + }, { + "datum" : 9952, + "relativeRanking" : 0.4 + }, { + "datum" : 9952, + "relativeRanking" : 0.45 + }, { + "datum" : 9952, + "relativeRanking" : 0.5 + }, { + "datum" : 9952, + "relativeRanking" : 0.55 + }, { + "datum" : 9952, + "relativeRanking" : 0.6 + }, { + "datum" : 9952, + "relativeRanking" : 0.65 + }, { + "datum" : 9952, + "relativeRanking" : 0.7 + }, { + "datum" : 9952, + "relativeRanking" : 0.75 + }, { + "datum" : 9952, + "relativeRanking" : 0.8 + }, { + "datum" : 9952, + "relativeRanking" : 0.85 + }, { + "datum" : 
9952, + "relativeRanking" : 0.9 + }, { + "datum" : 9952, + "relativeRanking" : 0.95 + } ], + "numberValues" : 1 + }, + "failedReduceAttemptCDF" : { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, + "mapperTriesToSucceed" : [ 1.0 ], + "failedMapperFraction" : 0.0, + "relativeTime" : 0, + "queue" : "default", + "clusterMapMB" : -1, + "clusterReduceMB" : -1, + "jobMapMB" : 1024, + "jobReduceMB" : 1024 +} diff --git a/hadoop-tools/hadoop-gridmix/src/test/resources/data/wordcount2.json b/hadoop-tools/hadoop-gridmix/src/test/resources/data/wordcount2.json new file mode 100644 index 00000000000..87fcfb92ee9 --- /dev/null +++ b/hadoop-tools/hadoop-gridmix/src/test/resources/data/wordcount2.json @@ -0,0 +1,828 @@ +{ + "priority" : "NORMAL", + "jobID" : "job_201009241532_0001", + "user" : "johndoe", + "jobName" : "WordCount", + "mapTasks" : [ { + "startTime" : 1285322651360, + "taskID" : "task_201009241532_0001_m_000000", + "taskType" : "MAP", + "attempts" : [ { + "location" : null, + "hostName" : "/default-rack/foo.example.com", + "startTime" : 1285322651366, + "finishTime" : 1285322658262, + "result" : "SUCCESS", + "attemptID" : "attempt_201009241532_0001_m_000000_0", + "shuffleFinished" : -1, + "sortFinished" : -1, + "hdfsBytesRead" : 704270, + "hdfsBytesWritten" : -1, + "fileBytesRead" : -1, + "fileBytesWritten" : 48266, + "mapInputRecords" : 13427, + "mapOutputBytes" : 1182333, + "mapOutputRecords" : 126063, + "combineInputRecords" : 126063, + "reduceInputGroups" : -1, + "reduceInputRecords" : -1, + "reduceShuffleBytes" : -1, + "reduceOutputRecords" : -1, + "spilledRecords" : 6612, + "mapInputBytes" : -1 + } ], + "finishTime" : 1285322660778, + "preferredLocations" : [ { + "layers" : [ "default-rack", "foo.example.com" ] + } ], + "taskStatus" : "SUCCESS", + "inputBytes" : 704270, + "inputRecords" : 13427, + "outputBytes" : 48266, + "outputRecords" : 126063 + }, { + "startTime" : 1285322651361, + "taskID" : "task_201009241532_0001_m_000001", + "taskType" : "MAP", + "attempts" : [ { + "location" : null, + "hostName" : "/default-rack/foo.example.com", + "startTime" : 1285322651378, + "finishTime" : 1285322657906, + "result" : "SUCCESS", + "attemptID" : "attempt_201009241532_0001_m_000001_0", + "shuffleFinished" : -1, + "sortFinished" : -1, + "hdfsBytesRead" : 577214, + "hdfsBytesWritten" : -1, + "fileBytesRead" : -1, + "fileBytesWritten" : 58143, + "mapInputRecords" : 13015, + "mapOutputBytes" : 985534, + "mapOutputRecords" : 108400, + "combineInputRecords" : 108400, + "reduceInputGroups" : -1, + "reduceInputRecords" : -1, + "reduceShuffleBytes" : -1, + "reduceOutputRecords" : -1, + "spilledRecords" : 8214, + "mapInputBytes" : -1 + } ], + "finishTime" : 1285322660781, + "preferredLocations" : [ { + "layers" : [ "default-rack", "foo.example.com" ] + } ], + "taskStatus" : "SUCCESS", + "inputBytes" : 577214, + "inputRecords" : 13015, + "outputBytes" : 58143, + "outputRecords" : 108400 + }, { + "startTime" : 1285322660789, + "taskID" : "task_201009241532_0001_m_000002", + "taskType" : "MAP", + "attempts" : [ { + "location" : null, + "hostName" : "/default-rack/foo.example.com", + "startTime" : 1285322660807, + "finishTime" : 1285322664865, + "result" : "SUCCESS", + "attemptID" : "attempt_201009241532_0001_m_000002_0", + "shuffleFinished" : -1, + "sortFinished" : -1, + "hdfsBytesRead" : 163907, + "hdfsBytesWritten" : -1, + "fileBytesRead" : -1, + "fileBytesWritten" : 21510, + "mapInputRecords" : 3736, + "mapOutputBytes" : 275796, + 
"mapOutputRecords" : 30528, + "combineInputRecords" : 30528, + "reduceInputGroups" : -1, + "reduceInputRecords" : -1, + "reduceShuffleBytes" : -1, + "reduceOutputRecords" : -1, + "spilledRecords" : 3040, + "mapInputBytes" : -1 + } ], + "finishTime" : 1285322666805, + "preferredLocations" : [ { + "layers" : [ "default-rack", "foo.example.com" ] + } ], + "taskStatus" : "SUCCESS", + "inputBytes" : 163907, + "inputRecords" : 3736, + "outputBytes" : 21510, + "outputRecords" : 30528 + } ], + "finishTime" : 1285322675837, + "reduceTasks" : [ { + "startTime" : 1285322660790, + "taskID" : "task_201009241532_0001_r_000000", + "taskType" : "REDUCE", + "attempts" : [ { + "location" : null, + "hostName" : "/default-rack/foo.example.com", + "startTime" : 1285322660807, + "finishTime" : 1285322670759, + "result" : "SUCCESS", + "attemptID" : "attempt_201009241532_0001_r_000000_0", + "shuffleFinished" : 1285322667962, + "sortFinished" : 1285322668146, + "hdfsBytesRead" : -1, + "hdfsBytesWritten" : 122793, + "fileBytesRead" : 111026, + "fileBytesWritten" : 111026, + "mapInputRecords" : -1, + "mapOutputBytes" : -1, + "mapOutputRecords" : -1, + "combineInputRecords" : 0, + "reduceInputGroups" : 11713, + "reduceInputRecords" : 17866, + "reduceShuffleBytes" : 127823, + "reduceOutputRecords" : 11713, + "spilledRecords" : 17866, + "mapInputBytes" : -1 + } ], + "finishTime" : 1285322672821, + "preferredLocations" : [ ], + "taskStatus" : "SUCCESS", + "inputBytes" : 127823, + "inputRecords" : 17866, + "outputBytes" : 122793, + "outputRecords" : 11713 + } ], + "submitTime" : 1285322645148, + "launchTime" : 1285322645614, + "totalMaps" : 3, + "totalReduces" : 1, + "otherTasks" : [ { + "startTime" : 1285322648294, + "taskID" : "task_201009241532_0001_m_000004", + "taskType" : "SETUP", + "attempts" : [ { + "location" : null, + "hostName" : "/default-rack/foo.example.com", + "startTime" : 1285322648482, + "finishTime" : 1285322649588, + "result" : "SUCCESS", + "attemptID" : "attempt_201009241532_0001_m_000004_0", + "shuffleFinished" : -1, + "sortFinished" : -1, + "hdfsBytesRead" : -1, + "hdfsBytesWritten" : -1, + "fileBytesRead" : -1, + "fileBytesWritten" : -1, + "mapInputRecords" : -1, + "mapOutputBytes" : -1, + "mapOutputRecords" : -1, + "combineInputRecords" : -1, + "reduceInputGroups" : -1, + "reduceInputRecords" : -1, + "reduceShuffleBytes" : -1, + "reduceOutputRecords" : -1, + "spilledRecords" : 0, + "mapInputBytes" : -1 + } ], + "finishTime" : 1285322651351, + "preferredLocations" : [ ], + "taskStatus" : "SUCCESS", + "inputBytes" : -1, + "inputRecords" : -1, + "outputBytes" : -1, + "outputRecords" : -1 + }, { + "startTime" : 1285322672829, + "taskID" : "task_201009241532_0001_m_000003", + "taskType" : "CLEANUP", + "attempts" : [ { + "location" : null, + "hostName" : "/default-rack/foo.example.com", + "startTime" : 1285322672838, + "finishTime" : 1285322673971, + "result" : "SUCCESS", + "attemptID" : "attempt_201009241532_0001_m_000003_0", + "shuffleFinished" : -1, + "sortFinished" : -1, + "hdfsBytesRead" : -1, + "hdfsBytesWritten" : -1, + "fileBytesRead" : -1, + "fileBytesWritten" : -1, + "mapInputRecords" : -1, + "mapOutputBytes" : -1, + "mapOutputRecords" : -1, + "combineInputRecords" : -1, + "reduceInputGroups" : -1, + "reduceInputRecords" : -1, + "reduceShuffleBytes" : -1, + "reduceOutputRecords" : -1, + "spilledRecords" : 0, + "mapInputBytes" : -1 + } ], + "finishTime" : 1285322675835, + "preferredLocations" : [ ], + "taskStatus" : "SUCCESS", + "inputBytes" : -1, + "inputRecords" : -1, + "outputBytes" : -1, + 
"outputRecords" : -1 + } ], + "computonsPerMapInputByte" : -1, + "computonsPerMapOutputByte" : -1, + "computonsPerReduceInputByte" : -1, + "computonsPerReduceOutputByte" : -1, + "heapMegabytes" : 1024, + "outcome" : "SUCCESS", + "jobtype" : "JAVA", + "directDependantJobs" : [ ], + "successfulMapAttemptCDFs" : [ { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, { + "maximum" : 6896, + "minimum" : 4058, + "rankings" : [ { + "datum" : 4058, + "relativeRanking" : 0.05 + }, { + "datum" : 4058, + "relativeRanking" : 0.1 + }, { + "datum" : 4058, + "relativeRanking" : 0.15 + }, { + "datum" : 4058, + "relativeRanking" : 0.2 + }, { + "datum" : 4058, + "relativeRanking" : 0.25 + }, { + "datum" : 4058, + "relativeRanking" : 0.3 + }, { + "datum" : 4058, + "relativeRanking" : 0.35 + }, { + "datum" : 4058, + "relativeRanking" : 0.4 + }, { + "datum" : 4058, + "relativeRanking" : 0.45 + }, { + "datum" : 4058, + "relativeRanking" : 0.5 + }, { + "datum" : 4058, + "relativeRanking" : 0.55 + }, { + "datum" : 4058, + "relativeRanking" : 0.6 + }, { + "datum" : 4058, + "relativeRanking" : 0.65 + }, { + "datum" : 6528, + "relativeRanking" : 0.7 + }, { + "datum" : 6528, + "relativeRanking" : 0.75 + }, { + "datum" : 6528, + "relativeRanking" : 0.8 + }, { + "datum" : 6528, + "relativeRanking" : 0.85 + }, { + "datum" : 6528, + "relativeRanking" : 0.9 + }, { + "datum" : 6528, + "relativeRanking" : 0.95 + } ], + "numberValues" : 3 + } ], + "failedMapAttemptCDFs" : [ { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + } ], + "successfulReduceAttemptCDF" : { + "maximum" : 9952, + "minimum" : 9952, + "rankings" : [ { + "datum" : 9952, + "relativeRanking" : 0.05 + }, { + "datum" : 9952, + "relativeRanking" : 0.1 + }, { + "datum" : 9952, + "relativeRanking" : 0.15 + }, { + "datum" : 9952, + "relativeRanking" : 0.2 + }, { + "datum" : 9952, + "relativeRanking" : 0.25 + }, { + "datum" : 9952, + "relativeRanking" : 0.3 + }, { + "datum" : 9952, + "relativeRanking" : 0.35 + }, { + "datum" : 9952, + "relativeRanking" : 0.4 + }, { + "datum" : 9952, + "relativeRanking" : 0.45 + }, { + "datum" : 9952, + "relativeRanking" : 0.5 + }, { + "datum" : 9952, + "relativeRanking" : 0.55 + }, { + "datum" : 9952, + "relativeRanking" : 0.6 + }, { + "datum" : 9952, + "relativeRanking" : 0.65 + }, { + "datum" : 9952, + "relativeRanking" : 0.7 + }, { + "datum" : 9952, + "relativeRanking" : 0.75 + }, { + "datum" : 9952, + "relativeRanking" : 0.8 + }, { + "datum" : 9952, + "relativeRanking" : 0.85 + }, { + "datum" : 9952, + "relativeRanking" : 0.9 + }, { + "datum" : 9952, + "relativeRanking" : 0.95 + } ], + "numberValues" : 1 + }, + "failedReduceAttemptCDF" : { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, + "mapperTriesToSucceed" : [ 1.0 ], + "failedMapperFraction" : 
0.0, + "relativeTime" : 0, + "queue" : "default", + "clusterMapMB" : -1, + "clusterReduceMB" : -1, + "jobMapMB" : 1024, + "jobReduceMB" : 1024 +} +{ + "priority" : "NORMAL", + "jobID" : "job_201009241532_0001", + "user" : "johndoe", + "jobName" : "WordCount", + "mapTasks" : [ { + "startTime" : 1285322651360, + "taskID" : "task_201009241532_0001_m_000000", + "taskType" : "MAP", + "attempts" : [ { + "location" : null, + "hostName" : "/default-rack/foo.example.com", + "startTime" : 1285322651366, + "finishTime" : 1285322658262, + "result" : "SUCCESS", + "attemptID" : "attempt_201009241532_0001_m_000000_0", + "shuffleFinished" : -1, + "sortFinished" : -1, + "hdfsBytesRead" : 704270, + "hdfsBytesWritten" : -1, + "fileBytesRead" : -1, + "fileBytesWritten" : 48266, + "mapInputRecords" : 13427, + "mapOutputBytes" : 1182333, + "mapOutputRecords" : 126063, + "combineInputRecords" : 126063, + "reduceInputGroups" : -1, + "reduceInputRecords" : -1, + "reduceShuffleBytes" : -1, + "reduceOutputRecords" : -1, + "spilledRecords" : 6612, + "mapInputBytes" : -1 + } ], + "finishTime" : 1285322660778, + "preferredLocations" : [ { + "layers" : [ "default-rack", "foo.example.com" ] + } ], + "taskStatus" : "SUCCESS", + "inputBytes" : 704270, + "inputRecords" : 13427, + "outputBytes" : 48266, + "outputRecords" : 126063 + }, { + "startTime" : 1285322651361, + "taskID" : "task_201009241532_0001_m_000001", + "taskType" : "MAP", + "attempts" : [ { + "location" : null, + "hostName" : "/default-rack/foo.example.com", + "startTime" : 1285322651378, + "finishTime" : 1285322657906, + "result" : "SUCCESS", + "attemptID" : "attempt_201009241532_0001_m_000001_0", + "shuffleFinished" : -1, + "sortFinished" : -1, + "hdfsBytesRead" : 577214, + "hdfsBytesWritten" : -1, + "fileBytesRead" : -1, + "fileBytesWritten" : 58143, + "mapInputRecords" : 13015, + "mapOutputBytes" : 985534, + "mapOutputRecords" : 108400, + "combineInputRecords" : 108400, + "reduceInputGroups" : -1, + "reduceInputRecords" : -1, + "reduceShuffleBytes" : -1, + "reduceOutputRecords" : -1, + "spilledRecords" : 8214, + "mapInputBytes" : -1 + } ], + "finishTime" : 1285322660781, + "preferredLocations" : [ { + "layers" : [ "default-rack", "foo.example.com" ] + } ], + "taskStatus" : "SUCCESS", + "inputBytes" : 577214, + "inputRecords" : 13015, + "outputBytes" : 58143, + "outputRecords" : 108400 + }, { + "startTime" : 1285322660789, + "taskID" : "task_201009241532_0001_m_000002", + "taskType" : "MAP", + "attempts" : [ { + "location" : null, + "hostName" : "/default-rack/foo.example.com", + "startTime" : 1285322660807, + "finishTime" : 1285322664865, + "result" : "SUCCESS", + "attemptID" : "attempt_201009241532_0001_m_000002_0", + "shuffleFinished" : -1, + "sortFinished" : -1, + "hdfsBytesRead" : 163907, + "hdfsBytesWritten" : -1, + "fileBytesRead" : -1, + "fileBytesWritten" : 21510, + "mapInputRecords" : 3736, + "mapOutputBytes" : 275796, + "mapOutputRecords" : 30528, + "combineInputRecords" : 30528, + "reduceInputGroups" : -1, + "reduceInputRecords" : -1, + "reduceShuffleBytes" : -1, + "reduceOutputRecords" : -1, + "spilledRecords" : 3040, + "mapInputBytes" : -1 + } ], + "finishTime" : 1285322666805, + "preferredLocations" : [ { + "layers" : [ "default-rack", "foo.example.com" ] + } ], + "taskStatus" : "SUCCESS", + "inputBytes" : 163907, + "inputRecords" : 3736, + "outputBytes" : 21510, + "outputRecords" : 30528 + } ], + "finishTime" : 1285322675837, + "reduceTasks" : [ { + "startTime" : 1285322660790, + "taskID" : "task_201009241532_0001_r_000000", + "taskType" : 
"REDUCE", + "attempts" : [ { + "location" : null, + "hostName" : "/default-rack/foo.example.com", + "startTime" : 1285322660807, + "finishTime" : 1285322670759, + "result" : "SUCCESS", + "attemptID" : "attempt_201009241532_0001_r_000000_0", + "shuffleFinished" : 1285322667962, + "sortFinished" : 1285322668146, + "hdfsBytesRead" : -1, + "hdfsBytesWritten" : 122793, + "fileBytesRead" : 111026, + "fileBytesWritten" : 111026, + "mapInputRecords" : -1, + "mapOutputBytes" : -1, + "mapOutputRecords" : -1, + "combineInputRecords" : 0, + "reduceInputGroups" : 11713, + "reduceInputRecords" : 17866, + "reduceShuffleBytes" : 127823, + "reduceOutputRecords" : 11713, + "spilledRecords" : 17866, + "mapInputBytes" : -1 + } ], + "finishTime" : 1285322672821, + "preferredLocations" : [ ], + "taskStatus" : "SUCCESS", + "inputBytes" : 127823, + "inputRecords" : 17866, + "outputBytes" : 122793, + "outputRecords" : 11713 + } ], + "submitTime" : 1285322645148, + "launchTime" : 1285322645614, + "totalMaps" : 3, + "totalReduces" : 1, + "otherTasks" : [ { + "startTime" : 1285322648294, + "taskID" : "task_201009241532_0001_m_000004", + "taskType" : "SETUP", + "attempts" : [ { + "location" : null, + "hostName" : "/default-rack/foo.example.com", + "startTime" : 1285322648482, + "finishTime" : 1285322649588, + "result" : "SUCCESS", + "attemptID" : "attempt_201009241532_0001_m_000004_0", + "shuffleFinished" : -1, + "sortFinished" : -1, + "hdfsBytesRead" : -1, + "hdfsBytesWritten" : -1, + "fileBytesRead" : -1, + "fileBytesWritten" : -1, + "mapInputRecords" : -1, + "mapOutputBytes" : -1, + "mapOutputRecords" : -1, + "combineInputRecords" : -1, + "reduceInputGroups" : -1, + "reduceInputRecords" : -1, + "reduceShuffleBytes" : -1, + "reduceOutputRecords" : -1, + "spilledRecords" : 0, + "mapInputBytes" : -1 + } ], + "finishTime" : 1285322651351, + "preferredLocations" : [ ], + "taskStatus" : "SUCCESS", + "inputBytes" : -1, + "inputRecords" : -1, + "outputBytes" : -1, + "outputRecords" : -1 + }, { + "startTime" : 1285322672829, + "taskID" : "task_201009241532_0001_m_000003", + "taskType" : "CLEANUP", + "attempts" : [ { + "location" : null, + "hostName" : "/default-rack/foo.example.com", + "startTime" : 1285322672838, + "finishTime" : 1285322673971, + "result" : "SUCCESS", + "attemptID" : "attempt_201009241532_0001_m_000003_0", + "shuffleFinished" : -1, + "sortFinished" : -1, + "hdfsBytesRead" : -1, + "hdfsBytesWritten" : -1, + "fileBytesRead" : -1, + "fileBytesWritten" : -1, + "mapInputRecords" : -1, + "mapOutputBytes" : -1, + "mapOutputRecords" : -1, + "combineInputRecords" : -1, + "reduceInputGroups" : -1, + "reduceInputRecords" : -1, + "reduceShuffleBytes" : -1, + "reduceOutputRecords" : -1, + "spilledRecords" : 0, + "mapInputBytes" : -1 + } ], + "finishTime" : 1285322675835, + "preferredLocations" : [ ], + "taskStatus" : "SUCCESS", + "inputBytes" : -1, + "inputRecords" : -1, + "outputBytes" : -1, + "outputRecords" : -1 + } ], + "computonsPerMapInputByte" : -1, + "computonsPerMapOutputByte" : -1, + "computonsPerReduceInputByte" : -1, + "computonsPerReduceOutputByte" : -1, + "heapMegabytes" : 1024, + "outcome" : "SUCCESS", + "jobtype" : "JAVA", + "directDependantJobs" : [ ], + "successfulMapAttemptCDFs" : [ { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" 
: [ ], + "numberValues" : 0 + }, { + "maximum" : 6896, + "minimum" : 4058, + "rankings" : [ { + "datum" : 4058, + "relativeRanking" : 0.05 + }, { + "datum" : 4058, + "relativeRanking" : 0.1 + }, { + "datum" : 4058, + "relativeRanking" : 0.15 + }, { + "datum" : 4058, + "relativeRanking" : 0.2 + }, { + "datum" : 4058, + "relativeRanking" : 0.25 + }, { + "datum" : 4058, + "relativeRanking" : 0.3 + }, { + "datum" : 4058, + "relativeRanking" : 0.35 + }, { + "datum" : 4058, + "relativeRanking" : 0.4 + }, { + "datum" : 4058, + "relativeRanking" : 0.45 + }, { + "datum" : 4058, + "relativeRanking" : 0.5 + }, { + "datum" : 4058, + "relativeRanking" : 0.55 + }, { + "datum" : 4058, + "relativeRanking" : 0.6 + }, { + "datum" : 4058, + "relativeRanking" : 0.65 + }, { + "datum" : 6528, + "relativeRanking" : 0.7 + }, { + "datum" : 6528, + "relativeRanking" : 0.75 + }, { + "datum" : 6528, + "relativeRanking" : 0.8 + }, { + "datum" : 6528, + "relativeRanking" : 0.85 + }, { + "datum" : 6528, + "relativeRanking" : 0.9 + }, { + "datum" : 6528, + "relativeRanking" : 0.95 + } ], + "numberValues" : 3 + } ], + "failedMapAttemptCDFs" : [ { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + } ], + "successfulReduceAttemptCDF" : { + "maximum" : 9952, + "minimum" : 9952, + "rankings" : [ { + "datum" : 9952, + "relativeRanking" : 0.05 + }, { + "datum" : 9952, + "relativeRanking" : 0.1 + }, { + "datum" : 9952, + "relativeRanking" : 0.15 + }, { + "datum" : 9952, + "relativeRanking" : 0.2 + }, { + "datum" : 9952, + "relativeRanking" : 0.25 + }, { + "datum" : 9952, + "relativeRanking" : 0.3 + }, { + "datum" : 9952, + "relativeRanking" : 0.35 + }, { + "datum" : 9952, + "relativeRanking" : 0.4 + }, { + "datum" : 9952, + "relativeRanking" : 0.45 + }, { + "datum" : 9952, + "relativeRanking" : 0.5 + }, { + "datum" : 9952, + "relativeRanking" : 0.55 + }, { + "datum" : 9952, + "relativeRanking" : 0.6 + }, { + "datum" : 9952, + "relativeRanking" : 0.65 + }, { + "datum" : 9952, + "relativeRanking" : 0.7 + }, { + "datum" : 9952, + "relativeRanking" : 0.75 + }, { + "datum" : 9952, + "relativeRanking" : 0.8 + }, { + "datum" : 9952, + "relativeRanking" : 0.85 + }, { + "datum" : 9952, + "relativeRanking" : 0.9 + }, { + "datum" : 9952, + "relativeRanking" : 0.95 + } ], + "numberValues" : 1 + }, + "failedReduceAttemptCDF" : { + "maximum" : 9223372036854775807, + "minimum" : -9223372036854775808, + "rankings" : [ ], + "numberValues" : 0 + }, + "mapperTriesToSucceed" : [ 1.0 ], + "failedMapperFraction" : 0.0, + "relativeTime" : 0, + "queue" : "default", + "clusterMapMB" : -1, + "clusterReduceMB" : -1, + "jobMapMB" : 1024, + "jobReduceMB" : 1024 +}
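For orientation: wordcount2.json is the wordcount trace with the same WordCount job serialized twice back to back, which is why testSerialReaderThread above expects exactly two submissions. Such a trace can be consumed with the same Rumen API the tests use; a minimal sketch (illustrative only; run from hadoop-tools/hadoop-gridmix so the relative path resolves):

    // Illustrative sketch: count the JobStory objects in a Rumen trace,
    // using the same ZombieJobProducer API as testSerialReaderThread.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.rumen.JobStory;
    import org.apache.hadoop.tools.rumen.JobStoryProducer;
    import org.apache.hadoop.tools.rumen.ZombieJobProducer;

    public class TraceJobCounter {
      public static void main(String[] args) throws Exception {
        JobStoryProducer producer = new ZombieJobProducer(
            new Path("src/test/resources/data/wordcount2.json"), null,
            new Configuration());
        int jobs = 0;
        for (JobStory js = producer.getNextJob(); js != null;
             js = producer.getNextJob()) {
          jobs++; // wordcount2.json yields two JobStory instances
        }
        producer.close();
        System.out.println(jobs); // expected: 2
      }
    }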