MAPREDUCE-6452. NPE when intermediate encrypt enabled for LocalRunner. Contributed by Zhihai Xu

This commit is contained in:
Zhihai Xu 2015-08-28 12:13:23 -07:00
parent b6ceee9bf4
commit cbb249534a
3 changed files with 59 additions and 0 deletions

View File

@ -559,6 +559,13 @@ Release 2.8.0 - UNRELEASED
committing is not utilized when input path is absolute.
(Dustin Cote via aajisaka)
MAPREDUCE-6357. MultipleOutputs.write() API should document that output
committing is not utilized when input path is absolute.
(Dustin Cote via aajisaka)
MAPREDUCE-6452. NPE when intermediate encrypt enabled for LocalRunner.
(Zhihai Xu)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -24,6 +24,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -36,6 +37,8 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.crypto.KeyGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -47,7 +50,9 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
@ -55,6 +60,7 @@ import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@ -84,6 +90,8 @@ public class LocalJobRunner implements ClientProtocol {
public static final String LOCAL_MAX_REDUCES =
"mapreduce.local.reduce.tasks.maximum";
public static final String INTERMEDIATE_DATA_ENCRYPTION_ALGO = "HmacSHA1";
private FileSystem fs;
private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
private JobConf conf;
@ -188,6 +196,25 @@ public class LocalJobRunner implements ClientProtocol {
jobs.put(id, this);
if (CryptoUtils.isEncryptedSpillEnabled(job)) {
try {
int keyLen = conf.getInt(
MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
MRJobConfig
.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS);
KeyGenerator keyGen =
KeyGenerator.getInstance(INTERMEDIATE_DATA_ENCRYPTION_ALGO);
keyGen.init(keyLen);
Credentials creds =
UserGroupInformation.getCurrentUser().getCredentials();
TokenCache.setEncryptedSpillKey(keyGen.generateKey().getEncoded(),
creds);
UserGroupInformation.getCurrentUser().addCredentials(creds);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating encrypted spill key", e);
}
}
this.start();
}

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
@ -81,6 +82,30 @@ public class TestLocalJobSubmission {
assertEquals("dist job res is not 0:", 0, res);
}
/**
* test the local job submission with
* intermediate data encryption enabled.
* @throws IOException
*/
@Test
public void testLocalJobEncryptedIntermediateData() throws IOException {
Configuration conf = new Configuration();
conf.set(MRConfig.FRAMEWORK_NAME, "local");
conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
final String[] args = {
"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
};
int res = -1;
try {
res = ToolRunner.run(conf, new SleepJob(), args);
} catch (Exception e) {
System.out.println("Job failed with " + e.getLocalizedMessage());
e.printStackTrace(System.out);
fail("Job failed");
}
assertEquals("dist job res is not 0:", 0, res);
}
private Path makeJar(Path p) throws IOException {
FileOutputStream fos = new FileOutputStream(new File(p.toString()));
JarOutputStream jos = new JarOutputStream(fos);