HDFS-5322. HDFS delegation token not found in cache errors seen on secure HA clusters. Contributed by Jing Zhao.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1531436 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
335923be6f
commit
f7eaacc103
|
@ -1295,6 +1295,29 @@ public abstract class Server {
|
|||
}
|
||||
}
|
||||
|
||||
private Throwable getCauseForInvalidToken(IOException e) {
|
||||
Throwable cause = e;
|
||||
while (cause != null) {
|
||||
if (cause instanceof RetriableException) {
|
||||
return (RetriableException) cause;
|
||||
} else if (cause instanceof StandbyException) {
|
||||
return (StandbyException) cause;
|
||||
} else if (cause instanceof InvalidToken) {
|
||||
// FIXME: hadoop method signatures are restricting the SASL
|
||||
// callbacks to only returning InvalidToken, but some services
|
||||
// need to throw other exceptions (ex. NN + StandyException),
|
||||
// so for now we'll tunnel the real exceptions via an
|
||||
// InvalidToken's cause which normally is not set
|
||||
if (cause.getCause() != null) {
|
||||
cause = cause.getCause();
|
||||
}
|
||||
return cause;
|
||||
}
|
||||
cause = cause.getCause();
|
||||
}
|
||||
return e;
|
||||
}
|
||||
|
||||
private void saslProcess(RpcSaslProto saslMessage)
|
||||
throws WrappedRpcServerException, IOException, InterruptedException {
|
||||
if (saslContextEstablished) {
|
||||
|
@ -1307,29 +1330,11 @@ public abstract class Server {
|
|||
try {
|
||||
saslResponse = processSaslMessage(saslMessage);
|
||||
} catch (IOException e) {
|
||||
IOException sendToClient = e;
|
||||
Throwable cause = e;
|
||||
while (cause != null) {
|
||||
if (cause instanceof InvalidToken) {
|
||||
// FIXME: hadoop method signatures are restricting the SASL
|
||||
// callbacks to only returning InvalidToken, but some services
|
||||
// need to throw other exceptions (ex. NN + StandyException),
|
||||
// so for now we'll tunnel the real exceptions via an
|
||||
// InvalidToken's cause which normally is not set
|
||||
if (cause.getCause() != null) {
|
||||
cause = cause.getCause();
|
||||
}
|
||||
sendToClient = (IOException) cause;
|
||||
break;
|
||||
}
|
||||
cause = cause.getCause();
|
||||
}
|
||||
rpcMetrics.incrAuthenticationFailures();
|
||||
String clientIP = this.toString();
|
||||
// attempting user could be null
|
||||
AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser +
|
||||
" (" + e.getLocalizedMessage() + ")");
|
||||
throw sendToClient;
|
||||
AUDITLOG.warn(AUTH_FAILED_FOR + this.toString() + ":"
|
||||
+ attemptingUser + " (" + e.getLocalizedMessage() + ")");
|
||||
throw (IOException) getCauseForInvalidToken(e);
|
||||
}
|
||||
|
||||
if (saslServer != null && saslServer.isComplete()) {
|
||||
|
|
|
@ -45,11 +45,13 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.ipc.Server.Connection;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
|
||||
/**
|
||||
* A utility class for dealing with SASL on RPC server
|
||||
|
@ -267,13 +269,15 @@ public class SaslRpcServer {
|
|||
this.connection = connection;
|
||||
}
|
||||
|
||||
private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken {
|
||||
return encodePassword(secretManager.retrievePassword(tokenid));
|
||||
private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken,
|
||||
StandbyException, RetriableException, IOException {
|
||||
return encodePassword(secretManager.retriableRetrievePassword(tokenid));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(Callback[] callbacks) throws InvalidToken,
|
||||
UnsupportedCallbackException {
|
||||
UnsupportedCallbackException, StandbyException, RetriableException,
|
||||
IOException {
|
||||
NameCallback nc = null;
|
||||
PasswordCallback pc = null;
|
||||
AuthorizeCallback ac = null;
|
||||
|
@ -292,7 +296,8 @@ public class SaslRpcServer {
|
|||
}
|
||||
}
|
||||
if (pc != null) {
|
||||
TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(), secretManager);
|
||||
TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(),
|
||||
secretManager);
|
||||
char[] password = getPassword(tokenIdentifier);
|
||||
UserGroupInformation user = null;
|
||||
user = tokenIdentifier.getUser(); // may throw exception
|
||||
|
|
|
@ -29,6 +29,7 @@ import javax.crypto.spec.SecretKeySpec;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
|
||||
|
||||
|
@ -66,7 +67,29 @@ public abstract class SecretManager<T extends TokenIdentifier> {
|
|||
* @return the password to use
|
||||
* @throws InvalidToken the token was invalid
|
||||
*/
|
||||
public abstract byte[] retrievePassword(T identifier) throws InvalidToken;
|
||||
public abstract byte[] retrievePassword(T identifier)
|
||||
throws InvalidToken;
|
||||
|
||||
/**
|
||||
* The same functionality with {@link #retrievePassword}, except that this
|
||||
* method can throw a {@link RetriableException} or a {@link StandbyException}
|
||||
* to indicate that client can retry/failover the same operation because of
|
||||
* temporary issue on the server side.
|
||||
*
|
||||
* @param identifier the identifier to validate
|
||||
* @return the password to use
|
||||
* @throws InvalidToken the token was invalid
|
||||
* @throws StandbyException the server is in standby state, the client can
|
||||
* try other servers
|
||||
* @throws RetriableException the token was invalid, and the server thinks
|
||||
* this may be a temporary issue and suggests the client to retry
|
||||
* @throws IOException to allow future exceptions to be added without breaking
|
||||
* compatibility
|
||||
*/
|
||||
public byte[] retriableRetrievePassword(T identifier)
|
||||
throws InvalidToken, StandbyException, RetriableException, IOException {
|
||||
return retrievePassword(identifier);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an empty token identifier.
|
||||
|
|
|
@ -289,20 +289,30 @@ extends AbstractDelegationTokenIdentifier>
|
|||
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)));
|
||||
return password;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized byte[] retrievePassword(TokenIdent identifier)
|
||||
|
||||
/**
|
||||
* Find the DelegationTokenInformation for the given token id, and verify that
|
||||
* if the token is expired. Note that this method should be called with
|
||||
* acquiring the secret manager's monitor.
|
||||
*/
|
||||
protected DelegationTokenInformation checkToken(TokenIdent identifier)
|
||||
throws InvalidToken {
|
||||
assert Thread.holdsLock(this);
|
||||
DelegationTokenInformation info = currentTokens.get(identifier);
|
||||
if (info == null) {
|
||||
throw new InvalidToken("token (" + identifier.toString()
|
||||
+ ") can't be found in cache");
|
||||
}
|
||||
long now = Time.now();
|
||||
if (info.getRenewDate() < now) {
|
||||
if (info.getRenewDate() < Time.now()) {
|
||||
throw new InvalidToken("token (" + identifier.toString() + ") is expired");
|
||||
}
|
||||
return info.getPassword();
|
||||
return info;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized byte[] retrievePassword(TokenIdent identifier)
|
||||
throws InvalidToken {
|
||||
return checkToken(identifier).getPassword();
|
||||
}
|
||||
|
||||
protected String getTrackingIdIfEnabled(TokenIdent ident) {
|
||||
|
|
|
@ -369,6 +369,9 @@ Release 2.2.1 - UNRELEASED
|
|||
HDFS-5335. Hive query failed with possible race in dfs output stream.
|
||||
(Haohui Mai via suresh)
|
||||
|
||||
HDFS-5322. HDFS delegation token not found in cache errors seen on secure HA
|
||||
clusters. (jing9)
|
||||
|
||||
Release 2.2.0 - 2013-10-13
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Co
|
|||
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
|
||||
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
|
@ -115,6 +116,24 @@ public class DelegationTokenSecretManager
|
|||
return super.retrievePassword(identifier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] retriableRetrievePassword(DelegationTokenIdentifier identifier)
|
||||
throws InvalidToken, StandbyException, RetriableException, IOException {
|
||||
namesystem.checkOperation(OperationCategory.READ);
|
||||
try {
|
||||
return super.retrievePassword(identifier);
|
||||
} catch (InvalidToken it) {
|
||||
if (namesystem.inTransitionToActive()) {
|
||||
// if the namesystem is currently in the middle of transition to
|
||||
// active state, let client retry since the corresponding editlog may
|
||||
// have not been applied yet
|
||||
throw new RetriableException(it);
|
||||
} else {
|
||||
throw it;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns expiry time of a token given its identifier.
|
||||
*
|
||||
|
|
|
@ -455,6 +455,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
private HAContext haContext;
|
||||
|
||||
private final boolean haEnabled;
|
||||
|
||||
/**
|
||||
* Whether the namenode is in the middle of starting the active service
|
||||
*/
|
||||
private volatile boolean startingActiveService = false;
|
||||
|
||||
private INodeId inodeId;
|
||||
|
||||
|
@ -903,6 +908,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
* @throws IOException
|
||||
*/
|
||||
void startActiveServices() throws IOException {
|
||||
startingActiveService = true;
|
||||
LOG.info("Starting services required for active state");
|
||||
writeLock();
|
||||
try {
|
||||
|
@ -957,8 +963,19 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
nnrmthread.start();
|
||||
} finally {
|
||||
writeUnlock();
|
||||
startingActiveService = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Whether the namenode is transitioning to active state and is in the
|
||||
* middle of the {@link #startActiveServices()}
|
||||
*/
|
||||
public boolean inTransitionToActive() {
|
||||
return haEnabled && haContext != null
|
||||
&& haContext.getState().getServiceState() == HAServiceState.ACTIVE
|
||||
&& startingActiveService;
|
||||
}
|
||||
|
||||
private boolean shouldUseDelegationTokens() {
|
||||
return UserGroupInformation.isSecurityEnabled() ||
|
||||
|
@ -6460,11 +6477,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
* Verifies that the given identifier and password are valid and match.
|
||||
* @param identifier Token identifier.
|
||||
* @param password Password in the token.
|
||||
* @throws InvalidToken
|
||||
*/
|
||||
public synchronized void verifyToken(DelegationTokenIdentifier identifier,
|
||||
byte[] password) throws InvalidToken {
|
||||
getDelegationTokenSecretManager().verifyToken(identifier, password);
|
||||
byte[] password) throws InvalidToken, RetriableException {
|
||||
try {
|
||||
getDelegationTokenSecretManager().verifyToken(identifier, password);
|
||||
} catch (InvalidToken it) {
|
||||
if (inTransitionToActive()) {
|
||||
throw new RetriableException(it);
|
||||
}
|
||||
throw it;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -6481,6 +6504,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
return editLogTailer;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setEditLogTailerForTests(EditLogTailer tailer) {
|
||||
this.editLogTailer = tailer;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void setFsLockForTests(ReentrantReadWriteLock lock) {
|
||||
this.fsLock = lock;
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.AbstractFileSystem;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
|
@ -47,19 +48,22 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
|||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.SecurityUtilTestHelper;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
|
||||
|
@ -78,8 +82,12 @@ public class TestDelegationTokensWithHA {
|
|||
private static DelegationTokenSecretManager dtSecretManager;
|
||||
private static DistributedFileSystem dfs;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
private volatile boolean catchup = false;
|
||||
|
||||
@Before
|
||||
public void setupCluster() throws Exception {
|
||||
SecurityUtilTestHelper.setTokenServiceUseIp(true);
|
||||
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL,
|
||||
|
@ -101,18 +109,12 @@ public class TestDelegationTokensWithHA {
|
|||
nn0.getNamesystem());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdownCluster() throws IOException {
|
||||
@After
|
||||
public void shutdownCluster() throws IOException {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Before
|
||||
public void prepTest() {
|
||||
SecurityUtilTestHelper.setTokenServiceUseIp(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelegationTokenDFSApi() throws Exception {
|
||||
|
@ -155,6 +157,96 @@ public class TestDelegationTokensWithHA {
|
|||
doRenewOrCancel(token, clientConf, TokenTestAction.CANCEL);
|
||||
}
|
||||
|
||||
private class EditLogTailerForTest extends EditLogTailer {
|
||||
public EditLogTailerForTest(FSNamesystem namesystem, Configuration conf) {
|
||||
super(namesystem, conf);
|
||||
}
|
||||
|
||||
public void catchupDuringFailover() throws IOException {
|
||||
synchronized (TestDelegationTokensWithHA.this) {
|
||||
while (!catchup) {
|
||||
try {
|
||||
LOG.info("The editlog tailer is waiting to catchup...");
|
||||
TestDelegationTokensWithHA.this.wait();
|
||||
} catch (InterruptedException e) {}
|
||||
}
|
||||
}
|
||||
super.catchupDuringFailover();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test if correct exception (StandbyException or RetriableException) can be
|
||||
* thrown during the NN failover.
|
||||
*/
|
||||
@Test
|
||||
public void testDelegationTokenDuringNNFailover() throws Exception {
|
||||
EditLogTailer editLogTailer = nn1.getNamesystem().getEditLogTailer();
|
||||
// stop the editLogTailer of nn1
|
||||
editLogTailer.stop();
|
||||
Configuration conf = (Configuration) Whitebox.getInternalState(
|
||||
editLogTailer, "conf");
|
||||
nn1.getNamesystem().setEditLogTailerForTests(
|
||||
new EditLogTailerForTest(nn1.getNamesystem(), conf));
|
||||
|
||||
// create token
|
||||
final Token<DelegationTokenIdentifier> token =
|
||||
getDelegationToken(fs, "JobTracker");
|
||||
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
||||
byte[] tokenId = token.getIdentifier();
|
||||
identifier.readFields(new DataInputStream(
|
||||
new ByteArrayInputStream(tokenId)));
|
||||
|
||||
// Ensure that it's present in the nn0 secret manager and can
|
||||
// be renewed directly from there.
|
||||
LOG.info("A valid token should have non-null password, " +
|
||||
"and should be renewed successfully");
|
||||
assertTrue(null != dtSecretManager.retrievePassword(identifier));
|
||||
dtSecretManager.renewToken(token, "JobTracker");
|
||||
|
||||
// transition nn0 to standby
|
||||
cluster.transitionToStandby(0);
|
||||
|
||||
try {
|
||||
cluster.getNameNodeRpc(0).renewDelegationToken(token);
|
||||
fail("StandbyException is expected since nn0 is in standby state");
|
||||
} catch (StandbyException e) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
HAServiceState.STANDBY.toString(), e);
|
||||
}
|
||||
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
cluster.transitionToActive(1);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Transition nn1 to active failed", e);
|
||||
}
|
||||
}
|
||||
}.start();
|
||||
|
||||
Thread.sleep(1000);
|
||||
try {
|
||||
nn1.getNamesystem().verifyToken(token.decodeIdentifier(),
|
||||
token.getPassword());
|
||||
fail("RetriableException/StandbyException is expected since nn1 is in transition");
|
||||
} catch (IOException e) {
|
||||
assertTrue(e instanceof StandbyException
|
||||
|| e instanceof RetriableException);
|
||||
LOG.info("Got expected exception", e);
|
||||
}
|
||||
|
||||
catchup = true;
|
||||
synchronized (this) {
|
||||
this.notifyAll();
|
||||
}
|
||||
|
||||
Configuration clientConf = dfs.getConf();
|
||||
doRenewOrCancel(token, clientConf, TokenTestAction.RENEW);
|
||||
doRenewOrCancel(token, clientConf, TokenTestAction.CANCEL);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
public void testDelegationTokenWithDoAs() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue