MAPREDUCE-4942. mapreduce.Job has a bunch of methods that throw InterruptedException so its incompatible with MR1. (rkanter via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1480754 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2013-05-09 20:11:33 +00:00
parent 6aa0edc793
commit c56c50df3f
4 changed files with 80 additions and 84 deletions

View File

@ -233,6 +233,9 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5226. Handling YarnRemoteException separately from IOException in MAPREDUCE-5226. Handling YarnRemoteException separately from IOException in
MR App's use of AMRMProtocol after YARN-630. (Xuan Gong via vinodkv) MR App's use of AMRMProtocol after YARN-630. (Xuan Gong via vinodkv)
MAPREDUCE-4942. mapreduce.Job has a bunch of methods that throw
InterruptedException so its incompatible with MR1. (rkanter via tucu)
Release 2.0.4-alpha - 2013-04-25 Release 2.0.4-alpha - 2013-04-25
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -209,11 +209,7 @@ public class JobClient extends CLI {
* completed. * completed.
*/ */
public float mapProgress() throws IOException { public float mapProgress() throws IOException {
try {
return job.mapProgress(); return job.mapProgress();
} catch (InterruptedException ie) {
throw new IOException(ie);
}
} }
/** /**
@ -221,11 +217,7 @@ public class JobClient extends CLI {
* completed. * completed.
*/ */
public float reduceProgress() throws IOException { public float reduceProgress() throws IOException {
try {
return job.reduceProgress(); return job.reduceProgress();
} catch (InterruptedException ie) {
throw new IOException(ie);
}
} }
/** /**
@ -245,33 +237,21 @@ public class JobClient extends CLI {
* completed. * completed.
*/ */
public float setupProgress() throws IOException { public float setupProgress() throws IOException {
try {
return job.setupProgress(); return job.setupProgress();
} catch (InterruptedException ie) {
throw new IOException(ie);
}
} }
/** /**
* Returns immediately whether the whole job is done yet or not. * Returns immediately whether the whole job is done yet or not.
*/ */
public synchronized boolean isComplete() throws IOException { public synchronized boolean isComplete() throws IOException {
try {
return job.isComplete(); return job.isComplete();
} catch (InterruptedException ie) {
throw new IOException(ie);
}
} }
/** /**
* True iff job completed successfully. * True iff job completed successfully.
*/ */
public synchronized boolean isSuccessful() throws IOException { public synchronized boolean isSuccessful() throws IOException {
try {
return job.isSuccessful(); return job.isSuccessful();
} catch (InterruptedException ie) {
throw new IOException(ie);
}
} }
/** /**
@ -302,11 +282,7 @@ public class JobClient extends CLI {
* Tells the service to terminate the current job. * Tells the service to terminate the current job.
*/ */
public synchronized void killJob() throws IOException { public synchronized void killJob() throws IOException {
try {
job.killJob(); job.killJob();
} catch (InterruptedException ie) {
throw new IOException(ie);
}
} }
@ -331,15 +307,11 @@ public class JobClient extends CLI {
*/ */
public synchronized void killTask(TaskAttemptID taskId, public synchronized void killTask(TaskAttemptID taskId,
boolean shouldFail) throws IOException { boolean shouldFail) throws IOException {
try {
if (shouldFail) { if (shouldFail) {
job.failTask(taskId); job.failTask(taskId);
} else { } else {
job.killTask(taskId); job.killTask(taskId);
} }
} catch (InterruptedException ie) {
throw new IOException(ie);
}
} }
/** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}*/ /** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}*/
@ -378,16 +350,12 @@ public class JobClient extends CLI {
* Returns the counters for this job * Returns the counters for this job
*/ */
public Counters getCounters() throws IOException { public Counters getCounters() throws IOException {
try {
Counters result = null; Counters result = null;
org.apache.hadoop.mapreduce.Counters temp = job.getCounters(); org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
if(temp != null) { if(temp != null) {
result = Counters.downgrade(temp); result = Counters.downgrade(temp);
} }
return result; return result;
} catch (InterruptedException ie) {
throw new IOException(ie);
}
} }
@Override @Override

View File

@ -296,7 +296,7 @@ public class Job extends JobContextImpl implements JobContext {
* it, if necessary * it, if necessary
*/ */
synchronized void ensureFreshStatus() synchronized void ensureFreshStatus()
throws IOException, InterruptedException { throws IOException {
if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) { if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
updateStatus(); updateStatus();
} }
@ -306,13 +306,18 @@ public class Job extends JobContextImpl implements JobContext {
* immediately * immediately
* @throws IOException * @throws IOException
*/ */
synchronized void updateStatus() throws IOException, InterruptedException { synchronized void updateStatus() throws IOException {
try {
this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
@Override @Override
public JobStatus run() throws IOException, InterruptedException { public JobStatus run() throws IOException, InterruptedException {
return cluster.getClient().getJobStatus(status.getJobID()); return cluster.getClient().getJobStatus(status.getJobID());
} }
}); });
}
catch (InterruptedException ie) {
throw new IOException(ie);
}
if (this.status == null) { if (this.status == null) {
throw new IOException("Job status not available "); throw new IOException("Job status not available ");
} }
@ -537,7 +542,7 @@ public class Job extends JobContextImpl implements JobContext {
* @return the progress of the job's map-tasks. * @return the progress of the job's map-tasks.
* @throws IOException * @throws IOException
*/ */
public float mapProgress() throws IOException, InterruptedException { public float mapProgress() throws IOException {
ensureState(JobState.RUNNING); ensureState(JobState.RUNNING);
ensureFreshStatus(); ensureFreshStatus();
return status.getMapProgress(); return status.getMapProgress();
@ -550,7 +555,7 @@ public class Job extends JobContextImpl implements JobContext {
* @return the progress of the job's reduce-tasks. * @return the progress of the job's reduce-tasks.
* @throws IOException * @throws IOException
*/ */
public float reduceProgress() throws IOException, InterruptedException { public float reduceProgress() throws IOException {
ensureState(JobState.RUNNING); ensureState(JobState.RUNNING);
ensureFreshStatus(); ensureFreshStatus();
return status.getReduceProgress(); return status.getReduceProgress();
@ -576,7 +581,7 @@ public class Job extends JobContextImpl implements JobContext {
* @return the progress of the job's setup-tasks. * @return the progress of the job's setup-tasks.
* @throws IOException * @throws IOException
*/ */
public float setupProgress() throws IOException, InterruptedException { public float setupProgress() throws IOException {
ensureState(JobState.RUNNING); ensureState(JobState.RUNNING);
ensureFreshStatus(); ensureFreshStatus();
return status.getSetupProgress(); return status.getSetupProgress();
@ -589,7 +594,7 @@ public class Job extends JobContextImpl implements JobContext {
* @return <code>true</code> if the job is complete, else <code>false</code>. * @return <code>true</code> if the job is complete, else <code>false</code>.
* @throws IOException * @throws IOException
*/ */
public boolean isComplete() throws IOException, InterruptedException { public boolean isComplete() throws IOException {
ensureState(JobState.RUNNING); ensureState(JobState.RUNNING);
updateStatus(); updateStatus();
return status.isJobComplete(); return status.isJobComplete();
@ -601,7 +606,7 @@ public class Job extends JobContextImpl implements JobContext {
* @return <code>true</code> if the job succeeded, else <code>false</code>. * @return <code>true</code> if the job succeeded, else <code>false</code>.
* @throws IOException * @throws IOException
*/ */
public boolean isSuccessful() throws IOException, InterruptedException { public boolean isSuccessful() throws IOException {
ensureState(JobState.RUNNING); ensureState(JobState.RUNNING);
updateStatus(); updateStatus();
return status.getState() == JobStatus.State.SUCCEEDED; return status.getState() == JobStatus.State.SUCCEEDED;
@ -613,10 +618,15 @@ public class Job extends JobContextImpl implements JobContext {
* *
* @throws IOException * @throws IOException
*/ */
public void killJob() throws IOException, InterruptedException { public void killJob() throws IOException {
ensureState(JobState.RUNNING); ensureState(JobState.RUNNING);
try {
cluster.getClient().killJob(getJobID()); cluster.getClient().killJob(getJobID());
} }
catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/** /**
* Set the priority of a running job. * Set the priority of a running job.
@ -673,7 +683,7 @@ public class Job extends JobContextImpl implements JobContext {
try { try {
return getTaskCompletionEvents(startFrom, 10); return getTaskCompletionEvents(startFrom, 10);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
throw new RuntimeException(ie); throw new IOException(ie);
} }
} }
@ -684,14 +694,19 @@ public class Job extends JobContextImpl implements JobContext {
* @throws IOException * @throws IOException
*/ */
public boolean killTask(final TaskAttemptID taskId) public boolean killTask(final TaskAttemptID taskId)
throws IOException, InterruptedException { throws IOException {
ensureState(JobState.RUNNING); ensureState(JobState.RUNNING);
try {
return ugi.doAs(new PrivilegedExceptionAction<Boolean>() { return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
public Boolean run() throws IOException, InterruptedException { public Boolean run() throws IOException, InterruptedException {
return cluster.getClient().killTask(taskId, false); return cluster.getClient().killTask(taskId, false);
} }
}); });
} }
catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/** /**
* Fail indicated task attempt. * Fail indicated task attempt.
@ -700,8 +715,9 @@ public class Job extends JobContextImpl implements JobContext {
* @throws IOException * @throws IOException
*/ */
public boolean failTask(final TaskAttemptID taskId) public boolean failTask(final TaskAttemptID taskId)
throws IOException, InterruptedException { throws IOException {
ensureState(JobState.RUNNING); ensureState(JobState.RUNNING);
try {
return ugi.doAs(new PrivilegedExceptionAction<Boolean>() { return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
@Override @Override
public Boolean run() throws IOException, InterruptedException { public Boolean run() throws IOException, InterruptedException {
@ -709,6 +725,10 @@ public class Job extends JobContextImpl implements JobContext {
} }
}); });
} }
catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/** /**
* Gets the counters for this job. May return null if the job has been * Gets the counters for this job. May return null if the job has been
@ -718,8 +738,9 @@ public class Job extends JobContextImpl implements JobContext {
* @throws IOException * @throws IOException
*/ */
public Counters getCounters() public Counters getCounters()
throws IOException, InterruptedException { throws IOException {
ensureState(JobState.RUNNING); ensureState(JobState.RUNNING);
try {
return ugi.doAs(new PrivilegedExceptionAction<Counters>() { return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
@Override @Override
public Counters run() throws IOException, InterruptedException { public Counters run() throws IOException, InterruptedException {
@ -727,6 +748,10 @@ public class Job extends JobContextImpl implements JobContext {
} }
}); });
} }
catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/** /**
* Gets the diagnostic messages for a given task attempt. * Gets the diagnostic messages for a given task attempt.

View File

@ -346,7 +346,7 @@ public class TestGridmixSummary {
}; };
@Override @Override
public boolean isSuccessful() throws IOException, InterruptedException { public boolean isSuccessful() throws IOException {
if (lost) { if (lost) {
throw new IOException("Test failure!"); throw new IOException("Test failure!");
} }