MAPREDUCE-6086. mapreduce.job.credentials.binary should allow all URIs. (Zhihai Xu via kasha)
This commit is contained in:
parent
1b4b46004a
commit
52945a33cc
|
@ -353,6 +353,9 @@ Release 2.6.0 - UNRELEASED
|
|||
MAPREDUCE-6090. mapred hsadmin getGroups fails to connect in some cases
|
||||
(Robert Kanter via jlowe)
|
||||
|
||||
MAPREDUCE-6086. mapreduce.job.credentials.binary should allow all URIs.
|
||||
(Zhihai Xu via kasha)
|
||||
|
||||
Release 2.5.1 - 2014-09-05
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -578,7 +578,9 @@ class JobSubmitter {
|
|||
conf.get("mapreduce.job.credentials.binary");
|
||||
if (binaryTokenFilename != null) {
|
||||
Credentials binary = Credentials.readTokenStorageFile(
|
||||
new Path("file:///" + binaryTokenFilename), conf);
|
||||
FileSystem.getLocal(conf).makeQualified(
|
||||
new Path(binaryTokenFilename)),
|
||||
conf);
|
||||
credentials.addAll(binary);
|
||||
}
|
||||
// add secret keys coming from a json file
|
||||
|
|
|
@ -134,7 +134,9 @@ public class TokenCache {
|
|||
Credentials binary;
|
||||
try {
|
||||
binary = Credentials.readTokenStorageFile(
|
||||
new Path("file:///" + binaryTokenFilename), conf);
|
||||
FileSystem.getLocal(conf).makeQualified(
|
||||
new Path(binaryTokenFilename)),
|
||||
conf);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
|
|
@ -63,11 +63,24 @@ public class TestTokenCache {
|
|||
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void testBinaryCredentials() throws Exception {
|
||||
public void testBinaryCredentialsWithoutScheme() throws Exception {
|
||||
testBinaryCredentials(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void testBinaryCredentialsWithScheme() throws Exception {
|
||||
testBinaryCredentials(true);
|
||||
}
|
||||
|
||||
private void testBinaryCredentials(boolean hasScheme) throws Exception {
|
||||
Path TEST_ROOT_DIR =
|
||||
new Path(System.getProperty("test.build.data","test/build/data"));
|
||||
// ick, but need fq path minus file:/
|
||||
String binaryTokenFile = FileSystem.getLocal(conf).makeQualified(
|
||||
String binaryTokenFile = hasScheme
|
||||
? FileSystem.getLocal(conf).makeQualified(
|
||||
new Path(TEST_ROOT_DIR, "tokenFile")).toString()
|
||||
: FileSystem.getLocal(conf).makeQualified(
|
||||
new Path(TEST_ROOT_DIR, "tokenFile")).toUri().getPath();
|
||||
|
||||
MockFileSystem fs1 = createFileSystemForServiceName("service1");
|
||||
|
|
|
@ -150,30 +150,15 @@ public class TestBinaryTokenFile {
|
|||
// Credentials in the job will not have delegation tokens
|
||||
// because security is disabled. Fetch delegation tokens
|
||||
// and store in binary token file.
|
||||
try {
|
||||
Credentials cred1 = new Credentials();
|
||||
Credentials cred2 = new Credentials();
|
||||
TokenCache.obtainTokensForNamenodesInternal(cred1, new Path[] { p1 },
|
||||
job.getConfiguration());
|
||||
for (Token<? extends TokenIdentifier> t : cred1.getAllTokens()) {
|
||||
cred2.addToken(new Text(DELEGATION_TOKEN_KEY), t);
|
||||
}
|
||||
DataOutputStream os = new DataOutputStream(new FileOutputStream(
|
||||
binaryTokenFileName.toString()));
|
||||
try {
|
||||
cred2.writeTokenStorageToStream(os);
|
||||
} finally {
|
||||
os.close();
|
||||
}
|
||||
createBinaryTokenFile(job.getConfiguration());
|
||||
job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY,
|
||||
binaryTokenFileName.toString());
|
||||
// NB: the MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY key now gets deleted from config,
|
||||
// so it's not accessible in the job's config. So, we use another key to pass the file name into the job configuration:
|
||||
// NB: the MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY
|
||||
// key now gets deleted from config,
|
||||
// so it's not accessible in the job's config. So,
|
||||
// we use another key to pass the file name into the job configuration:
|
||||
job.getConfiguration().set(KEY_SECURITY_TOKEN_FILE_NAME,
|
||||
binaryTokenFileName.toString());
|
||||
} catch (IOException e) {
|
||||
Assert.fail("Exception " + e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -226,6 +211,28 @@ public class TestBinaryTokenFile {
|
|||
}
|
||||
}
|
||||
|
||||
private static void createBinaryTokenFile(Configuration conf) {
|
||||
// Fetch delegation tokens and store in binary token file.
|
||||
try {
|
||||
Credentials cred1 = new Credentials();
|
||||
Credentials cred2 = new Credentials();
|
||||
TokenCache.obtainTokensForNamenodesInternal(cred1, new Path[] { p1 },
|
||||
conf);
|
||||
for (Token<? extends TokenIdentifier> t : cred1.getAllTokens()) {
|
||||
cred2.addToken(new Text(DELEGATION_TOKEN_KEY), t);
|
||||
}
|
||||
DataOutputStream os = new DataOutputStream(new FileOutputStream(
|
||||
binaryTokenFileName.toString()));
|
||||
try {
|
||||
cred2.writeTokenStorageToStream(os);
|
||||
} finally {
|
||||
os.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
Assert.fail("Exception " + e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* run a distributed job and verify that TokenCache is available
|
||||
* @throws IOException
|
||||
|
@ -252,4 +259,33 @@ public class TestBinaryTokenFile {
|
|||
}
|
||||
assertEquals("dist job res is not 0:", 0, res);
|
||||
}
|
||||
|
||||
/**
|
||||
* run a distributed job with -tokenCacheFile option parameter and
|
||||
* verify that no exception happens.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testTokenCacheFile() throws IOException {
|
||||
Configuration conf = mrCluster.getConfig();
|
||||
createBinaryTokenFile(conf);
|
||||
// provide namenodes names for the job to get the delegation tokens for
|
||||
final String nnUri = dfsCluster.getURI(0).toString();
|
||||
conf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);
|
||||
|
||||
// using argument to pass the file name
|
||||
final String[] args = {
|
||||
"-tokenCacheFile", binaryTokenFileName.toString(),
|
||||
"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
|
||||
};
|
||||
int res = -1;
|
||||
try {
|
||||
res = ToolRunner.run(conf, new SleepJob(), args);
|
||||
} catch (Exception e) {
|
||||
System.out.println("Job failed with " + e.getLocalizedMessage());
|
||||
e.printStackTrace(System.out);
|
||||
fail("Job failed");
|
||||
}
|
||||
assertEquals("dist job res is not 0:", 0, res);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue