HDFS-5322. Merge change r1531436 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1531440 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2013-10-11 21:53:29 +00:00
parent f3e4452e31
commit c89a36106e
8 changed files with 233 additions and 48 deletions

View File

@ -1292,6 +1292,29 @@ private void saslReadAndProcess(DataInputStream dis) throws
}
}
private Throwable getCauseForInvalidToken(IOException e) {
Throwable cause = e;
while (cause != null) {
if (cause instanceof RetriableException) {
return (RetriableException) cause;
} else if (cause instanceof StandbyException) {
return (StandbyException) cause;
} else if (cause instanceof InvalidToken) {
// FIXME: hadoop method signatures are restricting the SASL
// callbacks to only returning InvalidToken, but some services
// need to throw other exceptions (ex. NN + StandyException),
// so for now we'll tunnel the real exceptions via an
// InvalidToken's cause which normally is not set
if (cause.getCause() != null) {
cause = cause.getCause();
}
return cause;
}
cause = cause.getCause();
}
return e;
}
private void saslProcess(RpcSaslProto saslMessage)
throws WrappedRpcServerException, IOException, InterruptedException {
if (saslContextEstablished) {
@ -1304,29 +1327,11 @@ private void saslProcess(RpcSaslProto saslMessage)
try {
saslResponse = processSaslMessage(saslMessage);
} catch (IOException e) {
IOException sendToClient = e;
Throwable cause = e;
while (cause != null) {
if (cause instanceof InvalidToken) {
// FIXME: hadoop method signatures are restricting the SASL
// callbacks to only returning InvalidToken, but some services
// need to throw other exceptions (ex. NN + StandyException),
// so for now we'll tunnel the real exceptions via an
// InvalidToken's cause which normally is not set
if (cause.getCause() != null) {
cause = cause.getCause();
}
sendToClient = (IOException) cause;
break;
}
cause = cause.getCause();
}
rpcMetrics.incrAuthenticationFailures();
String clientIP = this.toString();
// attempting user could be null
AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser +
" (" + e.getLocalizedMessage() + ")");
throw sendToClient;
AUDITLOG.warn(AUTH_FAILED_FOR + this.toString() + ":"
+ attemptingUser + " (" + e.getLocalizedMessage() + ")");
throw (IOException) getCauseForInvalidToken(e);
}
if (saslServer != null && saslServer.isComplete()) {

View File

@ -45,11 +45,13 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.Server.Connection;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.TokenIdentifier;
/**
* A utility class for dealing with SASL on RPC server
@ -267,13 +269,15 @@ public SaslDigestCallbackHandler(
this.connection = connection;
}
private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken {
return encodePassword(secretManager.retrievePassword(tokenid));
private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken,
StandbyException, RetriableException, IOException {
return encodePassword(secretManager.retriableRetrievePassword(tokenid));
}
@Override
public void handle(Callback[] callbacks) throws InvalidToken,
UnsupportedCallbackException {
UnsupportedCallbackException, StandbyException, RetriableException,
IOException {
NameCallback nc = null;
PasswordCallback pc = null;
AuthorizeCallback ac = null;
@ -292,7 +296,8 @@ public void handle(Callback[] callbacks) throws InvalidToken,
}
}
if (pc != null) {
TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(), secretManager);
TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(),
secretManager);
char[] password = getPassword(tokenIdentifier);
UserGroupInformation user = null;
user = tokenIdentifier.getUser(); // may throw exception

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.StandbyException;
@ -66,7 +67,29 @@ public InvalidToken(String msg) {
* @return the password to use
* @throws InvalidToken the token was invalid
*/
public abstract byte[] retrievePassword(T identifier) throws InvalidToken;
public abstract byte[] retrievePassword(T identifier)
throws InvalidToken;
/**
* The same functionality with {@link #retrievePassword}, except that this
* method can throw a {@link RetriableException} or a {@link StandbyException}
* to indicate that client can retry/failover the same operation because of
* temporary issue on the server side.
*
* @param identifier the identifier to validate
* @return the password to use
* @throws InvalidToken the token was invalid
* @throws StandbyException the server is in standby state, the client can
* try other servers
* @throws RetriableException the token was invalid, and the server thinks
* this may be a temporary issue and suggests the client to retry
* @throws IOException to allow future exceptions to be added without breaking
* compatibility
*/
public byte[] retriableRetrievePassword(T identifier)
throws InvalidToken, StandbyException, RetriableException, IOException {
return retrievePassword(identifier);
}
/**
* Create an empty token identifier.

View File

@ -289,20 +289,30 @@ protected synchronized byte[] createPassword(TokenIdent identifier) {
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)));
return password;
}
@Override
public synchronized byte[] retrievePassword(TokenIdent identifier)
/**
* Find the DelegationTokenInformation for the given token id, and verify that
* if the token is expired. Note that this method should be called with
* acquiring the secret manager's monitor.
*/
protected DelegationTokenInformation checkToken(TokenIdent identifier)
throws InvalidToken {
assert Thread.holdsLock(this);
DelegationTokenInformation info = currentTokens.get(identifier);
if (info == null) {
throw new InvalidToken("token (" + identifier.toString()
+ ") can't be found in cache");
}
long now = Time.now();
if (info.getRenewDate() < now) {
if (info.getRenewDate() < Time.now()) {
throw new InvalidToken("token (" + identifier.toString() + ") is expired");
}
return info.getPassword();
return info;
}
@Override
public synchronized byte[] retrievePassword(TokenIdent identifier)
throws InvalidToken {
return checkToken(identifier).getPassword();
}
protected String getTrackingIdIfEnabled(TokenIdent ident) {

View File

@ -125,6 +125,9 @@ Release 2.2.1 - UNRELEASED
HDFS-5335. Hive query failed with possible race in dfs output stream.
(Haohui Mai via suresh)
HDFS-5322. HDFS delegation token not found in cache errors seen on secure HA
clusters. (jing9)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
@ -115,6 +116,24 @@ public byte[] retrievePassword(
return super.retrievePassword(identifier);
}
@Override
public byte[] retriableRetrievePassword(DelegationTokenIdentifier identifier)
throws InvalidToken, StandbyException, RetriableException, IOException {
namesystem.checkOperation(OperationCategory.READ);
try {
return super.retrievePassword(identifier);
} catch (InvalidToken it) {
if (namesystem.inTransitionToActive()) {
// if the namesystem is currently in the middle of transition to
// active state, let client retry since the corresponding editlog may
// have not been applied yet
throw new RetriableException(it);
} else {
throw it;
}
}
}
/**
* Returns expiry time of a token given its identifier.
*

View File

@ -440,6 +440,11 @@ private void logAuditEvent(boolean succeeded,
private HAContext haContext;
private final boolean haEnabled;
/**
* Whether the namenode is in the middle of starting the active service
*/
private volatile boolean startingActiveService = false;
private INodeId inodeId;
@ -888,6 +893,7 @@ void stopCommonServices() {
* @throws IOException
*/
void startActiveServices() throws IOException {
startingActiveService = true;
LOG.info("Starting services required for active state");
writeLock();
try {
@ -942,8 +948,19 @@ void startActiveServices() throws IOException {
nnrmthread.start();
} finally {
writeUnlock();
startingActiveService = false;
}
}
/**
* @return Whether the namenode is transitioning to active state and is in the
* middle of the {@link #startActiveServices()}
*/
public boolean inTransitionToActive() {
return haEnabled && haContext != null
&& haContext.getState().getServiceState() == HAServiceState.ACTIVE
&& startingActiveService;
}
private boolean shouldUseDelegationTokens() {
return UserGroupInformation.isSecurityEnabled() ||
@ -6443,11 +6460,17 @@ public String getSoftwareVersion() {
* Verifies that the given identifier and password are valid and match.
* @param identifier Token identifier.
* @param password Password in the token.
* @throws InvalidToken
*/
public synchronized void verifyToken(DelegationTokenIdentifier identifier,
byte[] password) throws InvalidToken {
getDelegationTokenSecretManager().verifyToken(identifier, password);
byte[] password) throws InvalidToken, RetriableException {
try {
getDelegationTokenSecretManager().verifyToken(identifier, password);
} catch (InvalidToken it) {
if (inTransitionToActive()) {
throw new RetriableException(it);
}
throw it;
}
}
@Override
@ -6464,6 +6487,11 @@ public EditLogTailer getEditLogTailer() {
return editLogTailer;
}
@VisibleForTesting
public void setEditLogTailerForTests(EditLogTailer tailer) {
this.editLogTailer = tailer;
}
@VisibleForTesting
void setFsLockForTests(ReentrantReadWriteLock lock) {
this.fsLock = lock;

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HAUtil;
@ -47,19 +48,22 @@
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.SecurityUtilTestHelper;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
import com.google.common.base.Joiner;
@ -78,8 +82,12 @@ public class TestDelegationTokensWithHA {
private static DelegationTokenSecretManager dtSecretManager;
private static DistributedFileSystem dfs;
@BeforeClass
public static void setupCluster() throws Exception {
private volatile boolean catchup = false;
@Before
public void setupCluster() throws Exception {
SecurityUtilTestHelper.setTokenServiceUseIp(true);
conf.setBoolean(
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL,
@ -101,18 +109,12 @@ public static void setupCluster() throws Exception {
nn0.getNamesystem());
}
@AfterClass
public static void shutdownCluster() throws IOException {
@After
public void shutdownCluster() throws IOException {
if (cluster != null) {
cluster.shutdown();
}
}
@Before
public void prepTest() {
SecurityUtilTestHelper.setTokenServiceUseIp(true);
}
@Test
public void testDelegationTokenDFSApi() throws Exception {
@ -155,6 +157,96 @@ public void testDelegationTokenDFSApi() throws Exception {
doRenewOrCancel(token, clientConf, TokenTestAction.CANCEL);
}
private class EditLogTailerForTest extends EditLogTailer {
public EditLogTailerForTest(FSNamesystem namesystem, Configuration conf) {
super(namesystem, conf);
}
public void catchupDuringFailover() throws IOException {
synchronized (TestDelegationTokensWithHA.this) {
while (!catchup) {
try {
LOG.info("The editlog tailer is waiting to catchup...");
TestDelegationTokensWithHA.this.wait();
} catch (InterruptedException e) {}
}
}
super.catchupDuringFailover();
}
}
/**
* Test if correct exception (StandbyException or RetriableException) can be
* thrown during the NN failover.
*/
@Test
public void testDelegationTokenDuringNNFailover() throws Exception {
EditLogTailer editLogTailer = nn1.getNamesystem().getEditLogTailer();
// stop the editLogTailer of nn1
editLogTailer.stop();
Configuration conf = (Configuration) Whitebox.getInternalState(
editLogTailer, "conf");
nn1.getNamesystem().setEditLogTailerForTests(
new EditLogTailerForTest(nn1.getNamesystem(), conf));
// create token
final Token<DelegationTokenIdentifier> token =
getDelegationToken(fs, "JobTracker");
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
byte[] tokenId = token.getIdentifier();
identifier.readFields(new DataInputStream(
new ByteArrayInputStream(tokenId)));
// Ensure that it's present in the nn0 secret manager and can
// be renewed directly from there.
LOG.info("A valid token should have non-null password, " +
"and should be renewed successfully");
assertTrue(null != dtSecretManager.retrievePassword(identifier));
dtSecretManager.renewToken(token, "JobTracker");
// transition nn0 to standby
cluster.transitionToStandby(0);
try {
cluster.getNameNodeRpc(0).renewDelegationToken(token);
fail("StandbyException is expected since nn0 is in standby state");
} catch (StandbyException e) {
GenericTestUtils.assertExceptionContains(
HAServiceState.STANDBY.toString(), e);
}
new Thread() {
@Override
public void run() {
try {
cluster.transitionToActive(1);
} catch (Exception e) {
LOG.error("Transition nn1 to active failed", e);
}
}
}.start();
Thread.sleep(1000);
try {
nn1.getNamesystem().verifyToken(token.decodeIdentifier(),
token.getPassword());
fail("RetriableException/StandbyException is expected since nn1 is in transition");
} catch (IOException e) {
assertTrue(e instanceof StandbyException
|| e instanceof RetriableException);
LOG.info("Got expected exception", e);
}
catchup = true;
synchronized (this) {
this.notifyAll();
}
Configuration clientConf = dfs.getConf();
doRenewOrCancel(token, clientConf, TokenTestAction.RENEW);
doRenewOrCancel(token, clientConf, TokenTestAction.CANCEL);
}
@SuppressWarnings("deprecation")
@Test
public void testDelegationTokenWithDoAs() throws Exception {