HADOOP-13487. Hadoop KMS should load old delegation tokens from Zookeeper on startup. Contributed by Xiao Chen.

(cherry picked from commit 2f21d53e3aaf97f425f954f52b50f36dddb6b886)
This commit is contained in:
Xiao Chen 2016-08-22 14:31:13 -07:00
parent c0c4cc8ee8
commit 0f9bed050d
2 changed files with 136 additions and 1 deletions

View File

@ -361,6 +361,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
}
}
}, listenerThreadPool);
loadFromZKCache(false);
}
} catch (Exception e) {
throw new IOException("Could not start PathChildrenCache for keys", e);
@ -389,6 +390,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
}
}
}, listenerThreadPool);
loadFromZKCache(true);
}
} catch (Exception e) {
throw new IOException("Could not start PathChildrenCache for tokens", e);
@ -396,6 +398,43 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
super.startThreads();
}
/**
* Load the PathChildrenCache into the in-memory map. Possible caches to be
* loaded are keyCache and tokenCache.
*
* @param isTokenCache true if loading tokenCache, false if loading keyCache.
*/
private void loadFromZKCache(final boolean isTokenCache) {
final String cacheName = isTokenCache ? "token" : "key";
LOG.info("Starting to load {} cache.", cacheName);
final List<ChildData> children;
if (isTokenCache) {
children = tokenCache.getCurrentData();
} else {
children = keyCache.getCurrentData();
}
int count = 0;
for (ChildData child : children) {
try {
if (isTokenCache) {
processTokenAddOrUpdate(child);
} else {
processKeyAddOrUpdate(child.getData());
}
} catch (Exception e) {
LOG.info("Ignoring node {} because it failed to load.",
child.getPath());
LOG.debug("Failure exception:", e);
++count;
}
}
if (count > 0) {
LOG.warn("Ignored {} nodes while loading {} cache.", count, cacheName);
}
LOG.info("Loaded {} cache.", cacheName);
}
private void processKeyAddOrUpdate(byte[] data) throws IOException {
ByteArrayInputStream bin = new ByteArrayInputStream(data);
DataInputStream din = new DataInputStream(bin);
@ -890,4 +929,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
public ExecutorService getListenerThreadPool() {
return listenerThreadPool;
}
@VisibleForTesting
DelegationTokenInformation getTokenInfoFromMemory(TokenIdent ident) {
return currentTokens.get(ident);
}
}

View File

@ -24,6 +24,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import com.google.common.base.Supplier;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@ -37,6 +38,7 @@ import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
@ -44,12 +46,18 @@ import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.fail;
import org.junit.Test;
public class TestZKDelegationTokenSecretManager {
private static final Logger LOG =
LoggerFactory.getLogger(TestZKDelegationTokenSecretManager.class);
private static final int TEST_RETRIES = 2;
@ -61,6 +69,9 @@ public class TestZKDelegationTokenSecretManager {
private TestingServer zkServer;
@Rule
public Timeout globalTimeout = new Timeout(300000);
@Before
public void setup() throws Exception {
zkServer = new TestingServer();
@ -382,4 +393,84 @@ public class TestZKDelegationTokenSecretManager {
}
}
@SuppressWarnings({ "unchecked" })
@Test
public void testNodesLoadedAfterRestart() throws Exception {
final String connectString = zkServer.getConnectString();
final Configuration conf = getSecretConf(connectString);
final int removeScan = 1;
// Set the remove scan interval to remove expired tokens
conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, removeScan);
// Set the update interval to trigger background thread to run. The thread
// is hard-coded to sleep at least 5 seconds.
conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, 5);
// Set token expire time to 5 seconds.
conf.setLong(DelegationTokenManager.RENEW_INTERVAL, 5);
DelegationTokenManager tm =
new DelegationTokenManager(conf, new Text("bla"));
tm.init();
Token<DelegationTokenIdentifier> token =
(Token<DelegationTokenIdentifier>) tm
.createToken(UserGroupInformation.getCurrentUser(), "good");
Assert.assertNotNull(token);
Token<DelegationTokenIdentifier> cancelled =
(Token<DelegationTokenIdentifier>) tm
.createToken(UserGroupInformation.getCurrentUser(), "cancelled");
Assert.assertNotNull(cancelled);
tm.verifyToken(token);
tm.verifyToken(cancelled);
// Cancel one token, verify it's gone
tm.cancelToken(cancelled, "cancelled");
final AbstractDelegationTokenSecretManager sm =
tm.getDelegationTokenSecretManager();
final ZKDelegationTokenSecretManager zksm =
(ZKDelegationTokenSecretManager) sm;
final AbstractDelegationTokenIdentifier idCancelled =
sm.decodeTokenIdentifier(cancelled);
LOG.info("Waiting for the cancelled token to be removed");
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
AbstractDelegationTokenSecretManager.DelegationTokenInformation dtinfo =
zksm.getTokenInfo(idCancelled);
return dtinfo == null;
}
}, 100, 5000);
// Fake a restart which launches a new tm
tm.destroy();
tm = new DelegationTokenManager(conf, new Text("bla"));
tm.init();
final AbstractDelegationTokenSecretManager smNew =
tm.getDelegationTokenSecretManager();
final ZKDelegationTokenSecretManager zksmNew =
(ZKDelegationTokenSecretManager) smNew;
// The cancelled token should be gone, and not loaded.
AbstractDelegationTokenIdentifier id =
smNew.decodeTokenIdentifier(cancelled);
AbstractDelegationTokenSecretManager.DelegationTokenInformation dtinfo =
zksmNew.getTokenInfo(id);
Assert.assertNull("canceled dt should be gone!", dtinfo);
// The good token should be loaded on startup, and removed after expiry.
id = smNew.decodeTokenIdentifier(token);
dtinfo = zksmNew.getTokenInfoFromMemory(id);
Assert.assertNotNull("good dt should be in memory!", dtinfo);
// Wait for the good token to expire.
Thread.sleep(5000);
final ZKDelegationTokenSecretManager zksm1 = zksmNew;
final AbstractDelegationTokenIdentifier id1 = id;
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LOG.info("Waiting for the expired token to be removed...");
return zksm1.getTokenInfo(id1) == null;
}
}, 1000, 5000);
}
}