From 8a2073cc61699f5692fcf638f4bae4d1c544870a Mon Sep 17 00:00:00 2001 From: Amar Kamat Date: Thu, 23 Feb 2012 10:41:07 +0000 Subject: [PATCH] MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for faster job submission. (amarrk) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1292736 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 2 + .../mapred/gridmix/ExecutionSummarizer.java | 44 +-- .../apache/hadoop/mapred/gridmix/Gridmix.java | 49 +++- .../hadoop/mapred/gridmix/GridmixJob.java | 9 + .../hadoop/mapred/gridmix/JobFactory.java | 27 +- .../hadoop/mapred/gridmix/JobMonitor.java | 187 ++++++++----- .../hadoop/mapred/gridmix/JobSubmitter.java | 49 +++- .../hadoop/mapred/gridmix/Statistics.java | 125 ++++++--- .../mapred/gridmix/StressJobFactory.java | 252 ++++++++++++++---- .../mapred/gridmix/TestGridmixStatistics.java | 217 +++++++++++++++ .../mapred/gridmix/TestGridmixSubmission.java | 5 +- .../mapred/gridmix/TestGridmixSummary.java | 29 +- .../hadoop/mapred/gridmix/TestSleepJob.java | 5 +- .../documentation/content/xdocs/gridmix.xml | 16 ++ 14 files changed, 806 insertions(+), 210 deletions(-) create mode 100644 hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixStatistics.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 338ac5232ba..b87a71807d7 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -14,6 +14,8 @@ Trunk (unreleased changes) (Plamen Jeliazkov via shv) IMPROVEMENTS + MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for + faster job submission. (amarrk) MAPREDUCE-3481. [Gridmix] Improve Gridmix STRESS mode. 
(amarrk) diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java index fc362c5643a..8f9d434eb90 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java @@ -53,6 +53,7 @@ class ExecutionSummarizer implements StatListener { private int numJobsInInputTrace; private int totalSuccessfulJobs; private int totalFailedJobs; + private int totalLostJobs; private int totalMapTasksLaunched; private int totalReduceTasksLaunched; private long totalSimulationTime; @@ -90,31 +91,32 @@ void start(Configuration conf) { simulationStartTime = System.currentTimeMillis(); } - private void processJobState(JobStats stats) throws Exception { + private void processJobState(JobStats stats) { Job job = stats.getJob(); - if (job.isSuccessful()) { - ++totalSuccessfulJobs; - } else { - ++totalFailedJobs; + try { + if (job.isSuccessful()) { + ++totalSuccessfulJobs; + } else { + ++totalFailedJobs; + } + } catch (Exception e) { + // this behavior is consistent with job-monitor which marks the job as + // complete (lost) if the status polling bails out + ++totalLostJobs; } } - private void processJobTasks(JobStats stats) throws Exception { + private void processJobTasks(JobStats stats) { totalMapTasksLaunched += stats.getNoOfMaps(); - Job job = stats.getJob(); - totalReduceTasksLaunched += job.getNumReduceTasks(); + totalReduceTasksLaunched += stats.getNoOfReds(); } private void process(JobStats stats) { - try { - // process the job run state - processJobState(stats); - - // process the tasks information - processJobTasks(stats); - } catch (Exception e) { - LOG.info("Error in processing job " + stats.getJob().getJobID() + "."); - } + // 
process the job run state + processJobState(stats); + + // process the tasks information + processJobTasks(stats); } @Override @@ -191,6 +193,8 @@ public String toString() { .append(getNumSuccessfulJobs()); builder.append("\nTotal number of failed jobs: ") .append(getNumFailedJobs()); + builder.append("\nTotal number of lost jobs: ") + .append(getNumLostJobs()); builder.append("\nTotal number of map tasks launched: ") .append(getNumMapTasksLaunched()); builder.append("\nTotal number of reduce task launched: ") @@ -266,8 +270,12 @@ protected int getNumFailedJobs() { return totalFailedJobs; } + protected int getNumLostJobs() { + return totalLostJobs; + } + protected int getNumSubmittedJobs() { - return totalSuccessfulJobs + totalFailedJobs; + return totalSuccessfulJobs + totalFailedJobs + totalLostJobs; } protected int getNumMapTasksLaunched() { diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java index b4a0e0b5e2d..6ff922753ca 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics; +import org.apache.hadoop.mapred.gridmix.Statistics.JobStats; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; @@ -93,6 +94,31 @@ public class Gridmix extends Configured implements Tool { */ public static final String GRIDMIX_USR_RSV = "gridmix.user.resolve.class"; + /** + * The configuration key which determines the duration for which the + * job-monitor sleeps while polling for job status. 
+ * This value should be specified in milliseconds. + */ + public static final String GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS = + "gridmix.job-monitor.sleep-time-ms"; + + /** + * Default value for {@link #GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS}. + */ + public static final int GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS_DEFAULT = 500; + + /** + * The configuration key which determines the total number of job-status + * monitoring threads. + */ + public static final String GRIDMIX_JOBMONITOR_THREADS = + "gridmix.job-monitor.thread-count"; + + /** + * Default value for {@link #GRIDMIX_JOBMONITOR_THREADS}. + */ + public static final int GRIDMIX_JOBMONITOR_THREADS_DEFAULT = 1; + /** * Configuration property set in simulated job's configuration whose value is * set to the corresponding original job's name. This is not configurable by @@ -185,8 +211,13 @@ void launchGridmixJob(GridmixJob job) submitter.add(job); // TODO add listeners, use for job dependencies - TimeUnit.SECONDS.sleep(10); try { + while (!job.isSubmitted()) { + // let InterruptedException propagate instead of swallowing it, so that + // an interrupt (e.g. on shutdown) can end this wait loop + Thread.sleep(100); + } + // wait for completion job.getJob().waitForCompletion(false); } catch (ClassNotFoundException e) { throw new IOException("Internal error", e); @@ -241,7 +272,7 @@ private void startThreads(Configuration conf, String traceIn, Path ioPath, GridmixJobSubmissionPolicy policy = getJobSubmissionPolicy(conf); LOG.info(" Submission policy is " + policy.name()); statistics = new Statistics(conf, policy.getPollingInterval(), startFlag); - monitor = createJobMonitor(statistics); + monitor = createJobMonitor(statistics, conf); int noOfSubmitterThreads = (policy == GridmixJobSubmissionPolicy.SERIAL) ? 
1 @@ -276,8 +307,13 @@ private void startThreads(Configuration conf, String traceIn, Path ioPath, } } - protected JobMonitor createJobMonitor(Statistics stats) throws IOException { - return new JobMonitor(stats); + protected JobMonitor createJobMonitor(Statistics stats, Configuration conf) + throws IOException { + int delay = conf.getInt(GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS, + GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS_DEFAULT); + int numThreads = conf.getInt(GRIDMIX_JOBMONITOR_THREADS, + GRIDMIX_JOBMONITOR_THREADS_DEFAULT); + return new JobMonitor(delay, TimeUnit.MILLISECONDS, stats, numThreads); } protected JobSubmitter createJobSubmitter(JobMonitor monitor, int threads, @@ -571,12 +607,13 @@ public void run() { if (monitor == null) { return; } - List remainingJobs = monitor.getRemainingJobs(); + List remainingJobs = monitor.getRemainingJobs(); if (remainingJobs.isEmpty()) { return; } LOG.info("Killing running jobs..."); - for (Job job : remainingJobs) { + for (JobStats stats : remainingJobs) { + Job job = stats.getJob(); try { if (!job.isComplete()) { job.killJob(); diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java index 77ec697872f..b2f4a4e7a46 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java @@ -72,6 +72,7 @@ protected Formatter initialValue() { } }; + private volatile boolean submitted; // set by a JobSubmitter thread, polled from Gridmix.launchGridmixJob protected final int seq; protected final Path outdir; protected final Job job; @@ -412,6 +413,14 @@ JobStory getJobDesc() { return jobdesc; } + void setSubmitted() { + submitted = true; + } + + boolean isSubmitted() { + return submitted; + } + static void pushDescription(int seq, List splits) { if (null != descCache.putIfAbsent(seq, splits)) { throw new 
IllegalArgumentException("Description exists for id " + seq); diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java index b4737cf8d6b..f5404e842ff 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.mapred.gridmix; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -35,6 +36,8 @@ import java.io.IOException; import java.io.InputStream; +import java.util.List; +import java.util.ArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.atomic.AtomicInteger; @@ -179,19 +182,33 @@ private JobStory getNextJobFromTrace() throws IOException { protected JobStory getNextJobFiltered() throws IOException { JobStory job = getNextJobFromTrace(); + // filter out the following jobs + // - unsuccessful jobs + // - jobs with missing submit-time + // - reduce only jobs + // These jobs are not yet supported in Gridmix while (job != null && (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS || - job.getSubmissionTime() < 0)) { + job.getSubmissionTime() < 0 || job.getNumberMaps() == 0)) { if (LOG.isDebugEnabled()) { - String reason = null; + List reason = new ArrayList(); if (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS) { - reason = "STATE (" + job.getOutcome().name() + ") "; + reason.add("STATE (" + job.getOutcome().name() + ")"); } if (job.getSubmissionTime() < 0) { - reason += "SUBMISSION-TIME (" + job.getSubmissionTime() + ")"; + reason.add("SUBMISSION-TIME (" + 
job.getSubmissionTime() + ")"); } + if (job.getNumberMaps() == 0) { + reason.add("ZERO-MAPS-JOB"); + } + + // TODO This should never happen. Probably we missed something! + if (reason.size() == 0) { + reason.add("N/A"); + } + LOG.debug("Ignoring job " + job.getJobID() + " from the input trace." - + " Reason: " + reason == null ? "N/A" : reason); + + " Reason: " + StringUtils.join(reason, ",")); } job = getNextJobFromTrace(); } diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java index af7331ca16d..64e0c7a0bea 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java @@ -24,37 +24,47 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.gridmix.Statistics.JobStats; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobStatus; /** - * Component accepting submitted, running jobs and responsible for - * monitoring jobs for success and failure. Once a job is submitted, it is - * polled for status until complete. If a job is complete, then the monitor - * thread returns immediately to the queue. If not, the monitor will sleep - * for some duration. + * Component accepting submitted, running {@link Statistics.JobStats} and + * responsible for monitoring jobs for success and failure. Once a job is + * submitted, it is polled for status until complete. 
If a job is complete, + * then the monitor thread returns immediately to the queue. If not, the monitor + * will sleep for some duration. + * + * {@link JobMonitor} can be configured to use multiple threads for polling + * the job statuses. Use {@link Gridmix#GRIDMIX_JOBMONITOR_THREADS} to specify + * the total number of monitoring threads. + * + * The duration for which a monitoring thread sleeps if the first job in the + * queue is running can also be configured. Use + * {@link Gridmix#GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS} to specify a custom + * value. */ -class JobMonitor implements Gridmix.Component { +class JobMonitor implements Gridmix.Component { public static final Log LOG = LogFactory.getLog(JobMonitor.class); - private final Queue mJobs; - private final MonitorThread mThread; - private final BlockingQueue runningJobs; + private final Queue mJobs; + private ExecutorService executor; + private int numPollingThreads; + private final BlockingQueue runningJobs; private final long pollDelayMillis; private Statistics statistics; private boolean graceful = false; private boolean shutdown = false; - public JobMonitor(Statistics statistics) { - this(5,TimeUnit.SECONDS, statistics); - } - /** * Create a JobMonitor that sleeps for the specified duration after * polling a still-running job. @@ -62,30 +72,37 @@ public JobMonitor(Statistics statistics) { * @param unit Time unit for pollDelaySec (rounded to milliseconds) * @param statistics StatCollector , listener to job completion. 
*/ - public JobMonitor(int pollDelay, TimeUnit unit, Statistics statistics) { - mThread = new MonitorThread(); - runningJobs = new LinkedBlockingQueue(); - mJobs = new LinkedList(); + public JobMonitor(int pollDelay, TimeUnit unit, Statistics statistics, + int numPollingThreads) { + executor = Executors.newCachedThreadPool(); + this.numPollingThreads = numPollingThreads; + runningJobs = new LinkedBlockingQueue(); + mJobs = new LinkedList(); this.pollDelayMillis = TimeUnit.MILLISECONDS.convert(pollDelay, unit); this.statistics = statistics; } /** - * Add a job to the polling queue. + * Add a running job's status to the polling queue. */ - public void add(Job job) throws InterruptedException { - runningJobs.put(job); + public void add(JobStats job) throws InterruptedException { + synchronized (runningJobs) { + runningJobs.put(job); + } } /** - * Add a submission failed job , such that it can be communicated + * Add a submission failed job's status, such that it can be communicated * back to serial. * TODO: Cleaner solution for this problem * @param job */ - public void submissionFailed(Job job) { - LOG.info("Job submission failed notification for job " + job.getJobID()); - this.statistics.add(job); + public void submissionFailed(JobStats job) { + String jobID = job.getJob().getConfiguration().get(Gridmix.ORIGINAL_JOB_ID); + LOG.info("Job submission failed notification for job " + jobID); + synchronized (statistics) { + this.statistics.add(job); + } } /** @@ -108,12 +125,9 @@ protected void onFailure(Job job) { * @throws IllegalStateException If monitoring thread is still running. * @return Any jobs submitted and not known to have completed. 
*/ - List getRemainingJobs() { - if (mThread.isAlive()) { - LOG.warn("Internal error: Polling running monitor for jobs"); - } + List getRemainingJobs() { synchronized (mJobs) { - return new ArrayList(mJobs); + return new ArrayList(mJobs); } } @@ -123,19 +137,8 @@ List getRemainingJobs() { */ private class MonitorThread extends Thread { - public MonitorThread() { - super("GridmixJobMonitor"); - } - - /** - * Check a job for success or failure. - */ - public void process(Job job) throws IOException, InterruptedException { - if (job.isSuccessful()) { - onSuccess(job); - } else { - onFailure(job); - } + public MonitorThread(int i) { + super("GridmixJobMonitor-" + i); } @Override @@ -144,10 +147,12 @@ public void run() { boolean shutdown; while (true) { try { - synchronized (mJobs) { - graceful = JobMonitor.this.graceful; - shutdown = JobMonitor.this.shutdown; - runningJobs.drainTo(mJobs); + synchronized (runningJobs) { + synchronized (mJobs) { + graceful = JobMonitor.this.graceful; + shutdown = JobMonitor.this.shutdown; + runningJobs.drainTo(mJobs); + } } // shutdown conditions; either shutdown requested and all jobs @@ -155,26 +160,63 @@ public void run() { // submitted jobs not in the monitored set if (shutdown) { if (!graceful) { - while (!runningJobs.isEmpty()) { - synchronized (mJobs) { - runningJobs.drainTo(mJobs); + synchronized (runningJobs) { + while (!runningJobs.isEmpty()) { + synchronized (mJobs) { + runningJobs.drainTo(mJobs); + } } } break; - } else if (mJobs.isEmpty()) { - break; + } + + synchronized (mJobs) { + if (graceful && mJobs.isEmpty()) { + break; + } } } - while (!mJobs.isEmpty()) { - Job job; - synchronized (mJobs) { - job = mJobs.poll(); - } + JobStats jobStats = null; + synchronized (mJobs) { + jobStats = mJobs.poll(); + } + while (jobStats != null) { + Job job = jobStats.getJob(); + try { - if (job.isComplete()) { - process(job); - statistics.add(job); - continue; + // get the job status + long start = System.currentTimeMillis(); + JobStatus 
status = job.getStatus(); // cache the job status + long end = System.currentTimeMillis(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Status polling for job " + job.getJobID() + " took " + + (end-start) + "ms."); + } + + // update the job progress + jobStats.updateJobStatus(status); + + // if the job is complete, let others know + if (status.isJobComplete()) { + if (status.getState() == JobStatus.State.SUCCEEDED) { + onSuccess(job); + } else { + onFailure(job); + } + synchronized (statistics) { + statistics.add(jobStats); + } + } else { + // add the running job back and break + synchronized (mJobs) { + if (!mJobs.offer(jobStats)) { + LOG.error("Lost job " + (null == job.getJobName() + ? "" : job.getJobName())); // should never + // happen + } + } + break; } } catch (IOException e) { if (e.getCause() instanceof ClosedByInterruptException) { @@ -186,18 +228,19 @@ public void run() { } else { LOG.warn("Lost job " + (null == job.getJobName() ? "" : job.getJobName()), e); - continue; + synchronized (statistics) { + statistics.add(jobStats); + } } } + + // get the next job synchronized (mJobs) { - if (!mJobs.offer(job)) { - LOG.error("Lost job " + (null == job.getJobName() - ? "" : job.getJobName())); // should never - // happen - } + jobStats = mJobs.poll(); } - break; } + + // sleep for a while before checking again try { TimeUnit.MILLISECONDS.sleep(pollDelayMillis); } catch (InterruptedException e) { @@ -215,7 +258,9 @@ public void run() { * Start the internal, monitoring thread. */ public void start() { - mThread.start(); + for (int i = 0; i < numPollingThreads; ++i) { + executor.execute(new MonitorThread(i)); + } } /** @@ -224,7 +269,7 @@ public void start() { * if no form of shutdown has been requested. 
*/ public void join(long millis) throws InterruptedException { - mThread.join(millis); + executor.awaitTermination(millis, TimeUnit.MILLISECONDS); } /** @@ -236,7 +281,7 @@ public void abort() { graceful = false; shutdown = true; } - mThread.interrupt(); + executor.shutdown(); } /** @@ -248,7 +293,7 @@ public void shutdown() { graceful = true; shutdown = true; } - mThread.interrupt(); + executor.shutdown(); } } diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java index 62dd9fad417..868ba234811 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.gridmix.Statistics.JobStats; /** * Component accepting deserialized job traces, computing split data, and @@ -46,6 +47,7 @@ class JobSubmitter implements Gridmix.Component { private final JobMonitor monitor; private final ExecutorService sched; private volatile boolean shutdown = false; + private final int queueDepth; /** * Initialize the submission component with downstream monitor and pool of @@ -61,6 +63,7 @@ class JobSubmitter implements Gridmix.Component { */ public JobSubmitter(JobMonitor monitor, int threads, int queueDepth, FilePool inputDir, Statistics statistics) { + this.queueDepth = queueDepth; sem = new Semaphore(queueDepth); sched = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); @@ -79,19 +82,25 @@ public SubmitTask(GridmixJob job) { this.job = job; } public void run() { + JobStats stats = + Statistics.generateJobStats(job.getJob(), job.getJobDesc()); try { // pre-compute split 
information try { + long start = System.currentTimeMillis(); job.buildSplits(inputDir); + long end = System.currentTimeMillis(); + LOG.info("[JobSubmitter] Time taken to build splits for job " + + job.getJob().getJobID() + ": " + (end - start) + " ms."); } catch (IOException e) { LOG.warn("Failed to submit " + job.getJob().getJobName() + " as " + job.getUgi(), e); - monitor.submissionFailed(job.getJob()); + monitor.submissionFailed(stats); return; } catch (Exception e) { LOG.warn("Failed to submit " + job.getJob().getJobName() + " as " + job.getUgi(), e); - monitor.submissionFailed(job.getJob()); + monitor.submissionFailed(stats); return; } // Sleep until deadline @@ -102,10 +111,28 @@ public void run() { } try { // submit job - monitor.add(job.call()); - statistics.addJobStats(job.getJob(), job.getJobDesc()); - LOG.debug("SUBMIT " + job + "@" + System.currentTimeMillis() + - " (" + job.getJob().getJobID() + ")"); + long start = System.currentTimeMillis(); + job.call(); + long end = System.currentTimeMillis(); + LOG.info("[JobSubmitter] Time taken to submit the job " + + job.getJob().getJobID() + ": " + (end - start) + " ms."); + + // mark it as submitted + job.setSubmitted(); + + // add to the monitor + monitor.add(stats); + + // add to the statistics + statistics.addJobStats(stats); + if (LOG.isDebugEnabled()) { + String jobID = + job.getJob().getConfiguration().get(Gridmix.ORIGINAL_JOB_ID); + LOG.debug("Original job '" + jobID + "' is being simulated as '" + + job.getJob().getJobID() + "'"); + LOG.debug("SUBMIT " + job + "@" + System.currentTimeMillis() + + " (" + job.getJob().getJobID() + ")"); + } } catch (IOException e) { LOG.warn("Failed to submit " + job.getJob().getJobName() + " as " + job.getUgi(), e); @@ -113,21 +140,21 @@ public void run() { throw new InterruptedException("Failed to submit " + job.getJob().getJobName()); } - monitor.submissionFailed(job.getJob()); + monitor.submissionFailed(stats); } catch (ClassNotFoundException e) { LOG.warn("Failed 
to submit " + job.getJob().getJobName(), e); - monitor.submissionFailed(job.getJob()); + monitor.submissionFailed(stats); } } catch (InterruptedException e) { // abort execution, remove splits if nesc // TODO release ThdLoc GridmixJob.pullDescription(job.id()); Thread.currentThread().interrupt(); - monitor.submissionFailed(job.getJob()); + monitor.submissionFailed(stats); } catch(Exception e) { //Due to some exception job wasnt submitted. LOG.info(" Job " + job.getJob().getJobID() + " submission failed " , e); - monitor.submissionFailed(job.getJob()); + monitor.submissionFailed(stats); } finally { sem.release(); } @@ -141,6 +168,8 @@ public void add(final GridmixJob job) throws InterruptedException { final boolean addToQueue = !shutdown; if (addToQueue) { final SubmitTask task = new SubmitTask(job); + LOG.info("Total number of queued jobs: " + + (queueDepth - sem.availablePermits())); sem.acquire(); try { sched.execute(task); diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java index 54f1730cf26..322d7555a51 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java @@ -25,6 +25,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.gridmix.Gridmix.Component; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.tools.rumen.JobStory; @@ -43,12 +44,12 @@ /** * Component collecting the stats required by other components * to make decisions. - * Single thread Collector tries to collec the stats. - * Each of thread poll updates certain datastructure(Currently ClusterStats). 
- * Components interested in these datastructure, need to register. - * StatsCollector notifies each of the listeners. + * Single thread collector tries to collect the stats (currently cluster stats) + * and caches it internally. + * Components interested in these stats need to register themselves and will get + * notified either on every job completion event or some fixed time interval. */ -public class Statistics implements Component { +public class Statistics implements Component { public static final Log LOG = LogFactory.getLog(Statistics.class); private final StatCollector statistics = new StatCollector(); @@ -62,10 +63,16 @@ public class Statistics implements Component { private final List> jobStatListeners = new CopyOnWriteArrayList>(); - //List of jobids and noofMaps for each job - private static final Map jobMaps = - new ConcurrentHashMap(); + // A map of job-sequence-id to job-stats of submitted jobs + private static final Map submittedJobsMap = + new ConcurrentHashMap(); + + // total number of map tasks submitted + private static volatile int numMapsSubmitted = 0; + // total number of reduce tasks submitted + private static volatile int numReducesSubmitted = 0; + private int completedJobsInCurrentInterval = 0; private final int jtPollingInterval; private volatile boolean shutdown = false; @@ -92,41 +99,65 @@ public JobClient run() throws IOException { this.startFlag = startFlag; } - public void addJobStats(Job job, JobStory jobdesc) { + /** + * Generates a job stats. 
+ */ + public static JobStats generateJobStats(Job job, JobStory jobdesc) { int seq = GridmixJob.getJobSeqId(job); - if (seq < 0) { - LOG.info("Not tracking job " + job.getJobName() - + " as seq id is less than zero: " + seq); - return; + // bail out if job description is missing for a job to be simulated + if (seq >= 0 && jobdesc == null) { + throw new IllegalArgumentException("JobStory not available for job " + + job.getJobID()); } - int maps = 0; - int reds = 0; - if (jobdesc == null) { - throw new IllegalArgumentException( - " JobStory not available for job " + job.getJobName()); - } else { + int maps = -1; + int reds = -1; + if (jobdesc != null) { + // Note that the ZombieJob will return a >= 0 value maps = jobdesc.getNumberMaps(); reds = jobdesc.getNumberReduces(); } - JobStats stats = new JobStats(maps, reds, job); - jobMaps.put(seq,stats); + return new JobStats(maps, reds, job); + } + + /** + * Add a submitted job for monitoring. + */ + public void addJobStats(JobStats stats) { + int seq = GridmixJob.getJobSeqId(stats.getJob()); + if (seq < 0) { + LOG.info("Not tracking job " + stats.getJob().getJobName() + + " as seq id is less than zero: " + seq); + return; + } + submittedJobsMap.put(seq, stats); + numMapsSubmitted += stats.getNoOfMaps(); + numReducesSubmitted += stats.getNoOfReds(); } /** * Used by JobMonitor to add the completed job. */ @Override - public void add(Job job) { - //This thread will be notified initially by jobmonitor incase of + public void add(Statistics.JobStats job) { + //This thread will be notified initially by job-monitor incase of //data generation. Ignore that as we are getting once the input is //generated. 
if (!statistics.isAlive()) { return; } - JobStats stat = jobMaps.remove(GridmixJob.getJobSeqId(job)); - - if (stat == null) return; + JobStats stat = submittedJobsMap.remove(GridmixJob.getJobSeqId(job.getJob())); + + // stat cannot be null + if (stat == null) { + LOG.error("[Statistics] Missing entry for job " + + job.getJob().getJobID()); + return; + } + + // update the total number of submitted map/reduce task count + numMapsSubmitted -= stat.getNoOfMaps(); + numReducesSubmitted -= stat.getNoOfReds(); completedJobsInCurrentInterval++; //check if we have reached the maximum level of job completions. @@ -238,7 +269,7 @@ public void join(long millis) throws InterruptedException { @Override public void shutdown() { shutdown = true; - jobMaps.clear(); + submittedJobsMap.clear(); clusterStatlisteners.clear(); jobStatListeners.clear(); statistics.interrupt(); @@ -247,7 +278,7 @@ public void shutdown() { @Override public void abort() { shutdown = true; - jobMaps.clear(); + submittedJobsMap.clear(); clusterStatlisteners.clear(); jobStatListeners.clear(); statistics.interrupt(); @@ -259,9 +290,10 @@ public void abort() { * TODO: In future we need to extend this to send more information. */ static class JobStats { - private int noOfMaps; - private int noOfReds; - private Job job; + private final int noOfMaps; + private final int noOfReds; + private JobStatus currentStatus; + private final Job job; public JobStats(int noOfMaps,int numOfReds, Job job){ this.job = job; @@ -284,6 +316,20 @@ public int getNoOfReds() { public Job getJob() { return job; } + + /** + * Update the job statistics. + */ + public synchronized void updateJobStatus(JobStatus status) { + this.currentStatus = status; + } + + /** + * Get the current job status. 
+ */ + public synchronized JobStatus getJobStatus() { + return currentStatus; + } } static class ClusterStats { @@ -316,15 +362,28 @@ public ClusterStatus getStatus() { } int getNumRunningJob() { - return jobMaps.size(); + return submittedJobsMap.size(); } /** * @return runningWatitingJobs */ static Collection getRunningJobStats() { - return jobMaps.values(); + return submittedJobsMap.values(); } + /** + * Returns the total number of submitted map tasks + */ + static int getSubmittedMapTasks() { + return numMapsSubmitted; + } + + /** + * Returns the total number of submitted reduce tasks + */ + static int getSubmittedReduceTasks() { + return numReducesSubmitted; + } } } diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java index d78d6313331..df17ebd19e8 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java @@ -25,11 +25,15 @@ import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats; import org.apache.hadoop.mapred.gridmix.Statistics.JobStats; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.tools.rumen.JobStory; import org.apache.hadoop.tools.rumen.JobStoryProducer; import java.io.IOException; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -87,6 +91,13 @@ public class StressJobFactory extends JobFactory { "gridmix.throttle.jobs-to-tracker-ratio"; final float maxJobTrackerRatio; + /** + * Represents a list of blacklisted jobs. 
Jobs are blacklisted when either + * they are complete or their status cannot be obtained. Stress mode will + * ignore blacklisted jobs from its overload computation. + */ + private Set blacklistedJobs = new HashSet(); + /** * Creating a new instance does not start the thread. * @@ -145,42 +156,66 @@ public void run() { try { startFlag.await(); if (Thread.currentThread().isInterrupted()) { + LOG.warn("[STRESS] Interrupted before start!. Exiting.."); return; } LOG.info("START STRESS @ " + System.currentTimeMillis()); while (!Thread.currentThread().isInterrupted()) { try { while (loadStatus.overloaded()) { + // update the overload status if (LOG.isDebugEnabled()) { - LOG.debug("Cluster overloaded in run! Sleeping..."); + LOG.debug("Updating the overload status."); } - // sleep try { - Thread.sleep(1000); - } catch (InterruptedException ie) { + checkLoadAndGetSlotsToBackfill(); + } catch (IOException ioe) { + LOG.warn("[STRESS] Check failed!", ioe); return; } + + // if the cluster is still overloaded, then sleep + if (loadStatus.overloaded()) { + if (LOG.isDebugEnabled()) { + LOG.debug("[STRESS] Cluster overloaded in run! Sleeping..."); + } + + // sleep + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + LOG.warn("[STRESS] Interrupted while sleeping! Exiting.", ie); + return; + } + } } while (!loadStatus.overloaded()) { if (LOG.isDebugEnabled()) { - LOG.debug("Cluster underloaded in run! Stressing..."); + LOG.debug("[STRESS] Cluster underloaded in run! Stressing..."); } try { //TODO This in-line read can block submission for large jobs. final JobStory job = getNextJobFiltered(); if (null == job) { + LOG.warn("[STRESS] Finished consuming the input trace. 
" + + "Exiting.."); return; } if (LOG.isDebugEnabled()) { LOG.debug("Job Selected: " + job.getJobID()); } - submitter.add( - jobCreator.createGridmixJob( - conf, 0L, job, scratch, - userResolver.getTargetUgi( - UserGroupInformation.createRemoteUser(job.getUser())), - sequence.getAndIncrement())); + + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(job.getUser()); + UserGroupInformation tgtUgi = userResolver.getTargetUgi(ugi); + GridmixJob tJob = + jobCreator.createGridmixJob(conf, 0L, job, scratch, + tgtUgi, sequence.getAndIncrement()); + + // submit the job + submitter.add(tJob); + // TODO: We need to take care of scenario when one map/reduce // takes more than 1 slot. @@ -198,7 +233,7 @@ public void run() { loadStatus.decrementJobLoad(1); } catch (IOException e) { - LOG.error("Error while submitting the job ", e); + LOG.error("[STRESS] Error while submitting the job ", e); error = e; return; } @@ -209,6 +244,7 @@ public void run() { } } } catch (InterruptedException e) { + LOG.error("[STRESS] Interrupted in the main block!", e); return; } finally { IOUtils.cleanup(null, jobProducer); @@ -224,9 +260,17 @@ public void run() { */ @Override public void update(Statistics.ClusterStats item) { - ClusterStatus clusterMetrics = item.getStatus(); + ClusterStatus clusterStatus = item.getStatus(); try { - checkLoadAndGetSlotsToBackfill(item, clusterMetrics); + // update the max cluster map/reduce task capacity + loadStatus.updateMapCapacity(clusterStatus.getMaxMapTasks()); + + loadStatus.updateReduceCapacity(clusterStatus.getMaxReduceTasks()); + + int numTrackers = clusterStatus.getTaskTrackers(); + int jobLoad = + (int) (maxJobTrackerRatio * numTrackers) - item.getNumRunningJob(); + loadStatus.updateJobLoad(jobLoad); } catch (Exception e) { LOG.error("Couldn't get the new Status",e); } @@ -258,22 +302,8 @@ float calcEffectiveIncompleteReduceTasks(int reduceSlotCapacity, * @param clusterStatus Cluster status * @throws java.io.IOException */ - private void 
checkLoadAndGetSlotsToBackfill( - ClusterStats stats, ClusterStatus clusterStatus) throws IOException, InterruptedException { - - // update the max cluster capacity incase its updated - int mapCapacity = clusterStatus.getMaxMapTasks(); - loadStatus.updateMapCapacity(mapCapacity); - - int reduceCapacity = clusterStatus.getMaxReduceTasks(); - - loadStatus.updateReduceCapacity(reduceCapacity); - - int numTrackers = clusterStatus.getTaskTrackers(); - - int jobLoad = - (int) (maxJobTrackerRatio * numTrackers) - stats.getNumRunningJob(); - loadStatus.updateJobLoad(jobLoad); + protected void checkLoadAndGetSlotsToBackfill() + throws IOException, InterruptedException { if (loadStatus.getJobLoad() <= 0) { if (LOG.isDebugEnabled()) { LOG.debug(System.currentTimeMillis() + " [JobLoad] Overloaded is " @@ -283,17 +313,143 @@ private void checkLoadAndGetSlotsToBackfill( return; // stop calculation because we know it is overloaded. } - float incompleteMapTasks = 0; // include pending & running map tasks. 
- for (JobStats job : ClusterStats.getRunningJobStats()) { - float mapProgress = job.getJob().mapProgress(); - int noOfMaps = job.getNoOfMaps(); - incompleteMapTasks += - calcEffectiveIncompleteMapTasks(mapCapacity, noOfMaps, mapProgress); + int mapCapacity = loadStatus.getMapCapacity(); + int reduceCapacity = loadStatus.getReduceCapacity(); + + // return if the cluster status is not set + if (mapCapacity < 0 || reduceCapacity < 0) { + // note that, by default, the overload status is true + // missing cluster status will result into blocking of job submission + return; } - int mapSlotsBackFill = - (int) ((overloadMapTaskMapSlotRatio * mapCapacity) - incompleteMapTasks); - loadStatus.updateMapLoad(mapSlotsBackFill); + // Determine the max permissible map & reduce task load + int maxMapLoad = (int) (overloadMapTaskMapSlotRatio * mapCapacity); + int maxReduceLoad = + (int) (overloadReduceTaskReduceSlotRatio * reduceCapacity); + + // compute the total number of map & reduce tasks submitted + int totalMapTasks = ClusterStats.getSubmittedMapTasks(); + int totalReduceTasks = ClusterStats.getSubmittedReduceTasks(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Total submitted map tasks: " + totalMapTasks); + LOG.debug("Total submitted reduce tasks: " + totalReduceTasks); + LOG.debug("Max map load: " + maxMapLoad); + LOG.debug("Max reduce load: " + maxReduceLoad); + } + + // generate a pessimistic bound on the max running+pending map tasks + // this check is to avoid the heavy-duty actual map load calculation + int mapSlotsBackFill = (int) (maxMapLoad - totalMapTasks); + + // generate a pessimistic bound on the max running+pending reduce tasks + // this check is to avoid the heavy-duty actual reduce load calculation + int reduceSlotsBackFill = (int) (maxReduceLoad - totalReduceTasks); + + // maintain a list of seen job ids + Set seenJobIDs = new HashSet(); + + // check if the total number of submitted map/reduce tasks exceeds the + // permissible limit + if (totalMapTasks > 
maxMapLoad || totalReduceTasks > maxReduceLoad) { + // if yes, calculate the real load + float incompleteMapTasks = 0; // include pending & running map tasks. + float incompleteReduceTasks = 0; // include pending & running reduce tasks + + for (JobStats job : ClusterStats.getRunningJobStats()) { + JobID id = job.getJob().getJobID(); + seenJobIDs.add(id); + + // Note that this is a hack! Ideally, ClusterStats.getRunningJobStats() + // should be smart enough to take care of completed jobs. + if (blacklistedJobs.contains(id)) { + LOG.warn("Ignoring blacklisted job: " + id); + continue; + } + + int noOfMaps = job.getNoOfMaps(); + int noOfReduces = job.getNoOfReds(); + + // consider polling for jobs where maps>0 or reds>0 + // TODO: What about setup/cleanup tasks for cases where m=0 and r=0 + // What otherwise? + if (noOfMaps > 0 || noOfReduces > 0) { + // get the job's status + JobStatus status = job.getJobStatus(); + + // blacklist completed jobs and continue + if (status != null && status.isJobComplete()) { + LOG.warn("Blacklisting completed job: " + id); + blacklistedJobs.add(id); + continue; + } + + // get the map and reduce tasks' progress + float mapProgress = 0f; + float reduceProgress = 0f; + + // check if the status is missing (this can happen for unpolled jobs) + if (status != null) { + mapProgress = status.getMapProgress(); + reduceProgress = status.getReduceProgress(); + } + + incompleteMapTasks += + calcEffectiveIncompleteMapTasks(mapCapacity, noOfMaps, mapProgress); + + // bail out early + int currentMapSlotsBackFill = (int) (maxMapLoad - incompleteMapTasks); + if (currentMapSlotsBackFill <= 0) { + // reset the reduce task load since we are bailing out + incompleteReduceTasks = totalReduceTasks; + if (LOG.isDebugEnabled()) { + LOG.debug("Terminating overload check due to high map load."); + } + break; + } + + // compute the real reduce load + if (noOfReduces > 0) { + incompleteReduceTasks += + calcEffectiveIncompleteReduceTasks(reduceCapacity, 
noOfReduces, + reduceProgress); + } + + // bail out early + int currentReduceSlotsBackFill = + (int) (maxReduceLoad - incompleteReduceTasks); + if (currentReduceSlotsBackFill <= 0) { + // reset the map task load since we are bailing out + incompleteMapTasks = totalMapTasks; + if (LOG.isDebugEnabled()) { + LOG.debug("Terminating overload check due to high reduce load."); + } + break; + } + } else { + LOG.warn("Blacklisting empty job: " + id); + blacklistedJobs.add(id); + } + } + + // calculate the real map load on the cluster + mapSlotsBackFill = (int) (maxMapLoad - incompleteMapTasks); + + // calculate the real reduce load on the cluster + reduceSlotsBackFill = (int)(maxReduceLoad - incompleteReduceTasks); + + // clean up the blacklisted set to keep the memory footprint minimal + // retain only the jobs that are seen in this cycle + blacklistedJobs.retainAll(seenJobIDs); + if (LOG.isDebugEnabled() && blacklistedJobs.size() > 0) { + LOG.debug("Blacklisted jobs count: " + blacklistedJobs.size()); + } + } + + // update + loadStatus.updateMapLoad(mapSlotsBackFill); + loadStatus.updateReduceLoad(reduceSlotsBackFill); if (loadStatus.getMapLoad() <= 0) { if (LOG.isDebugEnabled()) { @@ -303,23 +459,7 @@ private void checkLoadAndGetSlotsToBackfill( } return; // stop calculation because we know it is overloaded. } - - float incompleteReduceTasks = 0; // include pending & running reduce tasks. 
- for (JobStats job : ClusterStats.getRunningJobStats()) { - // Cached the num-reds value in JobStats - int noOfReduces = job.getNoOfReds(); - if (noOfReduces > 0) { - float reduceProgress = job.getJob().reduceProgress(); - incompleteReduceTasks += - calcEffectiveIncompleteReduceTasks(reduceCapacity, noOfReduces, - reduceProgress); - } - } - int reduceSlotsBackFill = - (int)((overloadReduceTaskReduceSlotRatio * reduceCapacity) - - incompleteReduceTasks); - loadStatus.updateReduceLoad(reduceSlotsBackFill); if (loadStatus.getReduceLoad() <= 0) { if (LOG.isDebugEnabled()) { LOG.debug(System.currentTimeMillis() + " [REDUCE-LOAD] Overloaded is " @@ -445,7 +585,7 @@ private synchronized void updateOverloadStatus() { || (numJobsBackfill <= 0)); } - public synchronized boolean overloaded() { + public boolean overloaded() { return overloaded.get(); } diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixStatistics.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixStatistics.java new file mode 100644 index 00000000000..f0df5fb03af --- /dev/null +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixStatistics.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.gridmix; + +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats; +import org.apache.hadoop.mapred.gridmix.Statistics.JobStats; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.tools.rumen.JobStory; +import org.apache.hadoop.tools.rumen.TaskAttemptInfo; +import org.apache.hadoop.tools.rumen.TaskInfo; +import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values; +import org.junit.Test; +import static org.junit.Assert.*; + +/** + * Test the Gridmix's {@link Statistics} class. + */ +public class TestGridmixStatistics { + /** + * Test {@link Statistics.JobStats}. 
+ */ + @Test + @SuppressWarnings("deprecation") + public void testJobStats() throws Exception { + Job job = new Job() {}; + JobStats stats = new JobStats(1, 2, job); + assertEquals("Incorrect num-maps", 1, stats.getNoOfMaps()); + assertEquals("Incorrect num-reds", 2, stats.getNoOfReds()); + assertTrue("Incorrect job", job == stats.getJob()); + assertNull("Unexpected job status", stats.getJobStatus()); + + // add a new status + JobStatus status = new JobStatus(); + stats.updateJobStatus(status); + assertNotNull("Missing job status", stats.getJobStatus()); + assertTrue("Incorrect job status", status == stats.getJobStatus()); + } + + private static JobStory getCustomJobStory(final int numMaps, + final int numReds) { + return new JobStory() { + @Override + public InputSplit[] getInputSplits() { + return null; + } + @Override + public JobConf getJobConf() { + return null; + } + @Override + public JobID getJobID() { + return null; + } + @Override + public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int arg0, int arg1, + int arg2) { + return null; + } + @Override + public String getName() { + return null; + } + @Override + public int getNumberMaps() { + return numMaps; + } + @Override + public int getNumberReduces() { + return numReds; + } + @Override + public Values getOutcome() { + return null; + } + @Override + public String getQueueName() { + return null; + } + @Override + public long getSubmissionTime() { + return 0; + } + @Override + public TaskAttemptInfo getTaskAttemptInfo(TaskType arg0, int arg1, + int arg2) { + return null; + } + @Override + public TaskInfo getTaskInfo(TaskType arg0, int arg1) { + return null; + } + @Override + public String getUser() { + return null; + } + }; + } + + /** + * Test {@link Statistics}. 
+ */ + @Test + @SuppressWarnings("deprecation") + public void testStatistics() throws Exception { + // test job stats generation + Configuration conf = new Configuration(); + + // test dummy jobs like data-generation etc + Job job = new Job(conf) { + }; + JobStats stats = Statistics.generateJobStats(job, null); + testJobStats(stats, -1, -1, null, job); + + // add a job desc with 2 map and 1 reduce task + conf.setInt(GridmixJob.GRIDMIX_JOB_SEQ, 1); + + // test dummy jobs like data-generation etc + job = new Job(conf) { + }; + JobStory zjob = getCustomJobStory(2, 1); + stats = Statistics.generateJobStats(job, zjob); + testJobStats(stats, 2, 1, null, job); + + // add a job status + JobStatus jStatus = new JobStatus(); + stats.updateJobStatus(jStatus); + testJobStats(stats, 2, 1, jStatus, job); + + + // start the statistics + CountDownLatch startFlag = new CountDownLatch(1); // prevents the collector + // thread from starting + Statistics statistics = new Statistics(new JobConf(), 0, startFlag); + statistics.start(); + + testClusterStats(0, 0, 0); + + // add to the statistics object + statistics.addJobStats(stats); + testClusterStats(2, 1, 1); + + // add another job + JobStory zjob2 = getCustomJobStory(10, 5); + conf.setInt(GridmixJob.GRIDMIX_JOB_SEQ, 2); + job = new Job(conf) { + }; + + JobStats stats2 = Statistics.generateJobStats(job, zjob2); + statistics.addJobStats(stats2); + testClusterStats(12, 6, 2); + + // finish off one job + statistics.add(stats2); + testClusterStats(2, 1, 1); + + // finish off the other job + statistics.add(stats); + testClusterStats(0, 0, 0); + + statistics.shutdown(); + } + + // test the job stats + private static void testJobStats(JobStats stats, int numMaps, int numReds, + JobStatus jStatus, Job job) { + assertEquals("Incorrect num map tasks", numMaps, stats.getNoOfMaps()); + assertEquals("Incorrect num reduce tasks", numReds, stats.getNoOfReds()); + + if (job != null) { + assertNotNull("Missing job", job); + } + // check running job + 
assertTrue("Incorrect job", job == stats.getJob()); + + if (jStatus != null) { + assertNotNull("Missing job status", jStatus); + } + // check job stats + assertTrue("Incorrect job status", jStatus == stats.getJobStatus()); + } + + // test the cluster stats + private static void testClusterStats(int numSubmittedMapTasks, + int numSubmittedReduceTasks, + int numSubmittedJobs) { + assertEquals("Incorrect count of total number of submitted map tasks", + numSubmittedMapTasks, ClusterStats.getSubmittedMapTasks()); + assertEquals("Incorrect count of total number of submitted reduce tasks", + numSubmittedReduceTasks, + ClusterStats.getSubmittedReduceTasks()); + assertEquals("Incorrect submitted jobs", + numSubmittedJobs, ClusterStats.getRunningJobStats().size()); + } +} diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java index 22b742678f6..5e03d613331 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java @@ -26,6 +26,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.zip.GZIPInputStream; import org.apache.hadoop.conf.Configuration; @@ -96,7 +97,7 @@ static class TestMonitor extends JobMonitor { private final BlockingQueue retiredJobs; public TestMonitor(int expected, Statistics stats) { - super(stats); + super(5, TimeUnit.SECONDS, stats, 1); this.expected = expected; retiredJobs = new LinkedBlockingQueue(); } @@ -349,7 +350,7 @@ public void checkMonitor(Configuration conf) throws Exception { } @Override - protected JobMonitor createJobMonitor(Statistics 
stats) { + protected JobMonitor createJobMonitor(Statistics stats, Configuration conf){ monitor = new TestMonitor(NJOBS + 1, stats); return monitor; } diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java index 64af603bec5..f49617fd5cf 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java @@ -193,7 +193,7 @@ public void testExecutionSummarizer() throws IOException { es.update(null); assertEquals("ExecutionSummarizer init failed", 0, es.getSimulationStartTime()); - testExecutionSummarizer(0, 0, 0, 0, 0, 0, es); + testExecutionSummarizer(0, 0, 0, 0, 0, 0, 0, es); long simStartTime = System.currentTimeMillis(); es.start(null); @@ -203,14 +203,24 @@ public void testExecutionSummarizer() throws IOException { es.getSimulationStartTime() <= System.currentTimeMillis()); // test with job stats - JobStats stats = generateFakeJobStats(1, 10, true); + JobStats stats = generateFakeJobStats(1, 10, true, false); es.update(stats); - testExecutionSummarizer(1, 10, 0, 1, 1, 0, es); + testExecutionSummarizer(1, 10, 0, 1, 1, 0, 0, es); // test with failed job - stats = generateFakeJobStats(5, 1, false); + stats = generateFakeJobStats(5, 1, false, false); es.update(stats); - testExecutionSummarizer(6, 11, 0, 2, 1, 1, es); + testExecutionSummarizer(6, 11, 0, 2, 1, 1, 0, es); + + // test with successful but lost job + stats = generateFakeJobStats(1, 1, true, true); + es.update(stats); + testExecutionSummarizer(7, 12, 0, 3, 1, 1, 1, es); + + // test with failed but lost job + stats = generateFakeJobStats(2, 2, false, true); + es.update(stats); + testExecutionSummarizer(9, 14, 0, 4, 1, 1, 2, es); // test finalize // define a 
fake job factory @@ -306,7 +316,7 @@ public void testExecutionSummarizer() throws IOException { // test the ExecutionSummarizer private static void testExecutionSummarizer(int numMaps, int numReds, int totalJobsInTrace, int totalJobSubmitted, int numSuccessfulJob, - int numFailedJobs, ExecutionSummarizer es) { + int numFailedJobs, int numLostJobs, ExecutionSummarizer es) { assertEquals("ExecutionSummarizer test failed [num-maps]", numMaps, es.getNumMapTasksLaunched()); assertEquals("ExecutionSummarizer test failed [num-reducers]", @@ -319,12 +329,14 @@ private static void testExecutionSummarizer(int numMaps, int numReds, numSuccessfulJob, es.getNumSuccessfulJobs()); assertEquals("ExecutionSummarizer test failed [num-failed jobs]", numFailedJobs, es.getNumFailedJobs()); + assertEquals("ExecutionSummarizer test failed [num-lost jobs]", + numLostJobs, es.getNumLostJobs()); } // generate fake job stats @SuppressWarnings("deprecation") private static JobStats generateFakeJobStats(final int numMaps, - final int numReds, final boolean isSuccessful) + final int numReds, final boolean isSuccessful, final boolean lost) throws IOException { // A fake job Job fakeJob = new Job() { @@ -335,6 +347,9 @@ public int getNumReduceTasks() { @Override public boolean isSuccessful() throws IOException, InterruptedException { + if (lost) { + throw new IOException("Test failure!"); + } return isSuccessful; }; }; diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java index 84f292e770c..9e278a84390 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java @@ -42,6 +42,7 @@ import java.util.concurrent.BlockingQueue; import 
java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; @@ -74,7 +75,7 @@ static class TestMonitor extends JobMonitor { private final int expected; public TestMonitor(int expected, Statistics stats) { - super(stats); + super(5, TimeUnit.SECONDS, stats, 1); this.expected = expected; retiredJobs = new LinkedBlockingQueue(); } @@ -102,7 +103,7 @@ static class DebugGridmix extends Gridmix { private TestMonitor monitor; @Override - protected JobMonitor createJobMonitor(Statistics stats) { + protected JobMonitor createJobMonitor(Statistics stats, Configuration c) { monitor = new TestMonitor(NJOBS + 1, stats); return monitor; } diff --git a/hadoop-mapreduce-project/src/docs/src/documentation/content/xdocs/gridmix.xml b/hadoop-mapreduce-project/src/docs/src/documentation/content/xdocs/gridmix.xml index 8be94c03df4..410ca407fc3 100644 --- a/hadoop-mapreduce-project/src/docs/src/documentation/content/xdocs/gridmix.xml +++ b/hadoop-mapreduce-project/src/docs/src/documentation/content/xdocs/gridmix.xml @@ -206,6 +206,22 @@ hadoop jar <gridmix-jar> org.apache.hadoop.mapred.gridmix.Gridmix \ options using the values obtained from the original task (i.e via trace). + + + + gridmix.job-monitor.thread-count + + Total number of threads to use for polling for jobs' status. The + default value is 1. + + + + + gridmix.job-monitor.sleep-time-ms + + The time each Gridmix status poller thread will sleep before + starting the next cycle. The default value is 500 milliseconds. +