diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index ce4dd041e14..7966b8da3f7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -1292,6 +1292,29 @@ public abstract class Server {
       }
     }
 
+    private Throwable getCauseForInvalidToken(IOException e) {
+      Throwable cause = e;
+      while (cause != null) {
+        if (cause instanceof RetriableException) {
+          return (RetriableException) cause;
+        } else if (cause instanceof StandbyException) {
+          return (StandbyException) cause;
+        } else if (cause instanceof InvalidToken) {
+          // FIXME: hadoop method signatures are restricting the SASL
+          // callbacks to only returning InvalidToken, but some services
+          // need to throw other exceptions (ex. NN + StandbyException),
+          // so for now we'll tunnel the real exceptions via an
+          // InvalidToken's cause which normally is not set
+          if (cause.getCause() != null) {
+            cause = cause.getCause();
+          }
+          return cause;
+        }
+        cause = cause.getCause();
+      }
+      return e;
+    }
+
     private void saslProcess(RpcSaslProto saslMessage)
         throws WrappedRpcServerException, IOException, InterruptedException {
       if (saslContextEstablished) {
@@ -1304,29 +1327,11 @@ public abstract class Server {
       try {
         saslResponse = processSaslMessage(saslMessage);
       } catch (IOException e) {
-        IOException sendToClient = e;
-        Throwable cause = e;
-        while (cause != null) {
-          if (cause instanceof InvalidToken) {
-            // FIXME: hadoop method signatures are restricting the SASL
-            // callbacks to only returning InvalidToken, but some services
-            // need to throw other exceptions (ex. NN + StandyException),
-            // so for now we'll tunnel the real exceptions via an
-            // InvalidToken's cause which normally is not set
-            if (cause.getCause() != null) {
-              cause = cause.getCause();
-            }
-            sendToClient = (IOException) cause;
-            break;
-          }
-          cause = cause.getCause();
-        }
         rpcMetrics.incrAuthenticationFailures();
-        String clientIP = this.toString();
         // attempting user could be null
-        AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser +
-            " (" + e.getLocalizedMessage() + ")");
-        throw sendToClient;
+        AUDITLOG.warn(AUTH_FAILED_FOR + this.toString() + ":"
+            + attemptingUser + " (" + e.getLocalizedMessage() + ")");
+        throw (IOException) getCauseForInvalidToken(e);
       }
 
       if (saslServer != null && saslServer.isComplete()) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java
index 2390dfcd658..72b56c81749 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java
@@ -45,11 +45,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.Server.Connection;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenIdentifier;
 
 /**
  * A utility class for dealing with SASL on RPC server
@@ -267,13 +269,15 @@ public class SaslRpcServer {
       this.connection = connection;
     }
 
-    private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken {
-      return encodePassword(secretManager.retrievePassword(tokenid));
+    private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken,
+        StandbyException, RetriableException, IOException {
+      return encodePassword(secretManager.retriableRetrievePassword(tokenid));
     }
 
     @Override
     public void handle(Callback[] callbacks) throws InvalidToken,
-        UnsupportedCallbackException {
+        UnsupportedCallbackException, StandbyException, RetriableException,
+        IOException {
       NameCallback nc = null;
       PasswordCallback pc = null;
       AuthorizeCallback ac = null;
@@ -292,7 +296,8 @@ public class SaslRpcServer {
         }
       }
       if (pc != null) {
-        TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(), secretManager);
+        TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(),
+            secretManager);
         char[] password = getPassword(tokenIdentifier);
         UserGroupInformation user = null;
        user = tokenIdentifier.getUser(); // may throw exception
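
For context on the two hunks above: the SASL callback API only lets the server surface InvalidToken, so services tunnel the real failure as the InvalidToken's cause, and getCauseForInvalidToken() is the server-side unwrapper. A minimal sketch of that convention, assuming only the Hadoop classes already referenced by the patch; the class and method names here are invented for illustration:

    import org.apache.hadoop.ipc.StandbyException;
    import org.apache.hadoop.security.token.SecretManager.InvalidToken;

    public class TunnelingSketch {
      // A callback restricted to throwing InvalidToken can still report a
      // standby condition by attaching the real exception as the cause;
      // Server.getCauseForInvalidToken() walks the cause chain and rethrows
      // the tunneled exception to the client.
      static void failBecauseStandby() throws InvalidToken {
        InvalidToken wrapper =
            new InvalidToken("cannot verify token while in standby state");
        wrapper.initCause(new StandbyException("namenode is in standby state"));
        throw wrapper;
      }

      public static void main(String[] args) {
        try {
          failBecauseStandby();
        } catch (InvalidToken it) {
          // the unwrapped cause is what the RPC client ultimately sees
          System.out.println("tunneled cause: " + it.getCause());
        }
      }
    }
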
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java
index 37634b646fc..5fe03912edd 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java
@@ -29,6 +29,7 @@ import javax.crypto.spec.SecretKeySpec;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.StandbyException;
@@ -66,7 +67,29 @@ public abstract class SecretManager<T extends TokenIdentifier> {
    * @return the password to use
    * @throws InvalidToken the token was invalid
    */
-  public abstract byte[] retrievePassword(T identifier) throws InvalidToken;
+  public abstract byte[] retrievePassword(T identifier)
+      throws InvalidToken;
+
+  /**
+   * The same functionality as {@link #retrievePassword}, except that this
+   * method can throw a {@link RetriableException} or a {@link StandbyException}
+   * to indicate that the client can retry or fail over the same operation
+   * because of a temporary issue on the server side.
+   *
+   * @param identifier the identifier to validate
+   * @return the password to use
+   * @throws InvalidToken the token was invalid
+   * @throws StandbyException the server is in standby state, the client can
+   *         try other servers
+   * @throws RetriableException the token was invalid, but the server thinks
+   *         this may be a temporary issue and suggests that the client retry
+   * @throws IOException to allow future exceptions to be added without breaking
+   *         compatibility
+   */
+  public byte[] retriableRetrievePassword(T identifier)
+      throws InvalidToken, StandbyException, RetriableException, IOException {
+    return retrievePassword(identifier);
+  }
 
   /**
    * Create an empty token identifier.
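
Because retriableRetrievePassword() is a non-abstract hook that defaults to retrievePassword(), existing SecretManager subclasses keep compiling unchanged; only services with genuinely temporary failure modes need to override it. A hypothetical subclass sketch (the warming-up flag and the class name are invented for illustration):

    import java.io.IOException;

    import org.apache.hadoop.ipc.RetriableException;
    import org.apache.hadoop.ipc.StandbyException;
    import org.apache.hadoop.security.token.SecretManager;
    import org.apache.hadoop.security.token.TokenIdentifier;

    public abstract class WarmupAwareSecretManager<T extends TokenIdentifier>
        extends SecretManager<T> {

      // hypothetical flag, flipped by the service once its token store is loaded
      private volatile boolean warmingUp = true;

      @Override
      public byte[] retriableRetrievePassword(T identifier)
          throws InvalidToken, StandbyException, RetriableException, IOException {
        try {
          return retrievePassword(identifier);
        } catch (InvalidToken it) {
          if (warmingUp) {
            // the token may simply not be loaded yet; ask the client to retry
            throw new RetriableException(it);
          }
          throw it;
        }
      }
    }
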
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
index e8a632a5e3d..d892c5d7f36 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
@@ -289,20 +289,30 @@ extends AbstractDelegationTokenIdentifier>
         + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)));
     return password;
   }
-
-  @Override
-  public synchronized byte[] retrievePassword(TokenIdent identifier)
+
+  /**
+   * Find the DelegationTokenInformation for the given token id, and verify
+   * that the token has not expired. Note that this method should only be
+   * called while holding the secret manager's monitor.
+   */
+  protected DelegationTokenInformation checkToken(TokenIdent identifier)
       throws InvalidToken {
+    assert Thread.holdsLock(this);
     DelegationTokenInformation info = currentTokens.get(identifier);
     if (info == null) {
       throw new InvalidToken("token (" + identifier.toString()
           + ") can't be found in cache");
     }
-    long now = Time.now();
-    if (info.getRenewDate() < now) {
+    if (info.getRenewDate() < Time.now()) {
       throw new InvalidToken("token (" + identifier.toString() + ") is expired");
     }
-    return info.getPassword();
+    return info;
+  }
+
+  @Override
+  public synchronized byte[] retrievePassword(TokenIdent identifier)
+      throws InvalidToken {
+    return checkToken(identifier).getPassword();
   }
 
   protected String getTrackingIdIfEnabled(TokenIdent ident) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 759e786ac88..c9a72949621 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -125,6 +125,9 @@ Release 2.2.1 - UNRELEASED
     HDFS-5335. Hive query failed with possible race in dfs output stream.
     (Haohui Mai via suresh)
 
+    HDFS-5322. HDFS delegation token not found in cache errors seen on secure HA
+    clusters. (jing9)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
index f233d1f5f72..b2446cbb806 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Co
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
@@ -115,6 +116,24 @@ public class DelegationTokenSecretManager
     return super.retrievePassword(identifier);
   }
 
+  @Override
+  public byte[] retriableRetrievePassword(DelegationTokenIdentifier identifier)
+      throws InvalidToken, StandbyException, RetriableException, IOException {
+    namesystem.checkOperation(OperationCategory.READ);
+    try {
+      return super.retrievePassword(identifier);
+    } catch (InvalidToken it) {
+      if (namesystem.inTransitionToActive()) {
+        // if the namesystem is currently in the middle of transition to
+        // active state, let the client retry since the corresponding editlog
+        // may not have been applied yet
+        throw new RetriableException(it);
+      } else {
+        throw it;
+      }
+    }
+  }
+
   /**
    * Returns expiry time of a token given its identifier.
    *
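
The two exception types carry different client-side contracts: StandbyException tells the caller to fail over to the other NameNode, while RetriableException says the same node is worth retrying once its editlog catch-up finishes. Hadoop's RPC retry policies encode this automatically; the loop below is only a hand-rolled sketch of the contract, with an arbitrary attempt bound and sleep:

    import java.io.IOException;

    import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
    import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
    import org.apache.hadoop.ipc.RetriableException;
    import org.apache.hadoop.ipc.StandbyException;

    public class RetryContractSketch {
      static byte[] retrieveWithRetry(DelegationTokenSecretManager sm,
          DelegationTokenIdentifier id) throws IOException, InterruptedException {
        final int maxAttempts = 10;  // arbitrary bound for the sketch
        for (int i = 0; i < maxAttempts; i++) {
          try {
            return sm.retriableRetrievePassword(id);
          } catch (RetriableException e) {
            Thread.sleep(1000);      // NN mid-transition: retry the same node
          } catch (StandbyException e) {
            throw e;                 // caller should fail over instead
          }
        }
        throw new IOException("token not available after " + maxAttempts
            + " attempts");
      }
    }
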
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 53e86176bf0..05dad980b5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -440,6 +440,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   private HAContext haContext;
   private final boolean haEnabled;
+
+  /**
+   * Whether the namenode is in the middle of starting the active service.
+   */
+  private volatile boolean startingActiveService = false;
 
   private INodeId inodeId;
 
@@ -888,6 +893,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @throws IOException
    */
   void startActiveServices() throws IOException {
+    startingActiveService = true;
     LOG.info("Starting services required for active state");
     writeLock();
     try {
@@ -942,8 +948,19 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       nnrmthread.start();
     } finally {
       writeUnlock();
+      startingActiveService = false;
     }
   }
+
+  /**
+   * @return Whether the namenode is transitioning to active state and is in
+   *         the middle of {@link #startActiveServices()}
+   */
+  public boolean inTransitionToActive() {
+    return haEnabled && haContext != null
+        && haContext.getState().getServiceState() == HAServiceState.ACTIVE
+        && startingActiveService;
+  }
 
   private boolean shouldUseDelegationTokens() {
     return UserGroupInformation.isSecurityEnabled() ||
@@ -6443,11 +6460,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * Verifies that the given identifier and password are valid and match.
    * @param identifier Token identifier.
    * @param password Password in the token.
-   * @throws InvalidToken
    */
   public synchronized void verifyToken(DelegationTokenIdentifier identifier,
-      byte[] password) throws InvalidToken {
-    getDelegationTokenSecretManager().verifyToken(identifier, password);
+      byte[] password) throws InvalidToken, RetriableException {
+    try {
+      getDelegationTokenSecretManager().verifyToken(identifier, password);
+    } catch (InvalidToken it) {
+      if (inTransitionToActive()) {
+        throw new RetriableException(it);
+      }
+      throw it;
+    }
   }
 
   @Override
@@ -6464,6 +6487,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return editLogTailer;
   }
 
+  @VisibleForTesting
+  public void setEditLogTailerForTests(EditLogTailer tailer) {
+    this.editLogTailer = tailer;
+  }
+
   @VisibleForTesting
   void setFsLockForTests(ReentrantReadWriteLock lock) {
     this.fsLock = lock;
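
To make the race concrete: after an HA transition the RPC layer can already report ACTIVE while startActiveServices() is still replaying edits, so a token minted on the old active may be momentarily missing from the new active's cache. The condensed model below mirrors the flag added above; the class and its methods are illustrative only, not part of the patch:

    import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;

    class TransitionWindowSketch {
      private volatile HAServiceState state = HAServiceState.STANDBY;
      private volatile boolean startingActiveService = false;

      void becomeActive(Runnable catchUpEditLog) {
        state = HAServiceState.ACTIVE;    // clients may already see ACTIVE here
        startingActiveService = true;     // ...but some tokens are not loaded yet
        try {
          catchUpEditLog.run();           // replay edits, including new tokens
        } finally {
          startingActiveService = false;  // window closed: InvalidToken is final
        }
      }

      // mirrors FSNamesystem#inTransitionToActive()
      boolean inTransitionToActive() {
        return state == HAServiceState.ACTIVE && startingActiveService;
      }
    }
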
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
index d16bddd1d97..c90988233ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -47,19 +48,22 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.SecurityUtilTestHelper;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import com.google.common.base.Joiner;
 
@@ -78,8 +82,12 @@ public class TestDelegationTokensWithHA {
   private static DelegationTokenSecretManager dtSecretManager;
   private static DistributedFileSystem dfs;
 
-  @BeforeClass
-  public static void setupCluster() throws Exception {
+  private volatile boolean catchup = false;
+
+  @Before
+  public void setupCluster() throws Exception {
+    SecurityUtilTestHelper.setTokenServiceUseIp(true);
+
     conf.setBoolean(
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL,
@@ -101,18 +109,12 @@ public class TestDelegationTokensWithHA {
         nn0.getNamesystem());
   }
 
-  @AfterClass
-  public static void shutdownCluster() throws IOException {
+  @After
+  public void shutdownCluster() throws IOException {
     if (cluster != null) {
       cluster.shutdown();
     }
   }
-
-
-  @Before
-  public void prepTest() {
-    SecurityUtilTestHelper.setTokenServiceUseIp(true);
-  }
 
   @Test
   public void testDelegationTokenDFSApi() throws Exception {
@@ -155,6 +157,96 @@ public class TestDelegationTokensWithHA {
     doRenewOrCancel(token, clientConf, TokenTestAction.CANCEL);
   }
 
+  private class EditLogTailerForTest extends EditLogTailer {
+    public EditLogTailerForTest(FSNamesystem namesystem, Configuration conf) {
+      super(namesystem, conf);
+    }
+
+    @Override
+    public void catchupDuringFailover() throws IOException {
+      synchronized (TestDelegationTokensWithHA.this) {
+        while (!catchup) {
+          try {
+            LOG.info("The editlog tailer is waiting to catch up...");
+            TestDelegationTokensWithHA.this.wait();
+          } catch (InterruptedException e) {}
+        }
+      }
+      super.catchupDuringFailover();
+    }
+  }
+
+  /**
+   * Test that the correct exception (StandbyException or RetriableException)
+   * is thrown during the NN failover.
+   */
+  @Test
+  public void testDelegationTokenDuringNNFailover() throws Exception {
+    EditLogTailer editLogTailer = nn1.getNamesystem().getEditLogTailer();
+    // stop the editLogTailer of nn1
+    editLogTailer.stop();
+    Configuration conf = (Configuration) Whitebox.getInternalState(
+        editLogTailer, "conf");
+    nn1.getNamesystem().setEditLogTailerForTests(
+        new EditLogTailerForTest(nn1.getNamesystem(), conf));
+
+    // create token
+    final Token<DelegationTokenIdentifier> token =
+        getDelegationToken(fs, "JobTracker");
+    DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
+    byte[] tokenId = token.getIdentifier();
+    identifier.readFields(new DataInputStream(
+        new ByteArrayInputStream(tokenId)));
+
+    // Ensure that it's present in the nn0 secret manager and can
+    // be renewed directly from there.
+    LOG.info("A valid token should have non-null password, " +
+        "and should be renewed successfully");
+    assertTrue(null != dtSecretManager.retrievePassword(identifier));
+    dtSecretManager.renewToken(token, "JobTracker");
+
+    // transition nn0 to standby
+    cluster.transitionToStandby(0);
+
+    try {
+      cluster.getNameNodeRpc(0).renewDelegationToken(token);
+      fail("StandbyException is expected since nn0 is in standby state");
+    } catch (StandbyException e) {
+      GenericTestUtils.assertExceptionContains(
+          HAServiceState.STANDBY.toString(), e);
+    }
+
+    new Thread() {
+      @Override
+      public void run() {
+        try {
+          cluster.transitionToActive(1);
+        } catch (Exception e) {
+          LOG.error("Transition nn1 to active failed", e);
+        }
+      }
+    }.start();
+
+    Thread.sleep(1000);
+    try {
+      nn1.getNamesystem().verifyToken(token.decodeIdentifier(),
+          token.getPassword());
+      fail("RetriableException/StandbyException is expected since nn1 is in" +
+          " transition");
+    } catch (IOException e) {
+      assertTrue(e instanceof StandbyException ||
+          e instanceof RetriableException);
+      LOG.info("Got expected exception", e);
+    }
+
+    catchup = true;
+    synchronized (this) {
+      this.notifyAll();
+    }
+
+    Configuration clientConf = dfs.getConf();
+    doRenewOrCancel(token, clientConf, TokenTestAction.RENEW);
+    doRenewOrCancel(token, clientConf, TokenTestAction.CANCEL);
+  }
+
   @SuppressWarnings("deprecation")
   @Test
   public void testDelegationTokenWithDoAs() throws Exception {