MAPREDUCE-4554. Job Credentials are not transmitted if security is turned off (Benoy Antony via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1395769 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-10-08 20:50:40 +00:00
parent 17b32bb2de
commit 49b20c2ed1
7 changed files with 305 additions and 28 deletions

View File

@ -555,6 +555,9 @@ Release 0.23.5 - UNRELEASED
BUG FIXES
MAPREDUCE-4554. Job Credentials are not transmitted if security is turned
off (Benoy Antony via bobby)
Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -490,19 +490,17 @@ public class MRAppMaster extends CompositeService {
try {
this.currentUser = UserGroupInformation.getCurrentUser();
if (UserGroupInformation.isSecurityEnabled()) {
// Read the file-system tokens from the localized tokens-file.
Path jobSubmitDir =
FileContext.getLocalFSFileContext().makeQualified(
new Path(new File(MRJobConfig.JOB_SUBMIT_DIR)
.getAbsolutePath()));
Path jobTokenFile =
new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
+ jobTokenFile);
currentUser.addCredentials(fsTokens); // For use by AppMaster itself.
}
// Read the file-system tokens from the localized tokens-file.
Path jobSubmitDir =
FileContext.getLocalFSFileContext().makeQualified(
new Path(new File(MRJobConfig.JOB_SUBMIT_DIR)
.getAbsolutePath()));
Path jobTokenFile =
new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
+ jobTokenFile);
currentUser.addCredentials(fsTokens); // For use by AppMaster itself.
} catch (IOException e) {
throw new YarnException(e);
}

View File

@ -647,14 +647,10 @@ public abstract class TaskAttemptImpl implements
MRApps.setupDistributedCache(conf, localResources);
// Setup up task credentials buffer
Credentials taskCredentials = new Credentials();
if (UserGroupInformation.isSecurityEnabled()) {
LOG.info("Adding #" + credentials.numberOfTokens()
+ " tokens and #" + credentials.numberOfSecretKeys()
+ " secret keys for NM use for launching container");
taskCredentials.addAll(credentials);
}
LOG.info("Adding #" + credentials.numberOfTokens()
+ " tokens and #" + credentials.numberOfSecretKeys()
+ " secret keys for NM use for launching container");
Credentials taskCredentials = new Credentials(credentials);
// LocalStorageToken is needed irrespective of whether security is enabled
// or not.

View File

@ -179,6 +179,10 @@ import org.junit.Test;
public Configuration getConfig() {
return conf;
}
@Override
protected void downloadTokensAndSetupUGI(Configuration conf) {
}
}
private final class MRAppTestCleanup extends MRApp {
@ -266,4 +270,4 @@ import org.junit.Test;
Assert.assertTrue("Staging directory not cleaned before notifying RM",
app.cleanedBeforeContainerAllocatorStopped);
}
}
}

View File

@ -372,12 +372,9 @@ public class YARNRunner implements ClientProtocol {
}
// Setup security tokens
ByteBuffer securityTokens = null;
if (UserGroupInformation.isSecurityEnabled()) {
DataOutputBuffer dob = new DataOutputBuffer();
ts.writeTokenStorageToStream(dob);
securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
}
DataOutputBuffer dob = new DataOutputBuffer();
ts.writeTokenStorageToStream(dob);
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
// Setup the command to run the AM
List<String> vargs = new ArrayList<String>(8);

View File

