MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for faster job submission. (amarrk)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1292736 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Amar Kamat 2012-02-23 10:41:07 +00:00
parent f11b0788df
commit 8a2073cc61
14 changed files with 806 additions and 210 deletions

View File

@ -14,6 +14,8 @@ Trunk (unreleased changes)
(Plamen Jeliazkov via shv) (Plamen Jeliazkov via shv)
IMPROVEMENTS IMPROVEMENTS
MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for
faster job submission. (amarrk)
MAPREDUCE-3481. [Gridmix] Improve Gridmix STRESS mode. (amarrk) MAPREDUCE-3481. [Gridmix] Improve Gridmix STRESS mode. (amarrk)

View File

@ -53,6 +53,7 @@ class ExecutionSummarizer implements StatListener<JobStats> {
private int numJobsInInputTrace; private int numJobsInInputTrace;
private int totalSuccessfulJobs; private int totalSuccessfulJobs;
private int totalFailedJobs; private int totalFailedJobs;
private int totalLostJobs;
private int totalMapTasksLaunched; private int totalMapTasksLaunched;
private int totalReduceTasksLaunched; private int totalReduceTasksLaunched;
private long totalSimulationTime; private long totalSimulationTime;
@ -90,31 +91,32 @@ class ExecutionSummarizer implements StatListener<JobStats> {
simulationStartTime = System.currentTimeMillis(); simulationStartTime = System.currentTimeMillis();
} }
private void processJobState(JobStats stats) throws Exception { private void processJobState(JobStats stats) {
Job job = stats.getJob(); Job job = stats.getJob();
if (job.isSuccessful()) { try {
++totalSuccessfulJobs; if (job.isSuccessful()) {
} else { ++totalSuccessfulJobs;
++totalFailedJobs; } else {
++totalFailedJobs;
}
} catch (Exception e) {
// this behavior is consistent with job-monitor which marks the job as
// complete (lost) if the status polling bails out
++totalLostJobs;
} }
} }
private void processJobTasks(JobStats stats) throws Exception { private void processJobTasks(JobStats stats) {
totalMapTasksLaunched += stats.getNoOfMaps(); totalMapTasksLaunched += stats.getNoOfMaps();
Job job = stats.getJob(); totalReduceTasksLaunched += stats.getNoOfReds();
totalReduceTasksLaunched += job.getNumReduceTasks();
} }
private void process(JobStats stats) { private void process(JobStats stats) {
try { // process the job run state
// process the job run state processJobState(stats);
processJobState(stats);
// process the tasks information
// process the tasks information processJobTasks(stats);
processJobTasks(stats);
} catch (Exception e) {
LOG.info("Error in processing job " + stats.getJob().getJobID() + ".");
}
} }
@Override @Override
@ -191,6 +193,8 @@ class ExecutionSummarizer implements StatListener<JobStats> {
.append(getNumSuccessfulJobs()); .append(getNumSuccessfulJobs());
builder.append("\nTotal number of failed jobs: ") builder.append("\nTotal number of failed jobs: ")
.append(getNumFailedJobs()); .append(getNumFailedJobs());
builder.append("\nTotal number of lost jobs: ")
.append(getNumLostJobs());
builder.append("\nTotal number of map tasks launched: ") builder.append("\nTotal number of map tasks launched: ")
.append(getNumMapTasksLaunched()); .append(getNumMapTasksLaunched());
builder.append("\nTotal number of reduce task launched: ") builder.append("\nTotal number of reduce task launched: ")
@ -266,8 +270,12 @@ class ExecutionSummarizer implements StatListener<JobStats> {
return totalFailedJobs; return totalFailedJobs;
} }
protected int getNumLostJobs() {
return totalLostJobs;
}
protected int getNumSubmittedJobs() { protected int getNumSubmittedJobs() {
return totalSuccessfulJobs + totalFailedJobs; return totalSuccessfulJobs + totalFailedJobs + totalLostJobs;
} }
protected int getNumMapTasksLaunched() { protected int getNumMapTasksLaunched() {

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics; import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
@ -93,6 +94,31 @@ public class Gridmix extends Configured implements Tool {
*/ */
public static final String GRIDMIX_USR_RSV = "gridmix.user.resolve.class"; public static final String GRIDMIX_USR_RSV = "gridmix.user.resolve.class";
/**
* The configuration key which determines the duration for which the
* job-monitor sleeps while polling for job status.
* This value should be specified in milliseconds.
*/
public static final String GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS =
"gridmix.job-monitor.sleep-time-ms";
/**
* Default value for {@link #GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS}.
*/
public static final int GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS_DEFAULT = 500;
/**
* The configuration key which determines the total number of job-status
* monitoring threads.
*/
public static final String GRIDMIX_JOBMONITOR_THREADS =
"gridmix.job-monitor.thread-count";
/**
* Default value for {@link #GRIDMIX_JOBMONITOR_THREADS}.
*/
public static final int GRIDMIX_JOBMONITOR_THREADS_DEFAULT = 1;
/** /**
* Configuration property set in simulated job's configuration whose value is * Configuration property set in simulated job's configuration whose value is
* set to the corresponding original job's name. This is not configurable by * set to the corresponding original job's name. This is not configurable by
@ -185,8 +211,13 @@ public class Gridmix extends Configured implements Tool {
submitter.add(job); submitter.add(job);
// TODO add listeners, use for job dependencies // TODO add listeners, use for job dependencies
TimeUnit.SECONDS.sleep(10);
try { try {
while (!job.isSubmitted()) {
try {
Thread.sleep(100); // sleep
} catch (InterruptedException ie) {}
}
// wait for completion
job.getJob().waitForCompletion(false); job.getJob().waitForCompletion(false);
} catch (ClassNotFoundException e) { } catch (ClassNotFoundException e) {
throw new IOException("Internal error", e); throw new IOException("Internal error", e);
@ -241,7 +272,7 @@ public class Gridmix extends Configured implements Tool {
GridmixJobSubmissionPolicy policy = getJobSubmissionPolicy(conf); GridmixJobSubmissionPolicy policy = getJobSubmissionPolicy(conf);
LOG.info(" Submission policy is " + policy.name()); LOG.info(" Submission policy is " + policy.name());
statistics = new Statistics(conf, policy.getPollingInterval(), startFlag); statistics = new Statistics(conf, policy.getPollingInterval(), startFlag);
monitor = createJobMonitor(statistics); monitor = createJobMonitor(statistics, conf);
int noOfSubmitterThreads = int noOfSubmitterThreads =
(policy == GridmixJobSubmissionPolicy.SERIAL) (policy == GridmixJobSubmissionPolicy.SERIAL)
? 1 ? 1
@ -276,8 +307,13 @@ public class Gridmix extends Configured implements Tool {
} }
} }
protected JobMonitor createJobMonitor(Statistics stats) throws IOException { protected JobMonitor createJobMonitor(Statistics stats, Configuration conf)
return new JobMonitor(stats); throws IOException {
int delay = conf.getInt(GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS,
GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS_DEFAULT);
int numThreads = conf.getInt(GRIDMIX_JOBMONITOR_THREADS,
GRIDMIX_JOBMONITOR_THREADS_DEFAULT);
return new JobMonitor(delay, TimeUnit.MILLISECONDS, stats, numThreads);
} }
protected JobSubmitter createJobSubmitter(JobMonitor monitor, int threads, protected JobSubmitter createJobSubmitter(JobMonitor monitor, int threads,
@ -571,12 +607,13 @@ public class Gridmix extends Configured implements Tool {
if (monitor == null) { if (monitor == null) {
return; return;
} }
List<Job> remainingJobs = monitor.getRemainingJobs(); List<JobStats> remainingJobs = monitor.getRemainingJobs();
if (remainingJobs.isEmpty()) { if (remainingJobs.isEmpty()) {
return; return;
} }
LOG.info("Killing running jobs..."); LOG.info("Killing running jobs...");
for (Job job : remainingJobs) { for (JobStats stats : remainingJobs) {
Job job = stats.getJob();
try { try {
if (!job.isComplete()) { if (!job.isComplete()) {
job.killJob(); job.killJob();

View File

@ -72,6 +72,7 @@ abstract class GridmixJob implements Callable<Job>, Delayed {
} }
}; };
private boolean submitted;
protected final int seq; protected final int seq;
protected final Path outdir; protected final Path outdir;
protected final Job job; protected final Job job;
@ -412,6 +413,14 @@ abstract class GridmixJob implements Callable<Job>, Delayed {
return jobdesc; return jobdesc;
} }
void setSubmitted() {
submitted = true;
}
boolean isSubmitted() {
return submitted;
}
static void pushDescription(int seq, List<InputSplit> splits) { static void pushDescription(int seq, List<InputSplit> splits) {
if (null != descCache.putIfAbsent(seq, splits)) { if (null != descCache.putIfAbsent(seq, splits)) {
throw new IllegalArgumentException("Description exists for id " + seq); throw new IllegalArgumentException("Description exists for id " + seq);

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.mapred.gridmix; package org.apache.hadoop.mapred.gridmix;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -35,6 +36,8 @@ import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -179,19 +182,33 @@ abstract class JobFactory<T> implements Gridmix.Component<Void>,StatListener<T>
protected JobStory getNextJobFiltered() throws IOException { protected JobStory getNextJobFiltered() throws IOException {
JobStory job = getNextJobFromTrace(); JobStory job = getNextJobFromTrace();
// filter out the following jobs
// - unsuccessful jobs
// - jobs with missing submit-time
// - reduce only jobs
// These jobs are not yet supported in Gridmix
while (job != null && while (job != null &&
(job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS || (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS ||
job.getSubmissionTime() < 0)) { job.getSubmissionTime() < 0 || job.getNumberMaps() == 0)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
String reason = null; List<String> reason = new ArrayList<String>();
if (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS) { if (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS) {
reason = "STATE (" + job.getOutcome().name() + ") "; reason.add("STATE (" + job.getOutcome().name() + ")");
} }
if (job.getSubmissionTime() < 0) { if (job.getSubmissionTime() < 0) {
reason += "SUBMISSION-TIME (" + job.getSubmissionTime() + ")"; reason.add("SUBMISSION-TIME (" + job.getSubmissionTime() + ")");
} }
if (job.getNumberMaps() == 0) {
reason.add("ZERO-MAPS-JOB");
}
// TODO This should never happen. Probably we missed something!
if (reason.size() == 0) {
reason.add("N/A");
}
LOG.debug("Ignoring job " + job.getJobID() + " from the input trace." LOG.debug("Ignoring job " + job.getJobID() + " from the input trace."
+ " Reason: " + reason == null ? "N/A" : reason); + " Reason: " + StringUtils.join(reason, ","));
} }
job = getNextJobFromTrace(); job = getNextJobFromTrace();
} }

View File

@ -24,37 +24,47 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
/** /**
* Component accepting submitted, running jobs and responsible for * Component accepting submitted, running {@link Statistics.JobStats} and
* monitoring jobs for success and failure. Once a job is submitted, it is * responsible for monitoring jobs for success and failure. Once a job is
* polled for status until complete. If a job is complete, then the monitor * submitted, it is polled for status until complete. If a job is complete,
* thread returns immediately to the queue. If not, the monitor will sleep * then the monitor thread returns immediately to the queue. If not, the monitor
* for some duration. * will sleep for some duration.
*
* {@link JobMonitor} can be configured to use multiple threads for polling
* the job statuses. Use {@link Gridmix#GRIDMIX_JOBMONITOR_THREADS} to specify
* the total number of monitoring threads.
*
* The duration for which a monitoring thread sleeps if the first job in the
* queue is running can also be configured. Use
* {@link Gridmix#GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS} to specify a custom
* value.
*/ */
class JobMonitor implements Gridmix.Component<Job> { class JobMonitor implements Gridmix.Component<JobStats> {
public static final Log LOG = LogFactory.getLog(JobMonitor.class); public static final Log LOG = LogFactory.getLog(JobMonitor.class);
private final Queue<Job> mJobs; private final Queue<JobStats> mJobs;
private final MonitorThread mThread; private ExecutorService executor;
private final BlockingQueue<Job> runningJobs; private int numPollingThreads;
private final BlockingQueue<JobStats> runningJobs;
private final long pollDelayMillis; private final long pollDelayMillis;
private Statistics statistics; private Statistics statistics;
private boolean graceful = false; private boolean graceful = false;
private boolean shutdown = false; private boolean shutdown = false;
public JobMonitor(Statistics statistics) {
this(5,TimeUnit.SECONDS, statistics);
}
/** /**
* Create a JobMonitor that sleeps for the specified duration after * Create a JobMonitor that sleeps for the specified duration after
* polling a still-running job. * polling a still-running job.
@ -62,30 +72,37 @@ class JobMonitor implements Gridmix.Component<Job> {
* @param unit Time unit for pollDelay (rounded to milliseconds) * @param unit Time unit for pollDelay (rounded to milliseconds)
* @param statistics StatCollector , listener to job completion. * @param statistics StatCollector , listener to job completion.
*/ */
public JobMonitor(int pollDelay, TimeUnit unit, Statistics statistics) { public JobMonitor(int pollDelay, TimeUnit unit, Statistics statistics,
mThread = new MonitorThread(); int numPollingThreads) {
runningJobs = new LinkedBlockingQueue<Job>(); executor = Executors.newCachedThreadPool();
mJobs = new LinkedList<Job>(); this.numPollingThreads = numPollingThreads;
runningJobs = new LinkedBlockingQueue<JobStats>();
mJobs = new LinkedList<JobStats>();
this.pollDelayMillis = TimeUnit.MILLISECONDS.convert(pollDelay, unit); this.pollDelayMillis = TimeUnit.MILLISECONDS.convert(pollDelay, unit);
this.statistics = statistics; this.statistics = statistics;
} }
/** /**
* Add a job to the polling queue. * Add a running job's status to the polling queue.
*/ */
public void add(Job job) throws InterruptedException { public void add(JobStats job) throws InterruptedException {
runningJobs.put(job); synchronized (runningJobs) {
runningJobs.put(job);
}
} }
/** /**
* Add a submission failed job , such that it can be communicated * Add a submission failed job's status, such that it can be communicated
* back to serial. * back to serial.
* TODO: Cleaner solution for this problem * TODO: Cleaner solution for this problem
* @param job * @param job
*/ */
public void submissionFailed(Job job) { public void submissionFailed(JobStats job) {
LOG.info("Job submission failed notification for job " + job.getJobID()); String jobID = job.getJob().getConfiguration().get(Gridmix.ORIGINAL_JOB_ID);
this.statistics.add(job); LOG.info("Job submission failed notification for job " + jobID);
synchronized (statistics) {
this.statistics.add(job);
}
} }
/** /**
@ -108,12 +125,9 @@ class JobMonitor implements Gridmix.Component<Job> {
* @throws IllegalStateException If monitoring thread is still running. * @throws IllegalStateException If monitoring thread is still running.
* @return Any jobs submitted and not known to have completed. * @return Any jobs submitted and not known to have completed.
*/ */
List<Job> getRemainingJobs() { List<JobStats> getRemainingJobs() {
if (mThread.isAlive()) {
LOG.warn("Internal error: Polling running monitor for jobs");
}
synchronized (mJobs) { synchronized (mJobs) {
return new ArrayList<Job>(mJobs); return new ArrayList<JobStats>(mJobs);
} }
} }
@ -123,19 +137,8 @@ class JobMonitor implements Gridmix.Component<Job> {
*/ */
private class MonitorThread extends Thread { private class MonitorThread extends Thread {
public MonitorThread() { public MonitorThread(int i) {
super("GridmixJobMonitor"); super("GridmixJobMonitor-" + i);
}
/**
* Check a job for success or failure.
*/
public void process(Job job) throws IOException, InterruptedException {
if (job.isSuccessful()) {
onSuccess(job);
} else {
onFailure(job);
}
} }
@Override @Override
@ -144,10 +147,12 @@ class JobMonitor implements Gridmix.Component<Job> {
boolean shutdown; boolean shutdown;
while (true) { while (true) {
try { try {
synchronized (mJobs) { synchronized (runningJobs) {
graceful = JobMonitor.this.graceful; synchronized (mJobs) {
shutdown = JobMonitor.this.shutdown; graceful = JobMonitor.this.graceful;
runningJobs.drainTo(mJobs); shutdown = JobMonitor.this.shutdown;
runningJobs.drainTo(mJobs);
}
} }
// shutdown conditions; either shutdown requested and all jobs // shutdown conditions; either shutdown requested and all jobs
@ -155,26 +160,63 @@ class JobMonitor implements Gridmix.Component<Job> {
// submitted jobs not in the monitored set // submitted jobs not in the monitored set
if (shutdown) { if (shutdown) {
if (!graceful) { if (!graceful) {
while (!runningJobs.isEmpty()) { synchronized (runningJobs) {
synchronized (mJobs) { while (!runningJobs.isEmpty()) {
runningJobs.drainTo(mJobs); synchronized (mJobs) {
runningJobs.drainTo(mJobs);
}
} }
} }
break; break;
} else if (mJobs.isEmpty()) { }
break;
synchronized (mJobs) {
if (graceful && mJobs.isEmpty()) {
break;
}
} }
} }
while (!mJobs.isEmpty()) { JobStats jobStats = null;
Job job; synchronized (mJobs) {
synchronized (mJobs) { jobStats = mJobs.poll();
job = mJobs.poll(); }
} while (jobStats != null) {
Job job = jobStats.getJob();
try { try {
if (job.isComplete()) { // get the job status
process(job); long start = System.currentTimeMillis();
statistics.add(job); JobStatus status = job.getStatus(); // cache the job status
continue; long end = System.currentTimeMillis();
if (LOG.isDebugEnabled()) {
LOG.debug("Status polling for job " + job.getJobID() + " took "
+ (end-start) + "ms.");
}
// update the job progress
jobStats.updateJobStatus(status);
// if the job is complete, let others know
if (status.isJobComplete()) {
if (status.getState() == JobStatus.State.SUCCEEDED) {
onSuccess(job);
} else {
onFailure(job);
}
synchronized (statistics) {
statistics.add(jobStats);
}
} else {
// add the running job back and break
synchronized (mJobs) {
if (!mJobs.offer(jobStats)) {
LOG.error("Lost job " + (null == job.getJobName()
? "<unknown>" : job.getJobName())); // should never
// happen
}
}
break;
} }
} catch (IOException e) { } catch (IOException e) {
if (e.getCause() instanceof ClosedByInterruptException) { if (e.getCause() instanceof ClosedByInterruptException) {
@ -186,18 +228,19 @@ class JobMonitor implements Gridmix.Component<Job> {
} else { } else {
LOG.warn("Lost job " + (null == job.getJobName() LOG.warn("Lost job " + (null == job.getJobName()
? "<unknown>" : job.getJobName()), e); ? "<unknown>" : job.getJobName()), e);
continue; synchronized (statistics) {
statistics.add(jobStats);
}
} }
} }
// get the next job
synchronized (mJobs) { synchronized (mJobs) {
if (!mJobs.offer(job)) { jobStats = mJobs.poll();
LOG.error("Lost job " + (null == job.getJobName()
? "<unknown>" : job.getJobName())); // should never
// happen
}
} }
break;
} }
// sleep for a while before checking again
try { try {
TimeUnit.MILLISECONDS.sleep(pollDelayMillis); TimeUnit.MILLISECONDS.sleep(pollDelayMillis);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -215,7 +258,9 @@ class JobMonitor implements Gridmix.Component<Job> {
* Start the internal, monitoring thread. * Start the internal, monitoring thread.
*/ */
public void start() { public void start() {
mThread.start(); for (int i = 0; i < numPollingThreads; ++i) {
executor.execute(new MonitorThread(i));
}
} }
/** /**
@ -224,7 +269,7 @@ class JobMonitor implements Gridmix.Component<Job> {
* if no form of shutdown has been requested. * if no form of shutdown has been requested.
*/ */
public void join(long millis) throws InterruptedException { public void join(long millis) throws InterruptedException {
mThread.join(millis); executor.awaitTermination(millis, TimeUnit.MILLISECONDS);
} }
/** /**
@ -236,7 +281,7 @@ class JobMonitor implements Gridmix.Component<Job> {
graceful = false; graceful = false;
shutdown = true; shutdown = true;
} }
mThread.interrupt(); executor.shutdown();
} }
/** /**
@ -248,7 +293,7 @@ class JobMonitor implements Gridmix.Component<Job> {
graceful = true; graceful = true;
shutdown = true; shutdown = true;
} }
mThread.interrupt(); executor.shutdown();
} }
} }

View File

@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
/** /**
* Component accepting deserialized job traces, computing split data, and * Component accepting deserialized job traces, computing split data, and
@ -46,6 +47,7 @@ class JobSubmitter implements Gridmix.Component<GridmixJob> {
private final JobMonitor monitor; private final JobMonitor monitor;
private final ExecutorService sched; private final ExecutorService sched;
private volatile boolean shutdown = false; private volatile boolean shutdown = false;
private final int queueDepth;
/** /**
* Initialize the submission component with downstream monitor and pool of * Initialize the submission component with downstream monitor and pool of
@ -61,6 +63,7 @@ class JobSubmitter implements Gridmix.Component<GridmixJob> {
*/ */
public JobSubmitter(JobMonitor monitor, int threads, int queueDepth, public JobSubmitter(JobMonitor monitor, int threads, int queueDepth,
FilePool inputDir, Statistics statistics) { FilePool inputDir, Statistics statistics) {
this.queueDepth = queueDepth;
sem = new Semaphore(queueDepth); sem = new Semaphore(queueDepth);
sched = new ThreadPoolExecutor(threads, threads, 0L, sched = new ThreadPoolExecutor(threads, threads, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
@ -79,19 +82,25 @@ class JobSubmitter implements Gridmix.Component<GridmixJob> {
this.job = job; this.job = job;
} }
public void run() { public void run() {
JobStats stats =
Statistics.generateJobStats(job.getJob(), job.getJobDesc());
try { try {
// pre-compute split information // pre-compute split information
try { try {
long start = System.currentTimeMillis();
job.buildSplits(inputDir); job.buildSplits(inputDir);
long end = System.currentTimeMillis();
LOG.info("[JobSubmitter] Time taken to build splits for job "
+ job.getJob().getJobID() + ": " + (end - start) + " ms.");
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to submit " + job.getJob().getJobName() + " as " LOG.warn("Failed to submit " + job.getJob().getJobName() + " as "
+ job.getUgi(), e); + job.getUgi(), e);
monitor.submissionFailed(job.getJob()); monitor.submissionFailed(stats);
return; return;
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Failed to submit " + job.getJob().getJobName() + " as " LOG.warn("Failed to submit " + job.getJob().getJobName() + " as "
+ job.getUgi(), e); + job.getUgi(), e);
monitor.submissionFailed(job.getJob()); monitor.submissionFailed(stats);
return; return;
} }
// Sleep until deadline // Sleep until deadline
@ -102,10 +111,28 @@ class JobSubmitter implements Gridmix.Component<GridmixJob> {
} }
try { try {
// submit job // submit job
monitor.add(job.call()); long start = System.currentTimeMillis();
statistics.addJobStats(job.getJob(), job.getJobDesc()); job.call();
LOG.debug("SUBMIT " + job + "@" + System.currentTimeMillis() + long end = System.currentTimeMillis();
" (" + job.getJob().getJobID() + ")"); LOG.info("[JobSubmitter] Time taken to submit the job "
+ job.getJob().getJobID() + ": " + (end - start) + " ms.");
// mark it as submitted
job.setSubmitted();
// add to the monitor
monitor.add(stats);
// add to the statistics
statistics.addJobStats(stats);
if (LOG.isDebugEnabled()) {
String jobID =
job.getJob().getConfiguration().get(Gridmix.ORIGINAL_JOB_ID);
LOG.debug("Original job '" + jobID + "' is being simulated as '"
+ job.getJob().getJobID() + "'");
LOG.debug("SUBMIT " + job + "@" + System.currentTimeMillis()
+ " (" + job.getJob().getJobID() + ")");
}
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to submit " + job.getJob().getJobName() + " as " LOG.warn("Failed to submit " + job.getJob().getJobName() + " as "
+ job.getUgi(), e); + job.getUgi(), e);
@ -113,21 +140,21 @@ class JobSubmitter implements Gridmix.Component<GridmixJob> {
throw new InterruptedException("Failed to submit " + throw new InterruptedException("Failed to submit " +
job.getJob().getJobName()); job.getJob().getJobName());
} }
monitor.submissionFailed(job.getJob()); monitor.submissionFailed(stats);
} catch (ClassNotFoundException e) { } catch (ClassNotFoundException e) {
LOG.warn("Failed to submit " + job.getJob().getJobName(), e); LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
monitor.submissionFailed(job.getJob()); monitor.submissionFailed(stats);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
// abort execution, remove splits if necessary // abort execution, remove splits if necessary
// TODO release ThdLoc // TODO release ThdLoc
GridmixJob.pullDescription(job.id()); GridmixJob.pullDescription(job.id());
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
monitor.submissionFailed(job.getJob()); monitor.submissionFailed(stats);
} catch(Exception e) { } catch(Exception e) {
//Due to some exception job wasn't submitted. //Due to some exception job wasn't submitted.
LOG.info(" Job " + job.getJob().getJobID() + " submission failed " , e); LOG.info(" Job " + job.getJob().getJobID() + " submission failed " , e);
monitor.submissionFailed(job.getJob()); monitor.submissionFailed(stats);
} finally { } finally {
sem.release(); sem.release();
} }
@ -141,6 +168,8 @@ class JobSubmitter implements Gridmix.Component<GridmixJob> {
final boolean addToQueue = !shutdown; final boolean addToQueue = !shutdown;
if (addToQueue) { if (addToQueue) {
final SubmitTask task = new SubmitTask(job); final SubmitTask task = new SubmitTask(job);
LOG.info("Total number of queued jobs: "
+ (queueDepth - sem.availablePermits()));
sem.acquire(); sem.acquire();
try { try {
sched.execute(task); sched.execute(task);

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.gridmix.Gridmix.Component; import org.apache.hadoop.mapred.gridmix.Gridmix.Component;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory; import org.apache.hadoop.tools.rumen.JobStory;
@ -43,12 +44,12 @@ import java.util.concurrent.locks.ReentrantLock;
/** /**
* Component collecting the stats required by other components * Component collecting the stats required by other components
* to make decisions. * to make decisions.
* Single thread Collector tries to collec the stats. * Single thread collector tries to collect the stats (currently cluster stats)
* Each of thread poll updates certain datastructure(Currently ClusterStats). * and caches it internally.
* Components interested in these datastructure, need to register. * Components interested in these stats need to register themselves and will get
* StatsCollector notifies each of the listeners. * notified either on every job completion event or some fixed time interval.
*/ */
public class Statistics implements Component<Job> { public class Statistics implements Component<Statistics.JobStats> {
public static final Log LOG = LogFactory.getLog(Statistics.class); public static final Log LOG = LogFactory.getLog(Statistics.class);
private final StatCollector statistics = new StatCollector(); private final StatCollector statistics = new StatCollector();
@ -62,10 +63,16 @@ public class Statistics implements Component<Job> {
private final List<StatListener<JobStats>> jobStatListeners = private final List<StatListener<JobStats>> jobStatListeners =
new CopyOnWriteArrayList<StatListener<JobStats>>(); new CopyOnWriteArrayList<StatListener<JobStats>>();
//List of jobids and noofMaps for each job // A map of job-sequence-id to job-stats of submitted jobs
private static final Map<Integer, JobStats> jobMaps = private static final Map<Integer, JobStats> submittedJobsMap =
new ConcurrentHashMap<Integer,JobStats>(); new ConcurrentHashMap<Integer, JobStats>();
// total number of map tasks submitted
private static volatile int numMapsSubmitted = 0;
// total number of reduce tasks submitted
private static volatile int numReducesSubmitted = 0;
private int completedJobsInCurrentInterval = 0; private int completedJobsInCurrentInterval = 0;
private final int jtPollingInterval; private final int jtPollingInterval;
private volatile boolean shutdown = false; private volatile boolean shutdown = false;
@ -92,41 +99,65 @@ public class Statistics implements Component<Job> {
this.startFlag = startFlag; this.startFlag = startFlag;
} }
public void addJobStats(Job job, JobStory jobdesc) { /**
* Generates a job stats.
*/
public static JobStats generateJobStats(Job job, JobStory jobdesc) {
int seq = GridmixJob.getJobSeqId(job); int seq = GridmixJob.getJobSeqId(job);
if (seq < 0) { // bail out if job description is missing for a job to be simulated
LOG.info("Not tracking job " + job.getJobName() if (seq >= 0 && jobdesc == null) {
+ " as seq id is less than zero: " + seq); throw new IllegalArgumentException("JobStory not available for job "
return; + job.getJobID());
} }
int maps = 0; int maps = -1;
int reds = 0; int reds = -1;
if (jobdesc == null) { if (jobdesc != null) {
throw new IllegalArgumentException( // Note that the ZombieJob will return a >= 0 value
" JobStory not available for job " + job.getJobName());
} else {
maps = jobdesc.getNumberMaps(); maps = jobdesc.getNumberMaps();
reds = jobdesc.getNumberReduces(); reds = jobdesc.getNumberReduces();
} }
JobStats stats = new JobStats(maps, reds, job); return new JobStats(maps, reds, job);
jobMaps.put(seq,stats); }
/**
* Add a submitted job for monitoring.
*/
public void addJobStats(JobStats stats) {
int seq = GridmixJob.getJobSeqId(stats.getJob());
if (seq < 0) {
LOG.info("Not tracking job " + stats.getJob().getJobName()
+ " as seq id is less than zero: " + seq);
return;
}
submittedJobsMap.put(seq, stats);
numMapsSubmitted += stats.getNoOfMaps();
numReducesSubmitted += stats.getNoOfReds();
} }
/** /**
* Used by JobMonitor to add the completed job. * Used by JobMonitor to add the completed job.
*/ */
@Override @Override
public void add(Job job) { public void add(Statistics.JobStats job) {
//This thread will be notified initially by jobmonitor incase of //This thread will be notified initially by job-monitor incase of
//data generation. Ignore that as we are getting once the input is //data generation. Ignore that as we are getting once the input is
//generated. //generated.
if (!statistics.isAlive()) { if (!statistics.isAlive()) {
return; return;
} }
JobStats stat = jobMaps.remove(GridmixJob.getJobSeqId(job)); JobStats stat = submittedJobsMap.remove(GridmixJob.getJobSeqId(job.getJob()));
if (stat == null) return; // stat cannot be null
if (stat == null) {
LOG.error("[Statistics] Missing entry for job "
+ job.getJob().getJobID());
return;
}
// update the total number of submitted map/reduce task count
numMapsSubmitted -= stat.getNoOfMaps();
numReducesSubmitted -= stat.getNoOfReds();
completedJobsInCurrentInterval++; completedJobsInCurrentInterval++;
//check if we have reached the maximum level of job completions. //check if we have reached the maximum level of job completions.
@ -238,7 +269,7 @@ public class Statistics implements Component<Job> {
@Override @Override
public void shutdown() { public void shutdown() {
shutdown = true; shutdown = true;
jobMaps.clear(); submittedJobsMap.clear();
clusterStatlisteners.clear(); clusterStatlisteners.clear();
jobStatListeners.clear(); jobStatListeners.clear();
statistics.interrupt(); statistics.interrupt();
@ -247,7 +278,7 @@ public class Statistics implements Component<Job> {
@Override @Override
public void abort() { public void abort() {
shutdown = true; shutdown = true;
jobMaps.clear(); submittedJobsMap.clear();
clusterStatlisteners.clear(); clusterStatlisteners.clear();
jobStatListeners.clear(); jobStatListeners.clear();
statistics.interrupt(); statistics.interrupt();
@ -259,9 +290,10 @@ public class Statistics implements Component<Job> {
* TODO: In future we need to extend this to send more information. * TODO: In future we need to extend this to send more information.
*/ */
static class JobStats { static class JobStats {
private int noOfMaps; private final int noOfMaps;
private int noOfReds; private final int noOfReds;
private Job job; private JobStatus currentStatus;
private final Job job;
public JobStats(int noOfMaps,int numOfReds, Job job){ public JobStats(int noOfMaps,int numOfReds, Job job){
this.job = job; this.job = job;
@ -284,6 +316,20 @@ public class Statistics implements Component<Job> {
public Job getJob() { public Job getJob() {
return job; return job;
} }
/**
* Update the job statistics.
*/
public synchronized void updateJobStatus(JobStatus status) {
this.currentStatus = status;
}
/**
* Get the current job status.
*/
public synchronized JobStatus getJobStatus() {
return currentStatus;
}
} }
static class ClusterStats { static class ClusterStats {
@ -316,15 +362,28 @@ public class Statistics implements Component<Job> {
} }
int getNumRunningJob() { int getNumRunningJob() {
return jobMaps.size(); return submittedJobsMap.size();
} }
/** /**
* @return runningWatitingJobs * @return runningWatitingJobs
*/ */
static Collection<JobStats> getRunningJobStats() { static Collection<JobStats> getRunningJobStats() {
return jobMaps.values(); return submittedJobsMap.values();
} }
/**
* Returns the total number of submitted map tasks
*/
static int getSubmittedMapTasks() {
return numMapsSubmitted;
}
/**
* Returns the total number of submitted reduce tasks
*/
static int getSubmittedReduceTasks() {
return numReducesSubmitted;
}
} }
} }

View File

@ -25,11 +25,15 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats; import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
import org.apache.hadoop.mapred.gridmix.Statistics.JobStats; import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory; import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.JobStoryProducer; import org.apache.hadoop.tools.rumen.JobStoryProducer;
import java.io.IOException; import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -87,6 +91,13 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
"gridmix.throttle.jobs-to-tracker-ratio"; "gridmix.throttle.jobs-to-tracker-ratio";
final float maxJobTrackerRatio; final float maxJobTrackerRatio;
/**
* Represents a list of blacklisted jobs. Jobs are blacklisted when either
* they are complete or their status cannot be obtained. Stress mode will
* ignore blacklisted jobs from its overload computation.
*/
private Set<JobID> blacklistedJobs = new HashSet<JobID>();
/** /**
* Creating a new instance does not start the thread. * Creating a new instance does not start the thread.
* *
@ -145,42 +156,66 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
try { try {
startFlag.await(); startFlag.await();
if (Thread.currentThread().isInterrupted()) { if (Thread.currentThread().isInterrupted()) {
LOG.warn("[STRESS] Interrupted before start!. Exiting..");
return; return;
} }
LOG.info("START STRESS @ " + System.currentTimeMillis()); LOG.info("START STRESS @ " + System.currentTimeMillis());
while (!Thread.currentThread().isInterrupted()) { while (!Thread.currentThread().isInterrupted()) {
try { try {
while (loadStatus.overloaded()) { while (loadStatus.overloaded()) {
// update the overload status
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Cluster overloaded in run! Sleeping..."); LOG.debug("Updating the overload status.");
} }
// sleep
try { try {
Thread.sleep(1000); checkLoadAndGetSlotsToBackfill();
} catch (InterruptedException ie) { } catch (IOException ioe) {
LOG.warn("[STRESS] Check failed!", ioe);
return; return;
} }
// if the cluster is still overloaded, then sleep
if (loadStatus.overloaded()) {
if (LOG.isDebugEnabled()) {
LOG.debug("[STRESS] Cluster overloaded in run! Sleeping...");
}
// sleep
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
LOG.warn("[STRESS] Interrupted while sleeping! Exiting.", ie);
return;
}
}
} }
while (!loadStatus.overloaded()) { while (!loadStatus.overloaded()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Cluster underloaded in run! Stressing..."); LOG.debug("[STRESS] Cluster underloaded in run! Stressing...");
} }
try { try {
//TODO This in-line read can block submission for large jobs. //TODO This in-line read can block submission for large jobs.
final JobStory job = getNextJobFiltered(); final JobStory job = getNextJobFiltered();
if (null == job) { if (null == job) {
LOG.warn("[STRESS] Finished consuming the input trace. "
+ "Exiting..");
return; return;
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Job Selected: " + job.getJobID()); LOG.debug("Job Selected: " + job.getJobID());
} }
submitter.add(
jobCreator.createGridmixJob( UserGroupInformation ugi =
conf, 0L, job, scratch, UserGroupInformation.createRemoteUser(job.getUser());
userResolver.getTargetUgi( UserGroupInformation tgtUgi = userResolver.getTargetUgi(ugi);
UserGroupInformation.createRemoteUser(job.getUser())), GridmixJob tJob =
sequence.getAndIncrement())); jobCreator.createGridmixJob(conf, 0L, job, scratch,
tgtUgi, sequence.getAndIncrement());
// submit the job
submitter.add(tJob);
// TODO: We need to take care of scenario when one map/reduce // TODO: We need to take care of scenario when one map/reduce
// takes more than 1 slot. // takes more than 1 slot.
@ -198,7 +233,7 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
loadStatus.decrementJobLoad(1); loadStatus.decrementJobLoad(1);
} catch (IOException e) { } catch (IOException e) {
LOG.error("Error while submitting the job ", e); LOG.error("[STRESS] Error while submitting the job ", e);
error = e; error = e;
return; return;
} }
@ -209,6 +244,7 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
} }
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.error("[STRESS] Interrupted in the main block!", e);
return; return;
} finally { } finally {
IOUtils.cleanup(null, jobProducer); IOUtils.cleanup(null, jobProducer);
@ -224,9 +260,17 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
*/ */
@Override @Override
public void update(Statistics.ClusterStats item) { public void update(Statistics.ClusterStats item) {
ClusterStatus clusterMetrics = item.getStatus(); ClusterStatus clusterStatus = item.getStatus();
try { try {
checkLoadAndGetSlotsToBackfill(item, clusterMetrics); // update the max cluster map/reduce task capacity
loadStatus.updateMapCapacity(clusterStatus.getMaxMapTasks());
loadStatus.updateReduceCapacity(clusterStatus.getMaxReduceTasks());
int numTrackers = clusterStatus.getTaskTrackers();
int jobLoad =
(int) (maxJobTrackerRatio * numTrackers) - item.getNumRunningJob();
loadStatus.updateJobLoad(jobLoad);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Couldn't get the new Status",e); LOG.error("Couldn't get the new Status",e);
} }
@ -258,22 +302,8 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
* @param clusterStatus Cluster status * @param clusterStatus Cluster status
* @throws java.io.IOException * @throws java.io.IOException
*/ */
private void checkLoadAndGetSlotsToBackfill( protected void checkLoadAndGetSlotsToBackfill()
ClusterStats stats, ClusterStatus clusterStatus) throws IOException, InterruptedException { throws IOException, InterruptedException {
// update the max cluster capacity incase its updated
int mapCapacity = clusterStatus.getMaxMapTasks();
loadStatus.updateMapCapacity(mapCapacity);
int reduceCapacity = clusterStatus.getMaxReduceTasks();
loadStatus.updateReduceCapacity(reduceCapacity);
int numTrackers = clusterStatus.getTaskTrackers();
int jobLoad =
(int) (maxJobTrackerRatio * numTrackers) - stats.getNumRunningJob();
loadStatus.updateJobLoad(jobLoad);
if (loadStatus.getJobLoad() <= 0) { if (loadStatus.getJobLoad() <= 0) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(System.currentTimeMillis() + " [JobLoad] Overloaded is " LOG.debug(System.currentTimeMillis() + " [JobLoad] Overloaded is "
@ -283,17 +313,143 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
return; // stop calculation because we know it is overloaded. return; // stop calculation because we know it is overloaded.
} }
float incompleteMapTasks = 0; // include pending & running map tasks. int mapCapacity = loadStatus.getMapCapacity();
for (JobStats job : ClusterStats.getRunningJobStats()) { int reduceCapacity = loadStatus.getReduceCapacity();
float mapProgress = job.getJob().mapProgress();
int noOfMaps = job.getNoOfMaps(); // return if the cluster status is not set
incompleteMapTasks += if (mapCapacity < 0 || reduceCapacity < 0) {
calcEffectiveIncompleteMapTasks(mapCapacity, noOfMaps, mapProgress); // note that, by default, the overload status is true
// missing cluster status will result in blocking of job submission
return;
} }
int mapSlotsBackFill = // Determine the max permissible map & reduce task load
(int) ((overloadMapTaskMapSlotRatio * mapCapacity) - incompleteMapTasks); int maxMapLoad = (int) (overloadMapTaskMapSlotRatio * mapCapacity);
loadStatus.updateMapLoad(mapSlotsBackFill); int maxReduceLoad =
(int) (overloadReduceTaskReduceSlotRatio * reduceCapacity);
// compute the total number of map & reduce tasks submitted
int totalMapTasks = ClusterStats.getSubmittedMapTasks();
int totalReduceTasks = ClusterStats.getSubmittedReduceTasks();
if (LOG.isDebugEnabled()) {
LOG.debug("Total submitted map tasks: " + totalMapTasks);
LOG.debug("Total submitted reduce tasks: " + totalReduceTasks);
LOG.debug("Max map load: " + maxMapLoad);
LOG.debug("Max reduce load: " + maxReduceLoad);
}
// generate a pessimistic bound on the max running+pending map tasks
// this check is to avoid the heavy-duty actual map load calculation
int mapSlotsBackFill = (int) (maxMapLoad - totalMapTasks);
// generate a pessimistic bound on the max running+pending reduce tasks
// this check is to avoid the heavy-duty actual reduce load calculation
int reduceSlotsBackFill = (int) (maxReduceLoad - totalReduceTasks);
// maintain a list of seen job ids
Set<JobID> seenJobIDs = new HashSet<JobID>();
// check if the total number of submitted map/reduce tasks exceeds the
// permissible limit
if (totalMapTasks > maxMapLoad || totalReduceTasks > maxReduceLoad) {
// if yes, calculate the real load
float incompleteMapTasks = 0; // include pending & running map tasks.
float incompleteReduceTasks = 0; // include pending & running reduce tasks
for (JobStats job : ClusterStats.getRunningJobStats()) {
JobID id = job.getJob().getJobID();
seenJobIDs.add(id);
// Note that this is a hack! Ideally, ClusterStats.getRunningJobStats()
// should be smart enough to take care of completed jobs.
if (blacklistedJobs.contains(id)) {
LOG.warn("Ignoring blacklisted job: " + id);
continue;
}
int noOfMaps = job.getNoOfMaps();
int noOfReduces = job.getNoOfReds();
// consider polling for jobs where maps>0 or reds>0
// TODO: What about setup/cleanup tasks for cases where m=0 and r=0
// What otherwise?
if (noOfMaps > 0 || noOfReduces > 0) {
// get the job's status
JobStatus status = job.getJobStatus();
// blacklist completed jobs and continue
if (status != null && status.isJobComplete()) {
LOG.warn("Blacklisting completed job: " + id);
blacklistedJobs.add(id);
continue;
}
// get the map and reduce tasks' progress
float mapProgress = 0f;
float reduceProgress = 0f;
// check if the status is missing (this can happen for unpolled jobs)
if (status != null) {
mapProgress = status.getMapProgress();
reduceProgress = status.getReduceProgress();
}
incompleteMapTasks +=
calcEffectiveIncompleteMapTasks(mapCapacity, noOfMaps, mapProgress);
// bail out early
int currentMapSlotsBackFill = (int) (maxMapLoad - incompleteMapTasks);
if (currentMapSlotsBackFill <= 0) {
// reset the reduce task load since we are bailing out
incompleteReduceTasks = totalReduceTasks;
if (LOG.isDebugEnabled()) {
LOG.debug("Terminating overload check due to high map load.");
}
break;
}
// compute the real reduce load
if (noOfReduces > 0) {
incompleteReduceTasks +=
calcEffectiveIncompleteReduceTasks(reduceCapacity, noOfReduces,
reduceProgress);
}
// bail out early
int currentReduceSlotsBackFill =
(int) (maxReduceLoad - incompleteReduceTasks);
if (currentReduceSlotsBackFill <= 0) {
// reset the map task load since we are bailing out
incompleteMapTasks = totalMapTasks;
if (LOG.isDebugEnabled()) {
LOG.debug("Terminating overload check due to high reduce load.");
}
break;
}
} else {
LOG.warn("Blacklisting empty job: " + id);
blacklistedJobs.add(id);
}
}
// calculate the real map load on the cluster
mapSlotsBackFill = (int) (maxMapLoad - incompleteMapTasks);
// calculate the real reduce load on the cluster
reduceSlotsBackFill = (int)(maxReduceLoad - incompleteReduceTasks);
// clean up the blacklisted set to keep the memory footprint minimal
// retain only the jobs that are seen in this cycle
blacklistedJobs.retainAll(seenJobIDs);
if (LOG.isDebugEnabled() && blacklistedJobs.size() > 0) {
LOG.debug("Blacklisted jobs count: " + blacklistedJobs.size());
}
}
// update
loadStatus.updateMapLoad(mapSlotsBackFill);
loadStatus.updateReduceLoad(reduceSlotsBackFill);
if (loadStatus.getMapLoad() <= 0) { if (loadStatus.getMapLoad() <= 0) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -303,23 +459,7 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
} }
return; // stop calculation because we know it is overloaded. return; // stop calculation because we know it is overloaded.
} }
float incompleteReduceTasks = 0; // include pending & running reduce tasks.
for (JobStats job : ClusterStats.getRunningJobStats()) {
// Cached the num-reds value in JobStats
int noOfReduces = job.getNoOfReds();
if (noOfReduces > 0) {
float reduceProgress = job.getJob().reduceProgress();
incompleteReduceTasks +=
calcEffectiveIncompleteReduceTasks(reduceCapacity, noOfReduces,
reduceProgress);
}
}
int reduceSlotsBackFill =
(int)((overloadReduceTaskReduceSlotRatio * reduceCapacity)
- incompleteReduceTasks);
loadStatus.updateReduceLoad(reduceSlotsBackFill);
if (loadStatus.getReduceLoad() <= 0) { if (loadStatus.getReduceLoad() <= 0) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(System.currentTimeMillis() + " [REDUCE-LOAD] Overloaded is " LOG.debug(System.currentTimeMillis() + " [REDUCE-LOAD] Overloaded is "
@ -445,7 +585,7 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
|| (numJobsBackfill <= 0)); || (numJobsBackfill <= 0));
} }
public synchronized boolean overloaded() { public boolean overloaded() {
return overloaded.get(); return overloaded.get();
} }

View File

@ -0,0 +1,217 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* Test the Gridmix's {@link Statistics} class.
*/
public class TestGridmixStatistics {
/**
* Test {@link Statistics.JobStats}.
*/
@Test
@SuppressWarnings("deprecation")
public void testJobStats() throws Exception {
Job job = new Job() {};
JobStats stats = new JobStats(1, 2, job);
assertEquals("Incorrect num-maps", 1, stats.getNoOfMaps());
assertEquals("Incorrect num-reds", 2, stats.getNoOfReds());
assertTrue("Incorrect job", job == stats.getJob());
assertNull("Unexpected job status", stats.getJobStatus());
// add a new status
JobStatus status = new JobStatus();
stats.updateJobStatus(status);
assertNotNull("Missing job status", stats.getJobStatus());
assertTrue("Incorrect job status", status == stats.getJobStatus());
}
private static JobStory getCustomJobStory(final int numMaps,
final int numReds) {
return new JobStory() {
@Override
public InputSplit[] getInputSplits() {
return null;
}
@Override
public JobConf getJobConf() {
return null;
}
@Override
public JobID getJobID() {
return null;
}
@Override
public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int arg0, int arg1,
int arg2) {
return null;
}
@Override
public String getName() {
return null;
}
@Override
public int getNumberMaps() {
return numMaps;
}
@Override
public int getNumberReduces() {
return numReds;
}
@Override
public Values getOutcome() {
return null;
}
@Override
public String getQueueName() {
return null;
}
@Override
public long getSubmissionTime() {
return 0;
}
@Override
public TaskAttemptInfo getTaskAttemptInfo(TaskType arg0, int arg1,
int arg2) {
return null;
}
@Override
public TaskInfo getTaskInfo(TaskType arg0, int arg1) {
return null;
}
@Override
public String getUser() {
return null;
}
};
}
/**
* Test {@link Statistics}.
*/
@Test
@SuppressWarnings("deprecation")
public void testStatistics() throws Exception {
// test job stats generation
Configuration conf = new Configuration();
// test dummy jobs like data-generation etc
Job job = new Job(conf) {
};
JobStats stats = Statistics.generateJobStats(job, null);
testJobStats(stats, -1, -1, null, job);
// add a job desc with 2 map and 1 reduce task
conf.setInt(GridmixJob.GRIDMIX_JOB_SEQ, 1);
// test dummy jobs like data-generation etc
job = new Job(conf) {
};
JobStory zjob = getCustomJobStory(2, 1);
stats = Statistics.generateJobStats(job, zjob);
testJobStats(stats, 2, 1, null, job);
// add a job status
JobStatus jStatus = new JobStatus();
stats.updateJobStatus(jStatus);
testJobStats(stats, 2, 1, jStatus, job);
// start the statistics
CountDownLatch startFlag = new CountDownLatch(1); // prevents the collector
// thread from starting
Statistics statistics = new Statistics(new JobConf(), 0, startFlag);
statistics.start();
testClusterStats(0, 0, 0);
// add to the statistics object
statistics.addJobStats(stats);
testClusterStats(2, 1, 1);
// add another job
JobStory zjob2 = getCustomJobStory(10, 5);
conf.setInt(GridmixJob.GRIDMIX_JOB_SEQ, 2);
job = new Job(conf) {
};
JobStats stats2 = Statistics.generateJobStats(job, zjob2);
statistics.addJobStats(stats2);
testClusterStats(12, 6, 2);
// finish off one job
statistics.add(stats2);
testClusterStats(2, 1, 1);
// finish off the other job
statistics.add(stats);
testClusterStats(0, 0, 0);
statistics.shutdown();
}
// test the job stats
private static void testJobStats(JobStats stats, int numMaps, int numReds,
JobStatus jStatus, Job job) {
assertEquals("Incorrect num map tasks", numMaps, stats.getNoOfMaps());
assertEquals("Incorrect num reduce tasks", numReds, stats.getNoOfReds());
if (job != null) {
assertNotNull("Missing job", job);
}
// check running job
assertTrue("Incorrect job", job == stats.getJob());
if (jStatus != null) {
assertNotNull("Missing job status", jStatus);
}
// check job stats
assertTrue("Incorrect job status", jStatus == stats.getJobStatus());
}
// test the cluster stats
private static void testClusterStats(int numSubmittedMapTasks,
int numSubmittedReduceTasks,
int numSubmittedJobs) {
assertEquals("Incorrect count of total number of submitted map tasks",
numSubmittedMapTasks, ClusterStats.getSubmittedMapTasks());
assertEquals("Incorrect count of total number of submitted reduce tasks",
numSubmittedReduceTasks,
ClusterStats.getSubmittedReduceTasks());
assertEquals("Incorrect submitted jobs",
numSubmittedJobs, ClusterStats.getRunningJobStats().size());
}
}

View File

@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -96,7 +97,7 @@ public class TestGridmixSubmission {
private final BlockingQueue<Job> retiredJobs; private final BlockingQueue<Job> retiredJobs;
public TestMonitor(int expected, Statistics stats) { public TestMonitor(int expected, Statistics stats) {
super(stats); super(5, TimeUnit.SECONDS, stats, 1);
this.expected = expected; this.expected = expected;
retiredJobs = new LinkedBlockingQueue<Job>(); retiredJobs = new LinkedBlockingQueue<Job>();
} }
@ -349,7 +350,7 @@ public class TestGridmixSubmission {
} }
@Override @Override
protected JobMonitor createJobMonitor(Statistics stats) { protected JobMonitor createJobMonitor(Statistics stats, Configuration conf){
monitor = new TestMonitor(NJOBS + 1, stats); monitor = new TestMonitor(NJOBS + 1, stats);
return monitor; return monitor;
} }

View File

@ -193,7 +193,7 @@ public class TestGridmixSummary {
es.update(null); es.update(null);
assertEquals("ExecutionSummarizer init failed", 0, assertEquals("ExecutionSummarizer init failed", 0,
es.getSimulationStartTime()); es.getSimulationStartTime());
testExecutionSummarizer(0, 0, 0, 0, 0, 0, es); testExecutionSummarizer(0, 0, 0, 0, 0, 0, 0, es);
long simStartTime = System.currentTimeMillis(); long simStartTime = System.currentTimeMillis();
es.start(null); es.start(null);
@ -203,14 +203,24 @@ public class TestGridmixSummary {
es.getSimulationStartTime() <= System.currentTimeMillis()); es.getSimulationStartTime() <= System.currentTimeMillis());
// test with job stats // test with job stats
JobStats stats = generateFakeJobStats(1, 10, true); JobStats stats = generateFakeJobStats(1, 10, true, false);
es.update(stats); es.update(stats);
testExecutionSummarizer(1, 10, 0, 1, 1, 0, es); testExecutionSummarizer(1, 10, 0, 1, 1, 0, 0, es);
// test with failed job // test with failed job
stats = generateFakeJobStats(5, 1, false); stats = generateFakeJobStats(5, 1, false, false);
es.update(stats); es.update(stats);
testExecutionSummarizer(6, 11, 0, 2, 1, 1, es); testExecutionSummarizer(6, 11, 0, 2, 1, 1, 0, es);
// test with successful but lost job
stats = generateFakeJobStats(1, 1, true, true);
es.update(stats);
testExecutionSummarizer(7, 12, 0, 3, 1, 1, 1, es);
// test with failed but lost job
stats = generateFakeJobStats(2, 2, false, true);
es.update(stats);
testExecutionSummarizer(9, 14, 0, 4, 1, 1, 2, es);
// test finalize // test finalize
// define a fake job factory // define a fake job factory
@ -306,7 +316,7 @@ public class TestGridmixSummary {
// test the ExecutionSummarizer // test the ExecutionSummarizer
private static void testExecutionSummarizer(int numMaps, int numReds, private static void testExecutionSummarizer(int numMaps, int numReds,
int totalJobsInTrace, int totalJobSubmitted, int numSuccessfulJob, int totalJobsInTrace, int totalJobSubmitted, int numSuccessfulJob,
int numFailedJobs, ExecutionSummarizer es) { int numFailedJobs, int numLostJobs, ExecutionSummarizer es) {
assertEquals("ExecutionSummarizer test failed [num-maps]", assertEquals("ExecutionSummarizer test failed [num-maps]",
numMaps, es.getNumMapTasksLaunched()); numMaps, es.getNumMapTasksLaunched());
assertEquals("ExecutionSummarizer test failed [num-reducers]", assertEquals("ExecutionSummarizer test failed [num-reducers]",
@ -319,12 +329,14 @@ public class TestGridmixSummary {
numSuccessfulJob, es.getNumSuccessfulJobs()); numSuccessfulJob, es.getNumSuccessfulJobs());
assertEquals("ExecutionSummarizer test failed [num-failed jobs]", assertEquals("ExecutionSummarizer test failed [num-failed jobs]",
numFailedJobs, es.getNumFailedJobs()); numFailedJobs, es.getNumFailedJobs());
assertEquals("ExecutionSummarizer test failed [num-lost jobs]",
numLostJobs, es.getNumLostJobs());
} }
// generate fake job stats // generate fake job stats
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
private static JobStats generateFakeJobStats(final int numMaps, private static JobStats generateFakeJobStats(final int numMaps,
final int numReds, final boolean isSuccessful) final int numReds, final boolean isSuccessful, final boolean lost)
throws IOException { throws IOException {
// A fake job // A fake job
Job fakeJob = new Job() { Job fakeJob = new Job() {
@ -335,6 +347,9 @@ public class TestGridmixSummary {
@Override @Override
public boolean isSuccessful() throws IOException, InterruptedException { public boolean isSuccessful() throws IOException, InterruptedException {
if (lost) {
throw new IOException("Test failure!");
}
return isSuccessful; return isSuccessful;
}; };
}; };

View File

@ -42,6 +42,7 @@ import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -74,7 +75,7 @@ public class TestSleepJob {
private final int expected; private final int expected;
public TestMonitor(int expected, Statistics stats) { public TestMonitor(int expected, Statistics stats) {
super(stats); super(5, TimeUnit.SECONDS, stats, 1);
this.expected = expected; this.expected = expected;
retiredJobs = new LinkedBlockingQueue<Job>(); retiredJobs = new LinkedBlockingQueue<Job>();
} }
@ -102,7 +103,7 @@ public class TestSleepJob {
private TestMonitor monitor; private TestMonitor monitor;
@Override @Override
protected JobMonitor createJobMonitor(Statistics stats) { protected JobMonitor createJobMonitor(Statistics stats, Configuration c) {
monitor = new TestMonitor(NJOBS + 1, stats); monitor = new TestMonitor(NJOBS + 1, stats);
return monitor; return monitor;
} }

View File

@ -206,6 +206,22 @@ hadoop jar &lt;gridmix-jar&gt; org.apache.hadoop.mapred.gridmix.Gridmix \
options using the values obtained from the original task (i.e via options using the values obtained from the original task (i.e via
trace). trace).
</td> </td>
</tr>
<tr>
<td>
<code>gridmix.job-monitor.thread-count</code>
</td>
<td>Total number of threads to use for polling for jobs' status. The
default value is 1.
</td>
</tr>
<tr>
<td>
<code>gridmix.job-monitor.sleep-time-ms</code>
</td>
<td>The time each Gridmix status poller thread will sleep before
starting the next cycle. The default value is 500 milliseconds.
</td>
</tr> </tr>
</table> </table>
</section> </section>