MAPREDUCE-6638. Do not attempt to recover progress from previous job attempts if spill encryption is enabled. (Haibo Chen via kasha)
(cherry picked from commit de7a0a92ca1983b35ca4beb7ab712fd700a9e6e0)
This commit is contained in:
parent
03b797a6ac
commit
876fc2e61a
@ -146,7 +146,6 @@
|
|||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
||||||
import org.apache.hadoop.yarn.util.SystemClock;
|
import org.apache.hadoop.yarn.util.SystemClock;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
|
|
||||||
@ -1267,44 +1266,77 @@ public Boolean call(Configuration conf) throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void processRecovery() throws IOException{
|
private void processRecovery() throws IOException{
|
||||||
if (appAttemptID.getAttemptId() == 1) {
|
boolean attemptRecovery = shouldAttemptRecovery();
|
||||||
return; // no need to recover on the first attempt
|
boolean recoverySucceeded = true;
|
||||||
|
if (attemptRecovery) {
|
||||||
|
LOG.info("Attempting to recover.");
|
||||||
|
try {
|
||||||
|
parsePreviousJobHistory();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Unable to parse prior job history, aborting recovery", e);
|
||||||
|
recoverySucceeded = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isFirstAttempt() && (!attemptRecovery || !recoverySucceeded)) {
|
||||||
|
amInfos.addAll(readJustAMInfos());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isFirstAttempt() {
|
||||||
|
return appAttemptID.getAttemptId() == 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the current job attempt should try to recover from previous
|
||||||
|
* job attempts if any.
|
||||||
|
*/
|
||||||
|
private boolean shouldAttemptRecovery() throws IOException {
|
||||||
|
if (isFirstAttempt()) {
|
||||||
|
return false; // no need to recover on the first attempt
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean recoveryEnabled = getConfig().getBoolean(
|
boolean recoveryEnabled = getConfig().getBoolean(
|
||||||
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
|
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
|
||||||
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
|
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
|
||||||
|
if (!recoveryEnabled) {
|
||||||
|
LOG.info("Not attempting to recover. Recovery disabled. To enable " +
|
||||||
|
"recovery, set " + MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
boolean recoverySupportedByCommitter = isRecoverySupported();
|
boolean recoverySupportedByCommitter = isRecoverySupported();
|
||||||
|
if (!recoverySupportedByCommitter) {
|
||||||
|
LOG.info("Not attempting to recover. Recovery is not supported by " +
|
||||||
|
committer.getClass() + ". Use an OutputCommitter that supports" +
|
||||||
|
" recovery.");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
// If a shuffle secret was not provided by the job client then this app
|
int reducerCount = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
|
||||||
// attempt will generate one. However that disables recovery if there
|
|
||||||
// are reducers as the shuffle secret would be app attempt specific.
|
// If a shuffle secret was not provided by the job client, one will be
|
||||||
int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
|
// generated in this job attempt. However, that disables recovery if
|
||||||
|
// there are reducers as the shuffle secret would be job attempt specific.
|
||||||
boolean shuffleKeyValidForRecovery =
|
boolean shuffleKeyValidForRecovery =
|
||||||
TokenCache.getShuffleSecretKey(jobCredentials) != null;
|
TokenCache.getShuffleSecretKey(jobCredentials) != null;
|
||||||
|
if (reducerCount > 0 && !shuffleKeyValidForRecovery) {
|
||||||
|
LOG.info("Not attempting to recover. The shuffle key is invalid for " +
|
||||||
|
"recovery.");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
if (recoveryEnabled && recoverySupportedByCommitter
|
// If the intermediate data is encrypted, recovering the job requires the
|
||||||
&& (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) {
|
// access to the key. Until the encryption key is persisted, we should
|
||||||
LOG.info("Recovery is enabled. "
|
// avoid attempts to recover.
|
||||||
+ "Will try to recover from previous life on best effort basis.");
|
boolean spillEncrypted = CryptoUtils.isEncryptedSpillEnabled(getConfig());
|
||||||
try {
|
if (reducerCount > 0 && spillEncrypted) {
|
||||||
parsePreviousJobHistory();
|
LOG.info("Not attempting to recover. Intermediate spill encryption" +
|
||||||
} catch (IOException e) {
|
" is enabled.");
|
||||||
LOG.warn("Unable to parse prior job history, aborting recovery", e);
|
return false;
|
||||||
// try to get just the AMInfos
|
|
||||||
amInfos.addAll(readJustAMInfos());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
LOG.info("Will not try to recover. recoveryEnabled: "
|
|
||||||
+ recoveryEnabled + " recoverySupportedByCommitter: "
|
|
||||||
+ recoverySupportedByCommitter + " numReduceTasks: "
|
|
||||||
+ numReduceTasks + " shuffleKeyValidForRecovery: "
|
|
||||||
+ shuffleKeyValidForRecovery + " ApplicationAttemptID: "
|
|
||||||
+ appAttemptID.getAttemptId());
|
|
||||||
// Get the amInfos anyways whether recovery is enabled or not
|
|
||||||
amInfos.addAll(readJustAMInfos());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static FSDataInputStream getPreviousJobHistoryStream(
|
private static FSDataInputStream getPreviousJobHistoryStream(
|
||||||
@ -1404,6 +1436,10 @@ private List<AMInfo> readJustAMInfos() {
|
|||||||
return amInfos;
|
return amInfos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean recovered() {
|
||||||
|
return recoveredJobStartTime > 0;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This can be overridden to instantiate multiple jobs and create a
|
* This can be overridden to instantiate multiple jobs and create a
|
||||||
* workflow.
|
* workflow.
|
||||||
|
@ -579,6 +579,72 @@ public void testRecoverySuccessUsingCustomOutputCommitter() throws Exception {
|
|||||||
app.verifyCompleted();
|
app.verifyCompleted();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRecoveryWithSpillEncryption() throws Exception {
|
||||||
|
int runCount = 0;
|
||||||
|
MRApp app = new MRAppWithHistory(1, 1, false, this.getClass().getName(),
|
||||||
|
true, ++runCount) {
|
||||||
|
};
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
|
||||||
|
conf.setBoolean("mapred.mapper.new-api", true);
|
||||||
|
conf.setBoolean("mapred.reducer.new-api", true);
|
||||||
|
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||||
|
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
||||||
|
conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
|
||||||
|
|
||||||
|
// run the MR job at the first attempt
|
||||||
|
Job jobAttempt1 = app.submit(conf);
|
||||||
|
app.waitForState(jobAttempt1, JobState.RUNNING);
|
||||||
|
|
||||||
|
Iterator<Task> tasks = jobAttempt1.getTasks().values().iterator();
|
||||||
|
|
||||||
|
// finish the map task but the reduce task
|
||||||
|
Task mapper = tasks.next();
|
||||||
|
app.waitForState(mapper, TaskState.RUNNING);
|
||||||
|
TaskAttempt mapAttempt = mapper.getAttempts().values().iterator().next();
|
||||||
|
app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
|
||||||
|
app.getContext().getEventHandler().handle(
|
||||||
|
new TaskAttemptEvent(mapAttempt.getID(), TaskAttemptEventType.TA_DONE));
|
||||||
|
app.waitForState(mapper, TaskState.SUCCEEDED);
|
||||||
|
|
||||||
|
// crash the first attempt of the MR job
|
||||||
|
app.stop();
|
||||||
|
|
||||||
|
// run the MR job again at the second attempt
|
||||||
|
app = new MRAppWithHistory(1, 1, false, this.getClass().getName(), false,
|
||||||
|
++runCount);
|
||||||
|
Job jobAttempt2 = app.submit(conf);
|
||||||
|
Assert.assertTrue("Recovery from previous job attempt is processed even " +
|
||||||
|
"though intermediate data encryption is enabled.", !app.recovered());
|
||||||
|
|
||||||
|
// The map task succeeded from previous job attempt will not be recovered
|
||||||
|
// because the data spill encryption is enabled.
|
||||||
|
// Let's finish the job at the second attempt and verify its completion.
|
||||||
|
app.waitForState(jobAttempt2, JobState.RUNNING);
|
||||||
|
tasks = jobAttempt2.getTasks().values().iterator();
|
||||||
|
mapper = tasks.next();
|
||||||
|
Task reducer = tasks.next();
|
||||||
|
|
||||||
|
// finish the map task first
|
||||||
|
app.waitForState(mapper, TaskState.RUNNING);
|
||||||
|
mapAttempt = mapper.getAttempts().values().iterator().next();
|
||||||
|
app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
|
||||||
|
app.getContext().getEventHandler().handle(
|
||||||
|
new TaskAttemptEvent(mapAttempt.getID(), TaskAttemptEventType.TA_DONE));
|
||||||
|
app.waitForState(mapper, TaskState.SUCCEEDED);
|
||||||
|
|
||||||
|
// then finish the reduce task
|
||||||
|
TaskAttempt redAttempt = reducer.getAttempts().values().iterator().next();
|
||||||
|
app.waitForState(redAttempt, TaskAttemptState.RUNNING);
|
||||||
|
app.getContext().getEventHandler().handle(
|
||||||
|
new TaskAttemptEvent(redAttempt.getID(), TaskAttemptEventType.TA_DONE));
|
||||||
|
app.waitForState(reducer, TaskState.SUCCEEDED);
|
||||||
|
|
||||||
|
// verify that the job succeeds at the 2rd attempt
|
||||||
|
app.waitForState(jobAttempt2, JobState.SUCCEEDED);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This test case primarily verifies if the recovery is controlled through config
|
* This test case primarily verifies if the recovery is controlled through config
|
||||||
* property. In this case, recover is turned OFF. AM with 3 maps and 0 reduce.
|
* property. In this case, recover is turned OFF. AM with 3 maps and 0 reduce.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user