MAPREDUCE-6452. NPE when intermediate encrypt enabled for LocalRunner. Contributed by Zhihai Xu
(cherry picked from commit cbb249534a
)
This commit is contained in:
parent
5184779418
commit
297bda600e
|
@ -290,6 +290,13 @@ Release 2.8.0 - UNRELEASED
|
||||||
committing is not utilized when input path is absolute.
|
committing is not utilized when input path is absolute.
|
||||||
(Dustin Cote via aajisaka)
|
(Dustin Cote via aajisaka)
|
||||||
|
|
||||||
|
MAPREDUCE-6357. MultipleOutputs.write() API should document that output
|
||||||
|
committing is not utilized when input path is absolute.
|
||||||
|
(Dustin Cote via aajisaka)
|
||||||
|
|
||||||
|
MAPREDUCE-6452. NPE when intermediate encrypt enabled for LocalRunner.
|
||||||
|
(Zhihai Xu)
|
||||||
|
|
||||||
Release 2.7.2 - UNRELEASED
|
Release 2.7.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -36,6 +37,8 @@ import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import javax.crypto.KeyGenerator;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
@ -47,13 +50,16 @@ import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.ipc.ProtocolSignature;
|
import org.apache.hadoop.ipc.ProtocolSignature;
|
||||||
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
|
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
|
||||||
import org.apache.hadoop.mapreduce.ClusterMetrics;
|
import org.apache.hadoop.mapreduce.ClusterMetrics;
|
||||||
|
import org.apache.hadoop.mapreduce.CryptoUtils;
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.OutputFormat;
|
import org.apache.hadoop.mapreduce.OutputFormat;
|
||||||
import org.apache.hadoop.mapreduce.QueueInfo;
|
import org.apache.hadoop.mapreduce.QueueInfo;
|
||||||
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
|
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
|
||||||
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
|
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
|
||||||
import org.apache.hadoop.mapreduce.TaskType;
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
|
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
|
||||||
|
import org.apache.hadoop.mapreduce.security.TokenCache;
|
||||||
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
|
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
|
||||||
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
||||||
|
@ -83,6 +89,8 @@ public class LocalJobRunner implements ClientProtocol {
|
||||||
public static final String LOCAL_MAX_REDUCES =
|
public static final String LOCAL_MAX_REDUCES =
|
||||||
"mapreduce.local.reduce.tasks.maximum";
|
"mapreduce.local.reduce.tasks.maximum";
|
||||||
|
|
||||||
|
public static final String INTERMEDIATE_DATA_ENCRYPTION_ALGO = "HmacSHA1";
|
||||||
|
|
||||||
private FileSystem fs;
|
private FileSystem fs;
|
||||||
private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
|
private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
|
||||||
private JobConf conf;
|
private JobConf conf;
|
||||||
|
@ -187,6 +195,25 @@ public class LocalJobRunner implements ClientProtocol {
|
||||||
|
|
||||||
jobs.put(id, this);
|
jobs.put(id, this);
|
||||||
|
|
||||||
|
if (CryptoUtils.isEncryptedSpillEnabled(job)) {
|
||||||
|
try {
|
||||||
|
int keyLen = conf.getInt(
|
||||||
|
MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
|
||||||
|
MRJobConfig
|
||||||
|
.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS);
|
||||||
|
KeyGenerator keyGen =
|
||||||
|
KeyGenerator.getInstance(INTERMEDIATE_DATA_ENCRYPTION_ALGO);
|
||||||
|
keyGen.init(keyLen);
|
||||||
|
Credentials creds =
|
||||||
|
UserGroupInformation.getCurrentUser().getCredentials();
|
||||||
|
TokenCache.setEncryptedSpillKey(keyGen.generateKey().getEncoded(),
|
||||||
|
creds);
|
||||||
|
UserGroupInformation.getCurrentUser().addCredentials(creds);
|
||||||
|
} catch (NoSuchAlgorithmException e) {
|
||||||
|
throw new IOException("Error generating encrypted spill key", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
this.start();
|
this.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.SleepJob;
|
import org.apache.hadoop.mapreduce.SleepJob;
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -81,6 +82,30 @@ public class TestLocalJobSubmission {
|
||||||
assertEquals("dist job res is not 0:", 0, res);
|
assertEquals("dist job res is not 0:", 0, res);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* test the local job submission with
|
||||||
|
* intermediate data encryption enabled.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testLocalJobEncryptedIntermediateData() throws IOException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(MRConfig.FRAMEWORK_NAME, "local");
|
||||||
|
conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
|
||||||
|
final String[] args = {
|
||||||
|
"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
|
||||||
|
};
|
||||||
|
int res = -1;
|
||||||
|
try {
|
||||||
|
res = ToolRunner.run(conf, new SleepJob(), args);
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.out.println("Job failed with " + e.getLocalizedMessage());
|
||||||
|
e.printStackTrace(System.out);
|
||||||
|
fail("Job failed");
|
||||||
|
}
|
||||||
|
assertEquals("dist job res is not 0:", 0, res);
|
||||||
|
}
|
||||||
|
|
||||||
private Path makeJar(Path p) throws IOException {
|
private Path makeJar(Path p) throws IOException {
|
||||||
FileOutputStream fos = new FileOutputStream(new File(p.toString()));
|
FileOutputStream fos = new FileOutputStream(new File(p.toString()));
|
||||||
JarOutputStream jos = new JarOutputStream(fos);
|
JarOutputStream jos = new JarOutputStream(fos);
|
||||||
|
|
Loading…
Reference in New Issue