MAPREDUCE-6086. mapreduce.job.credentials.binary should allow all URIs. (Zhihai Xu via kasha)

(cherry picked from commit 52945a33cc)
This commit is contained in:
Karthik Kambatla 2014-09-18 15:41:55 -07:00
parent 039f56e739
commit 372bf54072
5 changed files with 86 additions and 30 deletions

View File

@ -136,6 +136,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-6090. mapred hsadmin getGroups fails to connect in some cases MAPREDUCE-6090. mapred hsadmin getGroups fails to connect in some cases
(Robert Kanter via jlowe) (Robert Kanter via jlowe)
MAPREDUCE-6086. mapreduce.job.credentials.binary should allow all URIs.
(Zhihai Xu via kasha)
Release 2.5.1 - 2014-09-05 Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -578,7 +578,9 @@ class JobSubmitter {
conf.get("mapreduce.job.credentials.binary"); conf.get("mapreduce.job.credentials.binary");
if (binaryTokenFilename != null) { if (binaryTokenFilename != null) {
Credentials binary = Credentials.readTokenStorageFile( Credentials binary = Credentials.readTokenStorageFile(
new Path("file:///" + binaryTokenFilename), conf); FileSystem.getLocal(conf).makeQualified(
new Path(binaryTokenFilename)),
conf);
credentials.addAll(binary); credentials.addAll(binary);
} }
// add secret keys coming from a json file // add secret keys coming from a json file

View File

@ -134,7 +134,9 @@ public class TokenCache {
Credentials binary; Credentials binary;
try { try {
binary = Credentials.readTokenStorageFile( binary = Credentials.readTokenStorageFile(
new Path("file:///" + binaryTokenFilename), conf); FileSystem.getLocal(conf).makeQualified(
new Path(binaryTokenFilename)),
conf);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }

View File

@ -63,12 +63,25 @@ public class TestTokenCache {
@Test @Test
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public void testBinaryCredentials() throws Exception { public void testBinaryCredentialsWithoutScheme() throws Exception {
testBinaryCredentials(false);
}
@Test
@SuppressWarnings("deprecation")
public void testBinaryCredentialsWithScheme() throws Exception {
testBinaryCredentials(true);
}
private void testBinaryCredentials(boolean hasScheme) throws Exception {
Path TEST_ROOT_DIR = Path TEST_ROOT_DIR =
new Path(System.getProperty("test.build.data","test/build/data")); new Path(System.getProperty("test.build.data","test/build/data"));
// ick, but need fq path minus file:/ // ick, but need fq path minus file:/
String binaryTokenFile = FileSystem.getLocal(conf).makeQualified( String binaryTokenFile = hasScheme
new Path(TEST_ROOT_DIR, "tokenFile")).toUri().getPath(); ? FileSystem.getLocal(conf).makeQualified(
new Path(TEST_ROOT_DIR, "tokenFile")).toString()
: FileSystem.getLocal(conf).makeQualified(
new Path(TEST_ROOT_DIR, "tokenFile")).toUri().getPath();
MockFileSystem fs1 = createFileSystemForServiceName("service1"); MockFileSystem fs1 = createFileSystemForServiceName("service1");
MockFileSystem fs2 = createFileSystemForServiceName("service2"); MockFileSystem fs2 = createFileSystemForServiceName("service2");

View File

@ -150,30 +150,15 @@ public class TestBinaryTokenFile {
// Credentials in the job will not have delegation tokens // Credentials in the job will not have delegation tokens
// because security is disabled. Fetch delegation tokens // because security is disabled. Fetch delegation tokens
// and store in binary token file. // and store in binary token file.
try { createBinaryTokenFile(job.getConfiguration());
Credentials cred1 = new Credentials(); job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY,
Credentials cred2 = new Credentials(); binaryTokenFileName.toString());
TokenCache.obtainTokensForNamenodesInternal(cred1, new Path[] { p1 }, // NB: the MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY
job.getConfiguration()); // key now gets deleted from config,
for (Token<? extends TokenIdentifier> t : cred1.getAllTokens()) { // so it's not accessible in the job's config. So,
cred2.addToken(new Text(DELEGATION_TOKEN_KEY), t); // we use another key to pass the file name into the job configuration:
} job.getConfiguration().set(KEY_SECURITY_TOKEN_FILE_NAME,
DataOutputStream os = new DataOutputStream(new FileOutputStream( binaryTokenFileName.toString());
binaryTokenFileName.toString()));
try {
cred2.writeTokenStorageToStream(os);
} finally {
os.close();
}
job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY,
binaryTokenFileName.toString());
// NB: the MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY key now gets deleted from config,
// so it's not accessible in the job's config. So, we use another key to pass the file name into the job configuration:
job.getConfiguration().set(KEY_SECURITY_TOKEN_FILE_NAME,
binaryTokenFileName.toString());
} catch (IOException e) {
Assert.fail("Exception " + e);
}
} }
} }
@ -225,7 +210,29 @@ public class TestBinaryTokenFile {
dfsCluster = null; dfsCluster = null;
} }
} }
private static void createBinaryTokenFile(Configuration conf) {
// Fetch delegation tokens and store in binary token file.
try {
Credentials cred1 = new Credentials();
Credentials cred2 = new Credentials();
TokenCache.obtainTokensForNamenodesInternal(cred1, new Path[] { p1 },
conf);
for (Token<? extends TokenIdentifier> t : cred1.getAllTokens()) {
cred2.addToken(new Text(DELEGATION_TOKEN_KEY), t);
}
DataOutputStream os = new DataOutputStream(new FileOutputStream(
binaryTokenFileName.toString()));
try {
cred2.writeTokenStorageToStream(os);
} finally {
os.close();
}
} catch (IOException e) {
Assert.fail("Exception " + e);
}
}
/** /**
* run a distributed job and verify that TokenCache is available * run a distributed job and verify that TokenCache is available
* @throws IOException * @throws IOException
@ -252,4 +259,33 @@ public class TestBinaryTokenFile {
} }
assertEquals("dist job res is not 0:", 0, res); assertEquals("dist job res is not 0:", 0, res);
} }
/**
* run a distributed job with -tokenCacheFile option parameter and
* verify that no exception happens.
* @throws IOException
*/
@Test
public void testTokenCacheFile() throws IOException {
Configuration conf = mrCluster.getConfig();
createBinaryTokenFile(conf);
// provide namenodes names for the job to get the delegation tokens for
final String nnUri = dfsCluster.getURI(0).toString();
conf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);
// using argument to pass the file name
final String[] args = {
"-tokenCacheFile", binaryTokenFileName.toString(),
"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
};
int res = -1;
try {
res = ToolRunner.run(conf, new SleepJob(), args);
} catch (Exception e) {
System.out.println("Job failed with " + e.getLocalizedMessage());
e.printStackTrace(System.out);
fail("Job failed");
}
assertEquals("dist job res is not 0:", 0, res);
}
} }