YARN-3336. FileSystem memory leak in DelegationTokenRenewer.
(cherry picked from commit6ca1f12024
) (cherry picked from commit342c525eaa
)
This commit is contained in:
parent
6b9f2d9f39
commit
57e297208d
|
@ -732,6 +732,9 @@ Release 2.7.0 - UNRELEASED
|
|||
YARN-3384. TestLogAggregationService.verifyContainerLogs fails after
|
||||
YARN-2777. (Naganarasimha G R via ozawa)
|
||||
|
||||
YARN-3336. FileSystem memory leak in DelegationTokenRenewer.
|
||||
(Zhihai Xu via cnauroth)
|
||||
|
||||
Release 2.6.0 - 2014-11-18
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -605,6 +605,7 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected Token<?>[] obtainSystemTokensForUser(String user,
|
||||
final Credentials credentials) throws IOException, InterruptedException {
|
||||
// Get new hdfs tokens on behalf of this user
|
||||
|
@ -615,8 +616,16 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
proxyUser.doAs(new PrivilegedExceptionAction<Token<?>[]>() {
|
||||
@Override
|
||||
public Token<?>[] run() throws Exception {
|
||||
return FileSystem.get(getConfig()).addDelegationTokens(
|
||||
UserGroupInformation.getLoginUser().getUserName(), credentials);
|
||||
FileSystem fs = FileSystem.get(getConfig());
|
||||
try {
|
||||
return fs.addDelegationTokens(
|
||||
UserGroupInformation.getLoginUser().getUserName(),
|
||||
credentials);
|
||||
} finally {
|
||||
// Close the FileSystem created by the new proxy user,
|
||||
// So that we don't leave an entry in the FileSystem cache
|
||||
fs.close();
|
||||
}
|
||||
}
|
||||
});
|
||||
return newTokens;
|
||||
|
|
|
@ -287,9 +287,16 @@ public class TestDelegationTokenRenewer {
|
|||
* exception
|
||||
*/
|
||||
static class MyFS extends DistributedFileSystem {
|
||||
|
||||
public MyFS() {}
|
||||
public void close() {}
|
||||
private static AtomicInteger instanceCounter = new AtomicInteger();
|
||||
public MyFS() {
|
||||
instanceCounter.incrementAndGet();
|
||||
}
|
||||
public void close() {
|
||||
instanceCounter.decrementAndGet();
|
||||
}
|
||||
public static int getInstanceCounter() {
|
||||
return instanceCounter.get();
|
||||
}
|
||||
@Override
|
||||
public void initialize(URI uri, Configuration conf) throws IOException {}
|
||||
|
||||
|
@ -299,6 +306,11 @@ public class TestDelegationTokenRenewer {
|
|||
LOG.info("Called MYDFS.getdelegationtoken " + result);
|
||||
return result;
|
||||
}
|
||||
|
||||
public Token<?>[] addDelegationTokens(
|
||||
final String renewer, Credentials credentials) throws IOException {
|
||||
return new Token<?>[0];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1022,4 +1034,16 @@ public class TestDelegationTokenRenewer {
|
|||
// app2 completes, app1 is still running, check the token is not cancelled
|
||||
Assert.assertFalse(Renewer.cancelled);
|
||||
}
|
||||
|
||||
// Test FileSystem memory leak in obtainSystemTokensForUser.
|
||||
@Test
|
||||
public void testFSLeakInObtainSystemTokensForUser() throws Exception{
|
||||
Credentials credentials = new Credentials();
|
||||
String user = "test";
|
||||
int oldCounter = MyFS.getInstanceCounter();
|
||||
delegationTokenRenewer.obtainSystemTokensForUser(user, credentials);
|
||||
delegationTokenRenewer.obtainSystemTokensForUser(user, credentials);
|
||||
delegationTokenRenewer.obtainSystemTokensForUser(user, credentials);
|
||||
Assert.assertEquals(oldCounter, MyFS.getInstanceCounter());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue