diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 361a19b6953..27af9f91356 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -559,6 +559,9 @@ Release 2.8.0 - UNRELEASED committing is not utilized when input path is absolute. (Dustin Cote via aajisaka) + MAPREDUCE-6452. NPE when intermediate encrypt enabled for LocalRunner. + (Zhihai Xu) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java index b6855024f36..45d3cc5b29a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java @@ -24,6 +24,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -36,6 +37,8 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import javax.crypto.KeyGenerator; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -47,7 +50,9 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.ProtocolSignature; import 
org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus; import org.apache.hadoop.mapreduce.ClusterMetrics; +import org.apache.hadoop.mapreduce.CryptoUtils; import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.QueueInfo; import org.apache.hadoop.mapreduce.TaskCompletionEvent; @@ -55,6 +60,7 @@ import org.apache.hadoop.mapreduce.TaskTrackerInfo; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; import org.apache.hadoop.mapreduce.protocol.ClientProtocol; +import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; @@ -84,6 +90,8 @@ public class LocalJobRunner implements ClientProtocol { public static final String LOCAL_MAX_REDUCES = "mapreduce.local.reduce.tasks.maximum"; + public static final String INTERMEDIATE_DATA_ENCRYPTION_ALGO = "HmacSHA1"; + private FileSystem fs; private HashMap jobs = new HashMap(); private JobConf conf; @@ -188,6 +196,25 @@ public class LocalJobRunner implements ClientProtocol { jobs.put(id, this); + if (CryptoUtils.isEncryptedSpillEnabled(job)) { + try { + int keyLen = conf.getInt( + MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, + MRJobConfig + .DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS); + KeyGenerator keyGen = + KeyGenerator.getInstance(INTERMEDIATE_DATA_ENCRYPTION_ALGO); + keyGen.init(keyLen); + Credentials creds = + UserGroupInformation.getCurrentUser().getCredentials(); + TokenCache.setEncryptedSpillKey(keyGen.generateKey().getEncoded(), + creds); + UserGroupInformation.getCurrentUser().addCredentials(creds); + } catch (NoSuchAlgorithmException e) { + throw new IOException("Error generating encrypted spill key", e); + } + 
} + this.start(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java index d73ee4b7c55..8b02857b23a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.SleepJob; import org.apache.hadoop.util.ToolRunner; import org.junit.After; @@ -81,6 +82,30 @@ public class TestLocalJobSubmission { assertEquals("dist job res is not 0:", 0, res); } + /** + * test the local job submission with + * intermediate data encryption enabled. + * @throws IOException + */ + @Test + public void testLocalJobEncryptedIntermediateData() throws IOException { + Configuration conf = new Configuration(); + conf.set(MRConfig.FRAMEWORK_NAME, "local"); + conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true); + final String[] args = { + "-m", "1", "-r", "1", "-mt", "1", "-rt", "1" + }; + int res = -1; + try { + res = ToolRunner.run(conf, new SleepJob(), args); + } catch (Exception e) { + System.out.println("Job failed with " + e.getLocalizedMessage()); + e.printStackTrace(System.out); + fail("Job failed"); + } + assertEquals("dist job res is not 0:", 0, res); + } + private Path makeJar(Path p) throws IOException { FileOutputStream fos = new FileOutputStream(new File(p.toString())); JarOutputStream jos = new JarOutputStream(fos);