@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.security;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* class for testing transport of keys via Credentials .
* Client passes a list of keys in the Credentials object.
* The mapper and reducer checks whether it can access the keys
* from Credentials.
*/
public class CredentialsTestJob extends Configured implements Tool {
private static final int NUM_OF_KEYS = 10;
private static void checkSecrets(Credentials ts) {
if ( ts == null){
throw new RuntimeException("The credentials are not available");
// fail the test
}
for(int i=0; i<NUM_OF_KEYS; i++) {
String secretName = "alias"+i;
// get token storage and a key
byte[] secretValue = ts.getSecretKey(new Text(secretName));
System.out.println(secretValue);
if (secretValue == null){
throw new RuntimeException("The key "+ secretName + " is not available. ");
// fail the test
}
String secretValueStr = new String (secretValue);
if ( !("password"+i).equals(secretValueStr)){
throw new RuntimeException("The key "+ secretName +
" is not correct. Expected value is "+ ("password"+i) +
". Actual value is " + secretValueStr); // fail the test
}
}
}
public static class CredentialsTestMapper
extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
Credentials ts;
protected void setup(Context context)
throws IOException, InterruptedException {
ts = context.getCredentials();
}
public void map(IntWritable key, IntWritable value, Context context
) throws IOException, InterruptedException {
checkSecrets(ts);
}
}
public static class CredentialsTestReducer
extends Reducer<IntWritable, NullWritable, NullWritable, NullWritable> {
Credentials ts;
protected void setup(Context context)
throws IOException, InterruptedException {
ts = context.getCredentials();
}
public void reduce(IntWritable key, Iterable<NullWritable> values,
Context context)
throws IOException {
checkSecrets(ts);
}
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new CredentialsTestJob(), args);
System.exit(res);
}
public Job createJob()
throws IOException {
Configuration conf = getConf();
conf.setInt(MRJobConfig.NUM_MAPS, 1);
Job job = Job.getInstance(conf, "test");
job.setNumReduceTasks(1);
job.setJarByClass(CredentialsTestJob.class);
job.setNumReduceTasks(1);
job.setMapperClass(CredentialsTestJob.CredentialsTestMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(CredentialsTestJob.CredentialsTestReducer.class);
job.setInputFormatClass(SleepJob.SleepInputFormat.class);
job.setPartitionerClass(SleepJob.SleepJobPartitioner.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setSpeculativeExecution(false);
job.setJobName("test job");
FileInputFormat.addInputPath(job, new Path("ignored"));
return job;
}
public int run(String[] args) throws Exception {
Job job = createJob();
return job.waitForCompletion(true) ? 0 : 1;
}
}

View File

@ -0,0 +1,137 @@
package org.apache.hadoop.mapreduce.security;
/** Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRClientCluster;
import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests whether a protected secret passed from JobClient is
* available to the child task
*/
public class TestMRCredentials {
static final int NUM_OF_KEYS = 10;
private static MiniMRClientCluster mrCluster;
private static MiniDFSCluster dfsCluster;
private static int numSlaves = 1;
private static JobConf jConf;
@SuppressWarnings("deprecation")
@BeforeClass
public static void setUp() throws Exception {
System.setProperty("hadoop.log.dir", "logs");
Configuration conf = new Configuration();
dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
jConf = new JobConf(conf);
FileSystem.setDefaultUri(conf, dfsCluster.getFileSystem().getUri().toString());
mrCluster = MiniMRClientClusterFactory.create(TestMRCredentials.class, 1, jConf);
createKeysAsJson("keys.json");
}
@AfterClass
public static void tearDown() throws Exception {
if(mrCluster != null)
mrCluster.stop();
mrCluster = null;
if(dfsCluster != null)
dfsCluster.shutdown();
dfsCluster = null;
new File("keys.json").delete();
}
public static void createKeysAsJson (String fileName)
throws FileNotFoundException, IOException{
StringBuilder jsonString = new StringBuilder();
jsonString.append("{");
for(int i=0; i<NUM_OF_KEYS; i++) {
String keyName = "alias"+i;
String password = "password"+i;
jsonString.append("\""+ keyName +"\":"+ "\""+password+"\"" );
if (i < (NUM_OF_KEYS-1)){
jsonString.append(",");
}
}
jsonString.append("}");
FileOutputStream fos= new FileOutputStream (fileName);
fos.write(jsonString.toString().getBytes());
fos.close();
}
/**
* run a distributed job and verify that TokenCache is available
* @throws IOException
*/
@Test
public void test () throws IOException {
// make sure JT starts
Configuration jobConf = new JobConf(mrCluster.getConfig());
// provide namenodes names for the job to get the delegation tokens for
//String nnUri = dfsCluster.getNameNode().getUri(namenode).toString();
NameNode nn = dfsCluster.getNameNode();
URI nnUri = NameNode.getUri(nn.getNameNodeAddress());
jobConf.set(JobContext.JOB_NAMENODES, nnUri + "," + nnUri.toString());
jobConf.set("mapreduce.job.credentials.json" , "keys.json");
// using argument to pass the file name
String[] args = {
"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
};
int res = -1;
try {
res = ToolRunner.run(jobConf, new CredentialsTestJob(), args);
} catch (Exception e) {
System.out.println("Job failed with" + e.getLocalizedMessage());
e.printStackTrace(System.out);
fail("Job failed");
}
assertEquals("dist job res is not 0", res, 0);
}
}