svn merge -c 1481075 Merging from trunk to branch-2 to fix HADOOP-9549.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1481077 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Kihwal Lee 2013-05-10 16:25:03 +00:00
parent 211a9d99db
commit d87e8d26b4
3 changed files with 245 additions and 162 deletions

View File

@ -150,6 +150,8 @@ Release 2.0.5-beta - UNRELEASED
HADOOP-9550. Remove aspectj dependency. (kkambatl via tucu) HADOOP-9550. Remove aspectj dependency. (kkambatl via tucu)
HADOOP-9549. WebHdfsFileSystem hangs on close(). (daryn via kihwal)
Release 2.0.4-alpha - 2013-04-25 Release 2.0.4-alpha - 2013-04-25
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -61,10 +61,12 @@ public class DelegationTokenRenewer
private long renewalTime; private long renewalTime;
/** a weak reference to the file system so that it can be garbage collected */ /** a weak reference to the file system so that it can be garbage collected */
private final WeakReference<T> weakFs; private final WeakReference<T> weakFs;
private Token<?> token;
private RenewAction(final T fs) { private RenewAction(final T fs) {
this.weakFs = new WeakReference<T>(fs); this.weakFs = new WeakReference<T>(fs);
updateRenewalTime(); this.token = fs.getRenewToken();
updateRenewalTime(renewCycle);
} }
/** Get the delay until this event should happen. */ /** Get the delay until this event should happen. */
@ -83,28 +85,32 @@ public class DelegationTokenRenewer
@Override @Override
public int hashCode() { public int hashCode() {
return (int)renewalTime ^ (int)(renewalTime >>> 32); return token.hashCode();
} }
@Override @Override
public boolean equals(final Object that) { public boolean equals(final Object that) {
if (that == null || !(that instanceof RenewAction)) { if (this == that) {
return true;
} else if (that == null || !(that instanceof RenewAction)) {
return false; return false;
} }
return compareTo((Delayed)that) == 0; return token.equals(((RenewAction<?>)that).token);
} }
/** /**
* Set a new time for the renewal. * Set a new time for the renewal.
* It can only be called when the action is not in the queue. * It can only be called when the action is not in the queue or any
* collection because the hashCode may change
* @param newTime the new time * @param newTime the new time
*/ */
private void updateRenewalTime() { private void updateRenewalTime(long delay) {
renewalTime = renewCycle + Time.now(); renewalTime = Time.now() + delay - delay/10;
} }
/** /**
* Renew or replace the delegation token for this file system. * Renew or replace the delegation token for this file system.
* It can only be called when the action is not in the queue.
* @return * @return
* @throws IOException * @throws IOException
*/ */
@ -114,14 +120,17 @@ public class DelegationTokenRenewer
if (b) { if (b) {
synchronized(fs) { synchronized(fs) {
try { try {
fs.getRenewToken().renew(fs.getConf()); long expires = token.renew(fs.getConf());
updateRenewalTime(expires - Time.now());
} catch (IOException ie) { } catch (IOException ie) {
try { try {
Token<?>[] tokens = fs.addDelegationTokens(null, null); Token<?>[] tokens = fs.addDelegationTokens(null, null);
if (tokens.length == 0) { if (tokens.length == 0) {
throw new IOException("addDelegationTokens returned no tokens"); throw new IOException("addDelegationTokens returned no tokens");
} }
fs.setDelegationToken(tokens[0]); token = tokens[0];
updateRenewalTime(renewCycle);
fs.setDelegationToken(token);
} catch (IOException ie2) { } catch (IOException ie2) {
throw new IOException("Can't renew or get new delegation token ", ie); throw new IOException("Can't renew or get new delegation token ", ie);
} }
@ -131,20 +140,27 @@ public class DelegationTokenRenewer
return b; return b;
} }
private void cancel() throws IOException, InterruptedException {
final T fs = weakFs.get();
if (fs != null) {
token.cancel(fs.getConf());
}
}
@Override @Override
public String toString() { public String toString() {
Renewable fs = weakFs.get(); Renewable fs = weakFs.get();
return fs == null? "evaporated token renew" return fs == null? "evaporated token renew"
: "The token will be renewed in " + getDelay(TimeUnit.SECONDS) : "The token will be renewed in " + getDelay(TimeUnit.SECONDS)
+ " secs, renewToken=" + fs.getRenewToken(); + " secs, renewToken=" + token;
} }
} }
/** Wait for 95% of a day between renewals */ /** assumes renew cycle for a token is 24 hours... */
private static final int RENEW_CYCLE = 24 * 60 * 60 * 950; private static final long RENEW_CYCLE = 24 * 60 * 60 * 1000;
@InterfaceAudience.Private @InterfaceAudience.Private
protected static int renewCycle = RENEW_CYCLE; protected static long renewCycle = RENEW_CYCLE;
/** Queue to maintain the RenewActions to be processed by the {@link #run()} */ /** Queue to maintain the RenewActions to be processed by the {@link #run()} */
private volatile DelayQueue<RenewAction<?>> queue = new DelayQueue<RenewAction<?>>(); private volatile DelayQueue<RenewAction<?>> queue = new DelayQueue<RenewAction<?>>();
@ -173,11 +189,34 @@ public class DelegationTokenRenewer
return INSTANCE; return INSTANCE;
} }
@VisibleForTesting
static synchronized void reset() {
if (INSTANCE != null) {
INSTANCE.queue.clear();
INSTANCE.interrupt();
try {
INSTANCE.join();
} catch (InterruptedException e) {
LOG.warn("Failed to reset renewer");
} finally {
INSTANCE = null;
}
}
}
/** Add a renew action to the queue. */ /** Add a renew action to the queue. */
public synchronized <T extends FileSystem & Renewable> void addRenewAction(final T fs) { @SuppressWarnings("static-access")
queue.add(new RenewAction<T>(fs)); public <T extends FileSystem & Renewable> void addRenewAction(final T fs) {
if (!isAlive()) { synchronized (this) {
start(); if (!isAlive()) {
start();
}
}
RenewAction<T> action = new RenewAction<T>(fs);
if (action.token != null) {
queue.add(action);
} else {
fs.LOG.error("does not have a token for renewal");
} }
} }
@ -186,21 +225,18 @@ public class DelegationTokenRenewer
* *
* @throws IOException * @throws IOException
*/ */
public synchronized <T extends FileSystem & Renewable> void removeRenewAction( public <T extends FileSystem & Renewable> void removeRenewAction(
final T fs) throws IOException { final T fs) throws IOException {
for (RenewAction<?> action : queue) { RenewAction<T> action = new RenewAction<T>(fs);
if (action.weakFs.get() == fs) { if (queue.remove(action)) {
try { try {
fs.getRenewToken().cancel(fs.getConf()); action.cancel();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
LOG.error("Interrupted while canceling token for " + fs.getUri() LOG.error("Interrupted while canceling token for " + fs.getUri()
+ "filesystem"); + "filesystem");
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(ie.getStackTrace()); LOG.debug(ie.getStackTrace());
}
} }
queue.remove(action);
return;
} }
} }
} }
@ -211,12 +247,9 @@ public class DelegationTokenRenewer
for(;;) { for(;;) {
RenewAction<?> action = null; RenewAction<?> action = null;
try { try {
synchronized (this) { action = queue.take();
action = queue.take(); if (action.renew()) {
if (action.renew()) { queue.add(action);
action.updateRenewalTime();
queue.add(action);
}
} }
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
return; return;

View File

@ -17,155 +17,203 @@
*/ */
package org.apache.hadoop.fs; package org.apache.hadoop.fs;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.DelegationTokenRenewer.Renewable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Progressable;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestDelegationTokenRenewer { public class TestDelegationTokenRenewer {
private static final int RENEW_CYCLE = 1000; public abstract class RenewableFileSystem extends FileSystem
private static final int MAX_RENEWALS = 100; implements Renewable { }
@SuppressWarnings("rawtypes") private static final long RENEW_CYCLE = 1000;
static class TestToken extends Token {
public volatile int renewCount = 0;
public volatile boolean cancelled = false;
@Override
public long renew(Configuration conf) {
if (renewCount == MAX_RENEWALS) {
Thread.currentThread().interrupt();
} else {
renewCount++;
}
return renewCount;
}
@Override
public void cancel(Configuration conf) {
cancelled = true;
}
}
static class TestFileSystem extends FileSystem implements
DelegationTokenRenewer.Renewable {
private Configuration mockConf = mock(Configuration.class);;
private TestToken testToken = new TestToken();
@Override
public Configuration getConf() {
return mockConf;
}
@Override
public Token<?> getRenewToken() {
return testToken;
}
@Override
public URI getUri() {
return null;
}
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return null;
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
return null;
}
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
return null;
}
@Override
public boolean rename(Path src, Path dst) throws IOException {
return false;
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
return false;
}
@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException {
return null;
}
@Override
public void setWorkingDirectory(Path new_dir) {
}
@Override
public Path getWorkingDirectory() {
return null;
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
return false;
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
return null;
}
@Override
public <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
return;
}
}
private DelegationTokenRenewer renewer; private DelegationTokenRenewer renewer;
Configuration conf;
FileSystem fs;
@Before @Before
public void setup() { public void setup() {
DelegationTokenRenewer.renewCycle = RENEW_CYCLE; DelegationTokenRenewer.renewCycle = RENEW_CYCLE;
DelegationTokenRenewer.reset();
renewer = DelegationTokenRenewer.getInstance(); renewer = DelegationTokenRenewer.getInstance();
} }
@SuppressWarnings("unchecked")
@Test @Test
public void testAddRemoveRenewAction() throws IOException, public void testAddRemoveRenewAction() throws IOException,
InterruptedException { InterruptedException {
TestFileSystem tfs = new TestFileSystem(); Text service = new Text("myservice");
renewer.addRenewAction(tfs); Configuration conf = mock(Configuration.class);
Token<?> token = mock(Token.class);
doReturn(service).when(token).getService();
doAnswer(new Answer<Long>() {
public Long answer(InvocationOnMock invocation) {
return Time.now() + RENEW_CYCLE;
}
}).when(token).renew(any(Configuration.class));
RenewableFileSystem fs = mock(RenewableFileSystem.class);
doReturn(conf).when(fs).getConf();
doReturn(token).when(fs).getRenewToken();
renewer.addRenewAction(fs);
assertEquals("FileSystem not added to DelegationTokenRenewer", 1, assertEquals("FileSystem not added to DelegationTokenRenewer", 1,
renewer.getRenewQueueLength()); renewer.getRenewQueueLength());
Thread.sleep(RENEW_CYCLE*2);
verify(token, atLeast(2)).renew(eq(conf));
verify(token, atMost(3)).renew(eq(conf));
verify(token, never()).cancel(any(Configuration.class));
renewer.removeRenewAction(fs);
verify(token).cancel(eq(conf));
for (int i = 0; i < 60; i++) { verify(fs, never()).getDelegationToken(null);
Thread.sleep(RENEW_CYCLE); verify(fs, never()).setDelegationToken(any(Token.class));
if (tfs.testToken.renewCount > 0) {
renewer.removeRenewAction(tfs);
break;
}
}
assertTrue("Token not renewed even after 1 minute",
(tfs.testToken.renewCount > 0));
assertEquals("FileSystem not removed from DelegationTokenRenewer", 0, assertEquals("FileSystem not removed from DelegationTokenRenewer", 0,
renewer.getRenewQueueLength()); renewer.getRenewQueueLength());
assertTrue("Token not cancelled", tfs.testToken.cancelled); }
@Test
public void testAddRenewActionWithNoToken() throws IOException,
InterruptedException {
Configuration conf = mock(Configuration.class);
RenewableFileSystem fs = mock(RenewableFileSystem.class);
doReturn(conf).when(fs).getConf();
doReturn(null).when(fs).getRenewToken();
renewer.addRenewAction(fs);
verify(fs).getRenewToken();
assertEquals(0, renewer.getRenewQueueLength());
}
@Test
public void testGetNewTokenOnRenewFailure() throws IOException,
InterruptedException {
Text service = new Text("myservice");
Configuration conf = mock(Configuration.class);
final Token<?> token1 = mock(Token.class);
doReturn(service).when(token1).getService();
doThrow(new IOException("boom")).when(token1).renew(eq(conf));
final Token<?> token2 = mock(Token.class);
doReturn(service).when(token2).getService();
doAnswer(new Answer<Long>() {
public Long answer(InvocationOnMock invocation) {
return Time.now() + RENEW_CYCLE;
}
}).when(token2).renew(eq(conf));
RenewableFileSystem fs = mock(RenewableFileSystem.class);
doReturn(conf).when(fs).getConf();
doReturn(token1).doReturn(token2).when(fs).getRenewToken();
doReturn(token2).when(fs).getDelegationToken(null);
doAnswer(new Answer<Token<?>[]>() {
public Token<?>[] answer(InvocationOnMock invocation) {
return new Token<?>[]{token2};
}
}).when(fs).addDelegationTokens(null, null);
renewer.addRenewAction(fs);
assertEquals(1, renewer.getRenewQueueLength());
Thread.sleep(RENEW_CYCLE);
verify(fs).getRenewToken();
verify(token1, atLeast(1)).renew(eq(conf));
verify(token1, atMost(2)).renew(eq(conf));
verify(fs).addDelegationTokens(null, null);
verify(fs).setDelegationToken(eq(token2));
assertEquals(1, renewer.getRenewQueueLength());
renewer.removeRenewAction(fs);
verify(token2).cancel(eq(conf));
assertEquals(0, renewer.getRenewQueueLength());
}
@Test
public void testStopRenewalWhenFsGone() throws IOException,
InterruptedException {
Configuration conf = mock(Configuration.class);
Token<?> token = mock(Token.class);
doReturn(new Text("myservice")).when(token).getService();
doAnswer(new Answer<Long>() {
public Long answer(InvocationOnMock invocation) {
return Time.now() + RENEW_CYCLE;
}
}).when(token).renew(any(Configuration.class));
RenewableFileSystem fs = mock(RenewableFileSystem.class);
doReturn(conf).when(fs).getConf();
doReturn(token).when(fs).getRenewToken();
renewer.addRenewAction(fs);
assertEquals(1, renewer.getRenewQueueLength());
Thread.sleep(RENEW_CYCLE);
verify(token, atLeast(1)).renew(eq(conf));
verify(token, atMost(2)).renew(eq(conf));
// drop weak ref
fs = null;
System.gc(); System.gc(); System.gc();
// next renew should detect the fs as gone
Thread.sleep(RENEW_CYCLE);
verify(token, atLeast(1)).renew(eq(conf));
verify(token, atMost(2)).renew(eq(conf));
assertEquals(0, renewer.getRenewQueueLength());
}
@Test(timeout=4000)
public void testMultipleTokensDoNotDeadlock() throws IOException,
InterruptedException {
Configuration conf = mock(Configuration.class);
FileSystem fs = mock(FileSystem.class);
doReturn(conf).when(fs).getConf();
long distantFuture = Time.now() + 3600 * 1000; // 1h
Token<?> token1 = mock(Token.class);
doReturn(new Text("myservice1")).when(token1).getService();
doReturn(distantFuture).when(token1).renew(eq(conf));
Token<?> token2 = mock(Token.class);
doReturn(new Text("myservice2")).when(token2).getService();
doReturn(distantFuture).when(token2).renew(eq(conf));
RenewableFileSystem fs1 = mock(RenewableFileSystem.class);
doReturn(conf).when(fs1).getConf();
doReturn(token1).when(fs1).getRenewToken();
RenewableFileSystem fs2 = mock(RenewableFileSystem.class);
doReturn(conf).when(fs2).getConf();
doReturn(token2).when(fs2).getRenewToken();
renewer.addRenewAction(fs1);
renewer.addRenewAction(fs2);
assertEquals(2, renewer.getRenewQueueLength());
renewer.removeRenewAction(fs1);
assertEquals(1, renewer.getRenewQueueLength());
renewer.removeRenewAction(fs2);
assertEquals(0, renewer.getRenewQueueLength());
verify(token1).cancel(eq(conf));
verify(token2).cancel(eq(conf));
} }
} }