svn merge -c 1467307 Merging from trunk to branch-2 to fix HDFS-4477.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1467308 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ac7d455a0f
commit
c9b9ebde1d
@ -27,8 +27,10 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.crypto.SecretKey;
|
import javax.crypto.SecretKey;
|
||||||
|
|
||||||
@ -144,6 +146,10 @@ protected void logUpdateMasterKey(DelegationKey key) throws IOException {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void logExpireToken(TokenIdent ident) throws IOException {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update the current master key
|
* Update the current master key
|
||||||
* This is called once by startThreads before tokenRemoverThread is created,
|
* This is called once by startThreads before tokenRemoverThread is created,
|
||||||
@ -363,15 +369,25 @@ byte[] getPassword() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** Remove expired delegation tokens from cache */
|
/** Remove expired delegation tokens from cache */
|
||||||
private synchronized void removeExpiredToken() {
|
private void removeExpiredToken() throws IOException {
|
||||||
long now = Time.now();
|
long now = Time.now();
|
||||||
Iterator<DelegationTokenInformation> i = currentTokens.values().iterator();
|
Set<TokenIdent> expiredTokens = new HashSet<TokenIdent>();
|
||||||
while (i.hasNext()) {
|
synchronized (this) {
|
||||||
long renewDate = i.next().getRenewDate();
|
Iterator<Map.Entry<TokenIdent, DelegationTokenInformation>> i =
|
||||||
if (now > renewDate) {
|
currentTokens.entrySet().iterator();
|
||||||
i.remove();
|
while (i.hasNext()) {
|
||||||
|
Map.Entry<TokenIdent, DelegationTokenInformation> entry = i.next();
|
||||||
|
long renewDate = entry.getValue().getRenewDate();
|
||||||
|
if (renewDate < now) {
|
||||||
|
expiredTokens.add(entry.getKey());
|
||||||
|
i.remove();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// don't hold lock on 'this' to avoid edit log updates blocking token ops
|
||||||
|
for (TokenIdent ident : expiredTokens) {
|
||||||
|
logExpireToken(ident);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stopThreads() {
|
public void stopThreads() {
|
||||||
|
@ -2210,6 +2210,8 @@ Release 0.23.8 - UNRELEASED
|
|||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
|
HDFS-4477. Secondary namenode may retain old tokens (daryn via kihwal)
|
||||||
|
|
||||||
Release 0.23.7 - UNRELEASED
|
Release 0.23.7 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -310,6 +310,23 @@ protected void logUpdateMasterKey(DelegationKey key)
|
|||||||
namesystem.logUpdateMasterKey(key);
|
namesystem.logUpdateMasterKey(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override //AbstractDelegationTokenManager
|
||||||
|
protected void logExpireToken(final DelegationTokenIdentifier dtId)
|
||||||
|
throws IOException {
|
||||||
|
synchronized (noInterruptsLock) {
|
||||||
|
// The edit logging code will fail catastrophically if it
|
||||||
|
// is interrupted during a logSync, since the interrupt
|
||||||
|
// closes the edit log files. Doing this inside the
|
||||||
|
// above lock and then checking interruption status
|
||||||
|
// prevents this bug.
|
||||||
|
if (Thread.interrupted()) {
|
||||||
|
throw new InterruptedIOException(
|
||||||
|
"Interrupted before expiring delegation token");
|
||||||
|
}
|
||||||
|
namesystem.logExpireDelegationToken(dtId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** A utility method for creating credentials. */
|
/** A utility method for creating credentials. */
|
||||||
public static Credentials createCredentials(final NameNode namenode,
|
public static Credentials createCredentials(final NameNode namenode,
|
||||||
|
@ -5289,6 +5289,21 @@ public void logUpdateMasterKey(DelegationKey key) {
|
|||||||
getEditLog().logSync();
|
getEditLog().logSync();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Log the cancellation of expired tokens to edit logs
|
||||||
|
*
|
||||||
|
* @param id token identifier to cancel
|
||||||
|
*/
|
||||||
|
public void logExpireDelegationToken(DelegationTokenIdentifier id) {
|
||||||
|
assert !isInSafeMode() :
|
||||||
|
"this should never be called while in safemode, since we stop " +
|
||||||
|
"the DT manager before entering safemode!";
|
||||||
|
// No need to hold FSN lock since we don't access any internal
|
||||||
|
// structures, and this is stopped before the FSN shuts itself
|
||||||
|
// down, etc.
|
||||||
|
getEditLog().logCancelDelegationToken(id);
|
||||||
|
}
|
||||||
|
|
||||||
private void logReassignLease(String leaseHolder, String src,
|
private void logReassignLease(String leaseHolder, String src,
|
||||||
String newHolder) {
|
String newHolder) {
|
||||||
assert hasWriteLock();
|
assert hasWriteLock();
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
@ -30,12 +31,14 @@
|
|||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class tests the creation and validation of a checkpoint.
|
* This class tests the creation and validation of a checkpoint.
|
||||||
@ -163,4 +166,70 @@ public void testEditLog() throws IOException {
|
|||||||
if(cluster != null) cluster.shutdown();
|
if(cluster != null) cluster.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=10000)
|
||||||
|
public void testEditsForCancelOnTokenExpire() throws IOException,
|
||||||
|
InterruptedException {
|
||||||
|
long renewInterval = 2000;
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
|
||||||
|
conf.setLong(DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, renewInterval);
|
||||||
|
conf.setLong(DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY, renewInterval*2);
|
||||||
|
|
||||||
|
Text renewer = new Text(UserGroupInformation.getCurrentUser().getUserName());
|
||||||
|
FSImage fsImage = mock(FSImage.class);
|
||||||
|
FSEditLog log = mock(FSEditLog.class);
|
||||||
|
doReturn(log).when(fsImage).getEditLog();
|
||||||
|
FSNamesystem fsn = new FSNamesystem(conf, fsImage);
|
||||||
|
|
||||||
|
DelegationTokenSecretManager dtsm = fsn.getDelegationTokenSecretManager();
|
||||||
|
try {
|
||||||
|
dtsm.startThreads();
|
||||||
|
|
||||||
|
// get two tokens
|
||||||
|
Token<DelegationTokenIdentifier> token1 = fsn.getDelegationToken(renewer);
|
||||||
|
Token<DelegationTokenIdentifier> token2 = fsn.getDelegationToken(renewer);
|
||||||
|
DelegationTokenIdentifier ident1 =
|
||||||
|
(DelegationTokenIdentifier)token1.decodeIdentifier();
|
||||||
|
DelegationTokenIdentifier ident2 =
|
||||||
|
(DelegationTokenIdentifier)token2.decodeIdentifier();
|
||||||
|
|
||||||
|
// verify we got the tokens
|
||||||
|
verify(log, times(1)).logGetDelegationToken(eq(ident1), anyLong());
|
||||||
|
verify(log, times(1)).logGetDelegationToken(eq(ident2), anyLong());
|
||||||
|
|
||||||
|
// this is a little tricky because DTSM doesn't let us set scan interval
|
||||||
|
// so need to periodically sleep, then stop/start threads to force scan
|
||||||
|
|
||||||
|
// renew first token 1/2 to expire
|
||||||
|
Thread.sleep(renewInterval/2);
|
||||||
|
fsn.renewDelegationToken(token2);
|
||||||
|
verify(log, times(1)).logRenewDelegationToken(eq(ident2), anyLong());
|
||||||
|
// force scan and give it a little time to complete
|
||||||
|
dtsm.stopThreads(); dtsm.startThreads();
|
||||||
|
Thread.sleep(250);
|
||||||
|
// no token has expired yet
|
||||||
|
verify(log, times(0)).logCancelDelegationToken(eq(ident1));
|
||||||
|
verify(log, times(0)).logCancelDelegationToken(eq(ident2));
|
||||||
|
|
||||||
|
// sleep past expiration of 1st non-renewed token
|
||||||
|
Thread.sleep(renewInterval/2);
|
||||||
|
dtsm.stopThreads(); dtsm.startThreads();
|
||||||
|
Thread.sleep(250);
|
||||||
|
// non-renewed token should have implicitly been cancelled
|
||||||
|
verify(log, times(1)).logCancelDelegationToken(eq(ident1));
|
||||||
|
verify(log, times(0)).logCancelDelegationToken(eq(ident2));
|
||||||
|
|
||||||
|
// sleep past expiration of 2nd renewed token
|
||||||
|
Thread.sleep(renewInterval/2);
|
||||||
|
dtsm.stopThreads(); dtsm.startThreads();
|
||||||
|
Thread.sleep(250);
|
||||||
|
// both tokens should have been implicitly cancelled by now
|
||||||
|
verify(log, times(1)).logCancelDelegationToken(eq(ident1));
|
||||||
|
verify(log, times(1)).logCancelDelegationToken(eq(ident2));
|
||||||
|
} finally {
|
||||||
|
dtsm.stopThreads();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user