HADOOP-9084. Augment DelegationTokenRenewer API to cancel the tokens on calls to removeRenewAction. (kkambatl via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1414326 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
966e20fa3d
commit
f6230b1c8c
|
@ -450,6 +450,9 @@ Release 2.0.3-alpha - Unreleased
|
||||||
HADOOP-9049. DelegationTokenRenewer needs to be Singleton and FileSystems
|
HADOOP-9049. DelegationTokenRenewer needs to be Singleton and FileSystems
|
||||||
should register/deregister to/from. (Karthik Kambatla via tomwhite)
|
should register/deregister to/from. (Karthik Kambatla via tomwhite)
|
||||||
|
|
||||||
|
HADOOP-9084. Augment DelegationTokenRenewer API to cancel the tokens on
|
||||||
|
calls to removeRenewAction. (kkambatl via tucu)
|
||||||
|
|
||||||
Release 2.0.2-alpha - 2012-09-07
|
Release 2.0.2-alpha - 2012-09-07
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -291,5 +291,13 @@
|
||||||
<Field name="previousSnapshot" />
|
<Field name="previousSnapshot" />
|
||||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||||
</Match>
|
</Match>
|
||||||
|
<!--
|
||||||
|
The method uses a generic type T that extends two other types
|
||||||
|
T1 and T2. Findbugs complains of a cast from T1 to T2.
|
||||||
|
-->
|
||||||
|
<Match>
|
||||||
|
<Class name="org.apache.hadoop.fs.DelegationTokenRenewer" />
|
||||||
|
<Method name="removeRenewAction" />
|
||||||
|
<Bug pattern="BC_UNCONFIRMED_CAST" />
|
||||||
|
</Match>
|
||||||
</FindBugsFilter>
|
</FindBugsFilter>
|
||||||
|
|
|
@ -24,6 +24,8 @@ import java.util.concurrent.DelayQueue;
|
||||||
import java.util.concurrent.Delayed;
|
import java.util.concurrent.Delayed;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
@ -35,6 +37,9 @@ import org.apache.hadoop.util.Time;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class DelegationTokenRenewer
|
public class DelegationTokenRenewer
|
||||||
extends Thread {
|
extends Thread {
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(DelegationTokenRenewer.class);
|
||||||
|
|
||||||
/** The renewable interface used by the renewer. */
|
/** The renewable interface used by the renewer. */
|
||||||
public interface Renewable {
|
public interface Renewable {
|
||||||
/** @return the renew token. */
|
/** @return the renew token. */
|
||||||
|
@ -168,11 +173,24 @@ public class DelegationTokenRenewer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Remove the associated renew action from the queue */
|
/**
|
||||||
|
* Remove the associated renew action from the queue
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
public synchronized <T extends FileSystem & Renewable> void removeRenewAction(
|
public synchronized <T extends FileSystem & Renewable> void removeRenewAction(
|
||||||
final T fs) {
|
final T fs) throws IOException {
|
||||||
for (RenewAction<?> action : queue) {
|
for (RenewAction<?> action : queue) {
|
||||||
if (action.weakFs.get() == fs) {
|
if (action.weakFs.get() == fs) {
|
||||||
|
try {
|
||||||
|
fs.getRenewToken().cancel(fs.getConf());
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
LOG.error("Interrupted while canceling token for " + fs.getUri()
|
||||||
|
+ "filesystem");
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(ie.getStackTrace());
|
||||||
|
}
|
||||||
|
}
|
||||||
queue.remove(action);
|
queue.remove(action);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ public class TestDelegationTokenRenewer {
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
static class TestToken extends Token {
|
static class TestToken extends Token {
|
||||||
public volatile int renewCount = 0;
|
public volatile int renewCount = 0;
|
||||||
|
public volatile boolean cancelled = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long renew(Configuration conf) {
|
public long renew(Configuration conf) {
|
||||||
|
@ -33,6 +34,11 @@ public class TestDelegationTokenRenewer {
|
||||||
}
|
}
|
||||||
return renewCount;
|
return renewCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cancel(Configuration conf) {
|
||||||
|
cancelled = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static class TestFileSystem extends FileSystem implements
|
static class TestFileSystem extends FileSystem implements
|
||||||
|
@ -123,27 +129,12 @@ public class TestDelegationTokenRenewer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAddRenewAction() throws IOException, InterruptedException {
|
public void testAddRemoveRenewAction() throws IOException,
|
||||||
|
InterruptedException {
|
||||||
TestFileSystem tfs = new TestFileSystem();
|
TestFileSystem tfs = new TestFileSystem();
|
||||||
renewer.addRenewAction(tfs);
|
renewer.addRenewAction(tfs);
|
||||||
|
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 60; i++) {
|
||||||
Thread.sleep(RENEW_CYCLE);
|
|
||||||
if (tfs.testToken.renewCount > 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assertTrue("Token not renewed even after 10 seconds",
|
|
||||||
(tfs.testToken.renewCount > 0));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRemoveRenewAction() throws IOException, InterruptedException {
|
|
||||||
TestFileSystem tfs = new TestFileSystem();
|
|
||||||
renewer.addRenewAction(tfs);
|
|
||||||
|
|
||||||
for (int i = 0; i < 10; i++) {
|
|
||||||
Thread.sleep(RENEW_CYCLE);
|
Thread.sleep(RENEW_CYCLE);
|
||||||
if (tfs.testToken.renewCount > 0) {
|
if (tfs.testToken.renewCount > 0) {
|
||||||
renewer.removeRenewAction(tfs);
|
renewer.removeRenewAction(tfs);
|
||||||
|
@ -151,9 +142,9 @@ public class TestDelegationTokenRenewer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assertTrue("Token not renewed even once",
|
assertTrue("Token not renewed even after 1 minute",
|
||||||
(tfs.testToken.renewCount > 0));
|
(tfs.testToken.renewCount > 0));
|
||||||
assertTrue("Token not removed",
|
assertTrue("Token not removed", (tfs.testToken.renewCount < MAX_RENEWALS));
|
||||||
(tfs.testToken.renewCount < MAX_RENEWALS));
|
assertTrue("Token not cancelled", tfs.testToken.cancelled);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue