HADOOP-9049. DelegationTokenRenewer needs to be Singleton and FileSystems should register/deregister to/from. Contributed by Karthik Kambatla.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1412077 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas White 2012-11-21 12:29:37 +00:00
parent c24bc56484
commit d6af507199
5 changed files with 243 additions and 25 deletions

View File

@ -444,6 +444,9 @@ Release 2.0.3-alpha - Unreleased
HADOOP-6607. Add different variants of non caching HTTP headers. (tucu) HADOOP-6607. Add different variants of non caching HTTP headers. (tucu)
HADOOP-9049. DelegationTokenRenewer needs to be Singleton and FileSystems
should register/deregister to/from. (Karthik Kambatla via tomwhite)
Release 2.0.2-alpha - 2012-09-07 Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -33,7 +33,7 @@ import org.apache.hadoop.util.Time;
* A daemon thread that waits for the next file system to renew. * A daemon thread that waits for the next file system to renew.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class DelegationTokenRenewer<T extends FileSystem & DelegationTokenRenewer.Renewable> public class DelegationTokenRenewer
extends Thread { extends Thread {
/** The renewable interface used by the renewer. */ /** The renewable interface used by the renewer. */
public interface Renewable { public interface Renewable {
@ -93,7 +93,7 @@ public class DelegationTokenRenewer<T extends FileSystem & DelegationTokenRenewe
* @param newTime the new time * @param newTime the new time
*/ */
private void updateRenewalTime() { private void updateRenewalTime() {
renewalTime = RENEW_CYCLE + Time.now(); renewalTime = renewCycle + Time.now();
} }
/** /**
@ -136,32 +136,67 @@ public class DelegationTokenRenewer<T extends FileSystem & DelegationTokenRenewe
/** Wait for 95% of a day between renewals */ /** Wait for 95% of a day between renewals */
private static final int RENEW_CYCLE = 24 * 60 * 60 * 950; private static final int RENEW_CYCLE = 24 * 60 * 60 * 950;
private DelayQueue<RenewAction<T>> queue = new DelayQueue<RenewAction<T>>(); @InterfaceAudience.Private
protected static int renewCycle = RENEW_CYCLE;
public DelegationTokenRenewer(final Class<T> clazz) { /** Queue to maintain the RenewActions to be processed by the {@link #run()} */
private volatile DelayQueue<RenewAction<?>> queue = new DelayQueue<RenewAction<?>>();
/**
* Create the singleton instance. However, the thread can be started lazily in
* {@link #addRenewAction(FileSystem)}
*/
private static DelegationTokenRenewer INSTANCE = null;
private DelegationTokenRenewer(final Class<? extends FileSystem> clazz) {
super(clazz.getSimpleName() + "-" + DelegationTokenRenewer.class.getSimpleName()); super(clazz.getSimpleName() + "-" + DelegationTokenRenewer.class.getSimpleName());
setDaemon(true); setDaemon(true);
} }
/** Add a renew action to the queue. */ public static synchronized DelegationTokenRenewer getInstance() {
public void addRenewAction(final T fs) { if (INSTANCE == null) {
queue.add(new RenewAction<T>(fs)); INSTANCE = new DelegationTokenRenewer(FileSystem.class);
}
return INSTANCE;
} }
/** Add a renew action to the queue. */
public synchronized <T extends FileSystem & Renewable> void addRenewAction(final T fs) {
queue.add(new RenewAction<T>(fs));
if (!isAlive()) {
start();
}
}
/** Remove the associated renew action from the queue */
public synchronized <T extends FileSystem & Renewable> void removeRenewAction(
final T fs) {
for (RenewAction<?> action : queue) {
if (action.weakFs.get() == fs) {
queue.remove(action);
return;
}
}
}
@SuppressWarnings("static-access")
@Override @Override
public void run() { public void run() {
for(;;) { for(;;) {
RenewAction<T> action = null; RenewAction<?> action = null;
try { try {
synchronized (this) {
action = queue.take(); action = queue.take();
if (action.renew()) { if (action.renew()) {
action.updateRenewalTime(); action.updateRenewalTime();
queue.add(action); queue.add(action);
} }
}
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
return; return;
} catch (Exception ie) { } catch (Exception ie) {
T.LOG.warn("Failed to renew token, action=" + action, ie); action.weakFs.get().LOG.warn("Failed to renew token, action=" + action,
ie);
} }
} }
} }

View File

@ -0,0 +1,159 @@
package org.apache.hadoop.fs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Progressable;
import org.junit.Before;
import org.junit.Test;
public class TestDelegationTokenRenewer {
private static final int RENEW_CYCLE = 1000;
private static final int MAX_RENEWALS = 100;
@SuppressWarnings("rawtypes")
static class TestToken extends Token {
public volatile int renewCount = 0;
@Override
public long renew(Configuration conf) {
if (renewCount == MAX_RENEWALS) {
Thread.currentThread().interrupt();
} else {
renewCount++;
}
return renewCount;
}
}
static class TestFileSystem extends FileSystem implements
DelegationTokenRenewer.Renewable {
private Configuration mockConf = mock(Configuration.class);;
private TestToken testToken = new TestToken();
@Override
public Configuration getConf() {
return mockConf;
}
@Override
public Token<?> getRenewToken() {
return testToken;
}
@Override
public URI getUri() {
return null;
}
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return null;
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
return null;
}
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
return null;
}
@Override
public boolean rename(Path src, Path dst) throws IOException {
return false;
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
return false;
}
@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException {
return null;
}
@Override
public void setWorkingDirectory(Path new_dir) {
}
@Override
public Path getWorkingDirectory() {
return null;
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
return false;
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
return null;
}
@Override
public <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
return;
}
}
private DelegationTokenRenewer renewer;
@Before
public void setup() {
DelegationTokenRenewer.renewCycle = RENEW_CYCLE;
renewer = DelegationTokenRenewer.getInstance();
}
@Test
public void testAddRenewAction() throws IOException, InterruptedException {
TestFileSystem tfs = new TestFileSystem();
renewer.addRenewAction(tfs);
for (int i = 0; i < 10; i++) {
Thread.sleep(RENEW_CYCLE);
if (tfs.testToken.renewCount > 0) {
return;
}
}
assertTrue("Token not renewed even after 10 seconds",
(tfs.testToken.renewCount > 0));
}
@Test
public void testRemoveRenewAction() throws IOException, InterruptedException {
TestFileSystem tfs = new TestFileSystem();
renewer.addRenewAction(tfs);
for (int i = 0; i < 10; i++) {
Thread.sleep(RENEW_CYCLE);
if (tfs.testToken.renewCount > 0) {
renewer.removeRenewAction(tfs);
break;
}
}
assertTrue("Token not renewed even once",
(tfs.testToken.renewCount > 0));
assertTrue("Token not removed",
(tfs.testToken.renewCount < MAX_RENEWALS));
}
}

View File

@ -82,12 +82,8 @@ import org.xml.sax.helpers.XMLReaderFactory;
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class HftpFileSystem extends FileSystem public class HftpFileSystem extends FileSystem
implements DelegationTokenRenewer.Renewable { implements DelegationTokenRenewer.Renewable {
private static final DelegationTokenRenewer<HftpFileSystem> dtRenewer
= new DelegationTokenRenewer<HftpFileSystem>(HftpFileSystem.class);
static { static {
HttpURLConnection.setFollowRedirects(true); HttpURLConnection.setFollowRedirects(true);
dtRenewer.start();
} }
public static final Text TOKEN_KIND = new Text("HFTP delegation"); public static final Text TOKEN_KIND = new Text("HFTP delegation");
@ -106,6 +102,16 @@ public class HftpFileSystem extends FileSystem
private static final HftpDelegationTokenSelector hftpTokenSelector = private static final HftpDelegationTokenSelector hftpTokenSelector =
new HftpDelegationTokenSelector(); new HftpDelegationTokenSelector();
private DelegationTokenRenewer dtRenewer = null;
private synchronized void addRenewAction(final HftpFileSystem hftpFs) {
if (dtRenewer == null) {
dtRenewer = DelegationTokenRenewer.getInstance();
}
dtRenewer.addRenewAction(hftpFs);
}
public static final SimpleDateFormat getDateFormat() { public static final SimpleDateFormat getDateFormat() {
final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT); final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
df.setTimeZone(TimeZone.getTimeZone(HFTP_TIMEZONE)); df.setTimeZone(TimeZone.getTimeZone(HFTP_TIMEZONE));
@ -202,7 +208,7 @@ public class HftpFileSystem extends FileSystem
if (token != null) { if (token != null) {
setDelegationToken(token); setDelegationToken(token);
if (createdToken) { if (createdToken) {
dtRenewer.addRenewAction(this); addRenewAction(this);
LOG.debug("Created new DT for " + token.getService()); LOG.debug("Created new DT for " + token.getService());
} else { } else {
LOG.debug("Found existing DT for " + token.getService()); LOG.debug("Found existing DT for " + token.getService());
@ -395,6 +401,14 @@ public class HftpFileSystem extends FileSystem
return new FSDataInputStream(new RangeHeaderInputStream(u)); return new FSDataInputStream(new RangeHeaderInputStream(u));
} }
@Override
public void close() throws IOException {
super.close();
if (dtRenewer != null) {
dtRenewer.removeRenewAction(this); // blocks
}
}
/** Class to parse and store a listing reply from the server. */ /** Class to parse and store a listing reply from the server. */
class LsParser extends DefaultHandler { class LsParser extends DefaultHandler {

View File

@ -124,15 +124,14 @@ public class WebHdfsFileSystem extends FileSystem
public static final WebHdfsDelegationTokenSelector DT_SELECTOR public static final WebHdfsDelegationTokenSelector DT_SELECTOR
= new WebHdfsDelegationTokenSelector(); = new WebHdfsDelegationTokenSelector();
private static DelegationTokenRenewer<WebHdfsFileSystem> DT_RENEWER = null; private DelegationTokenRenewer dtRenewer = null;
private static synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) { private synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
if (DT_RENEWER == null) { if (dtRenewer == null) {
DT_RENEWER = new DelegationTokenRenewer<WebHdfsFileSystem>(WebHdfsFileSystem.class); dtRenewer = DelegationTokenRenewer.getInstance();
DT_RENEWER.start();
} }
DT_RENEWER.addRenewAction(webhdfs); dtRenewer.addRenewAction(webhdfs);
} }
/** Is WebHDFS enabled in conf? */ /** Is WebHDFS enabled in conf? */
@ -766,6 +765,14 @@ public class WebHdfsFileSystem extends FileSystem
new OffsetUrlOpener(url), new OffsetUrlOpener(null))); new OffsetUrlOpener(url), new OffsetUrlOpener(null)));
} }
@Override
public void close() throws IOException {
super.close();
if (dtRenewer != null) {
dtRenewer.removeRenewAction(this); // blocks
}
}
class OffsetUrlOpener extends ByteRangeInputStream.URLOpener { class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
OffsetUrlOpener(final URL url) { OffsetUrlOpener(final URL url) {
super(url); super(url);