svn merge -c 1395769 FIXES: MAPREDUCE-4554. Job Credentials are not transmitted if security is turned off (Benoy Antony via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1395772 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
88a3f46ddb
commit
09f0aee7f8
|
@ -419,6 +419,9 @@ Release 0.23.5 - UNRELEASED
|
|||
|
||||
BUG FIXES
|
||||
|
||||
MAPREDUCE-4554. Job Credentials are not transmitted if security is turned
|
||||
off (Benoy Antony via bobby)
|
||||
|
||||
Release 0.23.4 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -490,19 +490,17 @@ public class MRAppMaster extends CompositeService {
|
|||
try {
|
||||
this.currentUser = UserGroupInformation.getCurrentUser();
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
// Read the file-system tokens from the localized tokens-file.
|
||||
Path jobSubmitDir =
|
||||
FileContext.getLocalFSFileContext().makeQualified(
|
||||
new Path(new File(MRJobConfig.JOB_SUBMIT_DIR)
|
||||
.getAbsolutePath()));
|
||||
Path jobTokenFile =
|
||||
new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
|
||||
fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
|
||||
LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
|
||||
+ jobTokenFile);
|
||||
currentUser.addCredentials(fsTokens); // For use by AppMaster itself.
|
||||
}
|
||||
// Read the file-system tokens from the localized tokens-file.
|
||||
Path jobSubmitDir =
|
||||
FileContext.getLocalFSFileContext().makeQualified(
|
||||
new Path(new File(MRJobConfig.JOB_SUBMIT_DIR)
|
||||
.getAbsolutePath()));
|
||||
Path jobTokenFile =
|
||||
new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
|
||||
fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
|
||||
LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
|
||||
+ jobTokenFile);
|
||||
currentUser.addCredentials(fsTokens); // For use by AppMaster itself.
|
||||
} catch (IOException e) {
|
||||
throw new YarnException(e);
|
||||
}
|
||||
|
|
|
@ -647,14 +647,10 @@ public abstract class TaskAttemptImpl implements
|
|||
MRApps.setupDistributedCache(conf, localResources);
|
||||
|
||||
// Setup up task credentials buffer
|
||||
Credentials taskCredentials = new Credentials();
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
LOG.info("Adding #" + credentials.numberOfTokens()
|
||||
+ " tokens and #" + credentials.numberOfSecretKeys()
|
||||
+ " secret keys for NM use for launching container");
|
||||
taskCredentials.addAll(credentials);
|
||||
}
|
||||
LOG.info("Adding #" + credentials.numberOfTokens()
|
||||
+ " tokens and #" + credentials.numberOfSecretKeys()
|
||||
+ " secret keys for NM use for launching container");
|
||||
Credentials taskCredentials = new Credentials(credentials);
|
||||
|
||||
// LocalStorageToken is needed irrespective of whether security is enabled
|
||||
// or not.
|
||||
|
|
|
@ -179,6 +179,10 @@ import org.junit.Test;
|
|||
public Configuration getConfig() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void downloadTokensAndSetupUGI(Configuration conf) {
|
||||
}
|
||||
}
|
||||
|
||||
private final class MRAppTestCleanup extends MRApp {
|
||||
|
@ -266,4 +270,4 @@ import org.junit.Test;
|
|||
Assert.assertTrue("Staging directory not cleaned before notifying RM",
|
||||
app.cleanedBeforeContainerAllocatorStopped);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -372,12 +372,9 @@ public class YARNRunner implements ClientProtocol {
|
|||
}
|
||||
|
||||
// Setup security tokens
|
||||
ByteBuffer securityTokens = null;
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
ts.writeTokenStorageToStream(dob);
|
||||
securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||
}
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
ts.writeTokenStorageToStream(dob);
|
||||
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||
|
||||
// Setup the command to run the AM
|
||||
List<String> vargs = new ArrayList<String>(8);
|
||||
|
|
|
@ -0,0 +1,142 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.security;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.io.NullWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.Mapper;
|
||||
import org.apache.hadoop.mapreduce.Reducer;
|
||||
import org.apache.hadoop.mapreduce.SleepJob;
|
||||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
/**
|
||||
* class for testing transport of keys via Credentials .
|
||||
* Client passes a list of keys in the Credentials object.
|
||||
* The mapper and reducer checks whether it can access the keys
|
||||
* from Credentials.
|
||||
*/
|
||||
public class CredentialsTestJob extends Configured implements Tool {
|
||||
|
||||
private static final int NUM_OF_KEYS = 10;
|
||||
|
||||
private static void checkSecrets(Credentials ts) {
|
||||
if ( ts == null){
|
||||
throw new RuntimeException("The credentials are not available");
|
||||
// fail the test
|
||||
}
|
||||
|
||||
for(int i=0; i<NUM_OF_KEYS; i++) {
|
||||
String secretName = "alias"+i;
|
||||
// get token storage and a key
|
||||
byte[] secretValue = ts.getSecretKey(new Text(secretName));
|
||||
System.out.println(secretValue);
|
||||
|
||||
if (secretValue == null){
|
||||
throw new RuntimeException("The key "+ secretName + " is not available. ");
|
||||
// fail the test
|
||||
}
|
||||
|
||||
String secretValueStr = new String (secretValue);
|
||||
|
||||
if ( !("password"+i).equals(secretValueStr)){
|
||||
throw new RuntimeException("The key "+ secretName +
|
||||
" is not correct. Expected value is "+ ("password"+i) +
|
||||
". Actual value is " + secretValueStr); // fail the test
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class CredentialsTestMapper
|
||||
extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
|
||||
Credentials ts;
|
||||
|
||||
protected void setup(Context context)
|
||||
throws IOException, InterruptedException {
|
||||
ts = context.getCredentials();
|
||||
}
|
||||
|
||||
public void map(IntWritable key, IntWritable value, Context context
|
||||
) throws IOException, InterruptedException {
|
||||
checkSecrets(ts);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public static class CredentialsTestReducer
|
||||
extends Reducer<IntWritable, NullWritable, NullWritable, NullWritable> {
|
||||
Credentials ts;
|
||||
|
||||
protected void setup(Context context)
|
||||
throws IOException, InterruptedException {
|
||||
ts = context.getCredentials();
|
||||
}
|
||||
|
||||
public void reduce(IntWritable key, Iterable<NullWritable> values,
|
||||
Context context)
|
||||
throws IOException {
|
||||
checkSecrets(ts);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
int res = ToolRunner.run(new Configuration(), new CredentialsTestJob(), args);
|
||||
System.exit(res);
|
||||
}
|
||||
|
||||
public Job createJob()
|
||||
throws IOException {
|
||||
Configuration conf = getConf();
|
||||
conf.setInt(MRJobConfig.NUM_MAPS, 1);
|
||||
Job job = Job.getInstance(conf, "test");
|
||||
job.setNumReduceTasks(1);
|
||||
job.setJarByClass(CredentialsTestJob.class);
|
||||
job.setNumReduceTasks(1);
|
||||
job.setMapperClass(CredentialsTestJob.CredentialsTestMapper.class);
|
||||
job.setMapOutputKeyClass(IntWritable.class);
|
||||
job.setMapOutputValueClass(NullWritable.class);
|
||||
job.setReducerClass(CredentialsTestJob.CredentialsTestReducer.class);
|
||||
job.setInputFormatClass(SleepJob.SleepInputFormat.class);
|
||||
job.setPartitionerClass(SleepJob.SleepJobPartitioner.class);
|
||||
job.setOutputFormatClass(NullOutputFormat.class);
|
||||
job.setSpeculativeExecution(false);
|
||||
job.setJobName("test job");
|
||||
FileInputFormat.addInputPath(job, new Path("ignored"));
|
||||
return job;
|
||||
}
|
||||
|
||||
public int run(String[] args) throws Exception {
|
||||
|
||||
Job job = createJob();
|
||||
return job.waitForCompletion(true) ? 0 : 1;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,137 @@
|
|||
package org.apache.hadoop.mapreduce.security;
|
||||
|
||||
/** Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.MiniMRClientCluster;
|
||||
import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
|
||||
import org.apache.hadoop.mapreduce.JobContext;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Tests whether a protected secret passed from JobClient is
|
||||
* available to the child task
|
||||
*/
|
||||
|
||||
public class TestMRCredentials {
|
||||
|
||||
static final int NUM_OF_KEYS = 10;
|
||||
private static MiniMRClientCluster mrCluster;
|
||||
private static MiniDFSCluster dfsCluster;
|
||||
private static int numSlaves = 1;
|
||||
private static JobConf jConf;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
System.setProperty("hadoop.log.dir", "logs");
|
||||
Configuration conf = new Configuration();
|
||||
dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
|
||||
jConf = new JobConf(conf);
|
||||
FileSystem.setDefaultUri(conf, dfsCluster.getFileSystem().getUri().toString());
|
||||
mrCluster = MiniMRClientClusterFactory.create(TestMRCredentials.class, 1, jConf);
|
||||
createKeysAsJson("keys.json");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
if(mrCluster != null)
|
||||
mrCluster.stop();
|
||||
mrCluster = null;
|
||||
if(dfsCluster != null)
|
||||
dfsCluster.shutdown();
|
||||
dfsCluster = null;
|
||||
|
||||
new File("keys.json").delete();
|
||||
|
||||
}
|
||||
|
||||
public static void createKeysAsJson (String fileName)
|
||||
throws FileNotFoundException, IOException{
|
||||
StringBuilder jsonString = new StringBuilder();
|
||||
jsonString.append("{");
|
||||
for(int i=0; i<NUM_OF_KEYS; i++) {
|
||||
String keyName = "alias"+i;
|
||||
String password = "password"+i;
|
||||
jsonString.append("\""+ keyName +"\":"+ "\""+password+"\"" );
|
||||
if (i < (NUM_OF_KEYS-1)){
|
||||
jsonString.append(",");
|
||||
}
|
||||
|
||||
}
|
||||
jsonString.append("}");
|
||||
|
||||
FileOutputStream fos= new FileOutputStream (fileName);
|
||||
fos.write(jsonString.toString().getBytes());
|
||||
fos.close();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* run a distributed job and verify that TokenCache is available
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void test () throws IOException {
|
||||
|
||||
// make sure JT starts
|
||||
Configuration jobConf = new JobConf(mrCluster.getConfig());
|
||||
|
||||
// provide namenodes names for the job to get the delegation tokens for
|
||||
//String nnUri = dfsCluster.getNameNode().getUri(namenode).toString();
|
||||
NameNode nn = dfsCluster.getNameNode();
|
||||
URI nnUri = NameNode.getUri(nn.getNameNodeAddress());
|
||||
jobConf.set(JobContext.JOB_NAMENODES, nnUri + "," + nnUri.toString());
|
||||
|
||||
|
||||
jobConf.set("mapreduce.job.credentials.json" , "keys.json");
|
||||
|
||||
// using argument to pass the file name
|
||||
String[] args = {
|
||||
"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
|
||||
};
|
||||
|
||||
int res = -1;
|
||||
try {
|
||||
res = ToolRunner.run(jobConf, new CredentialsTestJob(), args);
|
||||
} catch (Exception e) {
|
||||
System.out.println("Job failed with" + e.getLocalizedMessage());
|
||||
e.printStackTrace(System.out);
|
||||
fail("Job failed");
|
||||
}
|
||||
assertEquals("dist job res is not 0", res, 0);
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue