merge MAPREDUCE-3529 from trunk
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1227375 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
19f69fa577
commit
eb82638bd7
|
@ -338,6 +338,9 @@ Release 0.23.1 - Unreleased
|
|||
MAPREDUCE-3490. Fixed MapReduce AM to count failed maps also towards Reduce
|
||||
ramp up. (Sharad Agarwal and Arun C Murthy via vinodkv)
|
||||
|
||||
MAPREDUCE-3529. TokenCache does not cache viewfs credentials correctly
|
||||
(sseth)
|
||||
|
||||
Release 0.23.0 - 2011-11-01
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -19,9 +19,7 @@
|
|||
package org.apache.hadoop.mapreduce.security;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -35,9 +33,7 @@ import org.apache.hadoop.io.Text;
|
|||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.Master;
|
||||
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
||||
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
|
@ -90,7 +86,7 @@ public class TokenCache {
|
|||
obtainTokensForNamenodesInternal(fs, credentials, conf);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* get delegation token for a specific FS
|
||||
* @param fs
|
||||
|
@ -99,6 +95,7 @@ public class TokenCache {
|
|||
* @param conf
|
||||
* @throws IOException
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
static void obtainTokensForNamenodesInternal(FileSystem fs,
|
||||
Credentials credentials, Configuration conf) throws IOException {
|
||||
String delegTokenRenewer = Master.getMasterPrincipal(conf);
|
||||
|
@ -131,7 +128,8 @@ public class TokenCache {
|
|||
return;
|
||||
}
|
||||
}
|
||||
List<Token<?>> tokens = fs.getDelegationTokens(delegTokenRenewer);
|
||||
List<Token<?>> tokens =
|
||||
fs.getDelegationTokens(delegTokenRenewer, credentials);
|
||||
if (tokens != null) {
|
||||
for (Token<?> token : tokens) {
|
||||
credentials.addToken(token.getService(), token);
|
||||
|
@ -141,13 +139,13 @@ public class TokenCache {
|
|||
}
|
||||
//Call getDelegationToken as well for now - for FS implementations
|
||||
// which may not have implmented getDelegationTokens (hftp)
|
||||
Token<?> token = fs.getDelegationToken(delegTokenRenewer);
|
||||
if (token != null) {
|
||||
Text fsNameText = new Text(fsName);
|
||||
token.setService(fsNameText);
|
||||
credentials.addToken(fsNameText, token);
|
||||
LOG.info("Got dt for " + fs.getUri() + ";uri="+ fsName +
|
||||
";t.service="+token.getService());
|
||||
if (tokens == null || tokens.size() == 0) {
|
||||
Token<?> token = fs.getDelegationToken(delegTokenRenewer);
|
||||
if (token != null) {
|
||||
credentials.addToken(token.getService(), token);
|
||||
LOG.info("Got dt for " + fs.getUri() + ";uri=" + fsName
|
||||
+ ";t.service=" + token.getService());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,161 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.security;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapred.Master;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
public class TestTokenCache {
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void testGetDelegationTokensNotImplemented() throws Exception {
|
||||
Credentials credentials = new Credentials();
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
|
||||
String renewer = Master.getMasterPrincipal(conf);
|
||||
|
||||
FileSystem fs = setupSingleFsWithoutGetDelegationTokens();
|
||||
TokenCache.obtainTokensForNamenodesInternal(fs, credentials, conf);
|
||||
assertEquals(1, credentials.getAllTokens().size());
|
||||
|
||||
verify(fs).getDelegationTokens(renewer, credentials);
|
||||
verify(fs).getDelegationToken(renewer);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testManagedFileSystem() throws Exception {
|
||||
Credentials credentials = new Credentials();
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
|
||||
String renewer = Master.getMasterPrincipal(conf);
|
||||
|
||||
FileSystem singleFs = setupSingleFs();
|
||||
FileSystem multiFs = setupMultiFs(singleFs, renewer, credentials);
|
||||
|
||||
TokenCache.obtainTokensForNamenodesInternal(singleFs, credentials, conf);
|
||||
assertEquals(1, credentials.getAllTokens().size());
|
||||
|
||||
TokenCache.obtainTokensForNamenodesInternal(singleFs, credentials, conf);
|
||||
assertEquals(1, credentials.getAllTokens().size());
|
||||
|
||||
TokenCache.obtainTokensForNamenodesInternal(multiFs, credentials, conf);
|
||||
assertEquals(2, credentials.getAllTokens().size());
|
||||
|
||||
TokenCache.obtainTokensForNamenodesInternal(multiFs, credentials, conf);
|
||||
assertEquals(2, credentials.getAllTokens().size());
|
||||
|
||||
verify(singleFs, times(1)).getDelegationTokens(renewer, credentials);
|
||||
verify(multiFs, times(2)).getDelegationTokens(renewer, credentials);
|
||||
// A call to getDelegationToken would have generated an exception.
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private FileSystem setupSingleFsWithoutGetDelegationTokens() throws Exception {
|
||||
FileSystem mockFs = mock(FileSystem.class);
|
||||
when(mockFs.getCanonicalServiceName()).thenReturn("singlefs4");
|
||||
when(mockFs.getUri()).thenReturn(new URI("singlefs4:///"));
|
||||
|
||||
final Token<?> mockToken = (Token<?>) mock(Token.class);
|
||||
when(mockToken.getService()).thenReturn(new Text("singlefs4"));
|
||||
|
||||
when(mockFs.getDelegationToken(any(String.class))).thenAnswer(
|
||||
new Answer<Token<?>>() {
|
||||
@Override
|
||||
public Token<?> answer(InvocationOnMock invocation) throws Throwable {
|
||||
return mockToken;
|
||||
}
|
||||
});
|
||||
|
||||
when(mockFs.getDelegationTokens(any(String.class), any(Credentials.class)))
|
||||
.thenReturn(new LinkedList<Token<?>>());
|
||||
|
||||
return mockFs;
|
||||
}
|
||||
|
||||
private FileSystem setupSingleFs() throws Exception {
|
||||
FileSystem mockFs = mock(FileSystem.class);
|
||||
when(mockFs.getCanonicalServiceName()).thenReturn("singlefs1");
|
||||
when(mockFs.getUri()).thenReturn(new URI("singlefs1:///"));
|
||||
|
||||
List<Token<?>> tokens = new LinkedList<Token<?>>();
|
||||
Token<?> mockToken = mock(Token.class);
|
||||
when(mockToken.getService()).thenReturn(new Text("singlefs1"));
|
||||
tokens.add(mockToken);
|
||||
|
||||
when(mockFs.getDelegationTokens(any(String.class))).thenThrow(
|
||||
new RuntimeException(
|
||||
"getDelegationTokens(renewer) should not be called"));
|
||||
when(mockFs.getDelegationTokens(any(String.class), any(Credentials.class)))
|
||||
.thenReturn(tokens);
|
||||
|
||||
return mockFs;
|
||||
}
|
||||
|
||||
private FileSystem setupMultiFs(final FileSystem singleFs,
|
||||
final String renewer, final Credentials credentials) throws Exception {
|
||||
FileSystem mockFs = mock(FileSystem.class);
|
||||
when(mockFs.getCanonicalServiceName()).thenReturn("multifs");
|
||||
when(mockFs.getUri()).thenReturn(new URI("multifs:///"));
|
||||
|
||||
when(mockFs.getDelegationTokens(any(String.class))).thenThrow(
|
||||
new RuntimeException(
|
||||
"getDelegationTokens(renewer) should not be called"));
|
||||
when(mockFs.getDelegationTokens(renewer, credentials)).thenAnswer(
|
||||
new Answer<List<Token<?>>>() {
|
||||
|
||||
@Override
|
||||
public List<Token<?>> answer(InvocationOnMock invocation)
|
||||
throws Throwable {
|
||||
List<Token<?>> newTokens = new LinkedList<Token<?>>();
|
||||
if (credentials.getToken(new Text("singlefs1")) == null) {
|
||||
newTokens.addAll(singleFs.getDelegationTokens(renewer,
|
||||
credentials));
|
||||
} else {
|
||||
newTokens.add(credentials.getToken(new Text("singlefs1")));
|
||||
}
|
||||
Token<?> mockToken2 = mock(Token.class);
|
||||
when(mockToken2.getService()).thenReturn(new Text("singlefs2"));
|
||||
newTokens.add(mockToken2);
|
||||
return newTokens;
|
||||
}
|
||||
});
|
||||
|
||||
return mockFs;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue