YARN-360. Allow apps to concurrently register tokens for renewal. Contributed by Daryn Sharp.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1442441 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2013-02-05 00:26:16 +00:00
parent ef2ff99d36
commit 5d679c4f43
4 changed files with 67 additions and 1 deletions

View File

@ -107,6 +107,9 @@ Release 2.0.3-alpha - Unreleased
YARN-277. Use AMRMClient in DistributedShell to exemplify the approach.
(Bikas Saha via hitesh)
YARN-360. Allow apps to concurrently register tokens for renewal.
(Daryn Sharp via sseth)
OPTIMIZATIONS
BUG FIXES

View File

@ -167,6 +167,11 @@
<Field name="minimumAllocation" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer"/>
<Field name="renewalTimer" />
<Bug code="IS"/>
</Match>
<!-- Don't care if putIfAbsent value is ignored -->
<Match>

View File

@ -261,7 +261,7 @@ private void addTokenToList(DelegationTokenToRenew t) {
* done else false.
* @throws IOException
*/
public synchronized void addApplication(
public void addApplication(
ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd)
throws IOException {
if (ts == null) {

View File

@ -21,11 +21,17 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -50,6 +56,8 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* unit test -
@ -541,4 +549,54 @@ public void testDTKeepAlive2() throws Exception {
fail("Renewal of cancelled token should have failed");
} catch (InvalidToken ite) {}
}
@Test(timeout=2000)
public void testConncurrentAddApplication()
throws IOException, InterruptedException, BrokenBarrierException {
final CyclicBarrier startBarrier = new CyclicBarrier(2);
final CyclicBarrier endBarrier = new CyclicBarrier(2);
// this token uses barriers to block during renew
final Credentials creds1 = new Credentials();
final Token<?> token1 = mock(Token.class);
creds1.addToken(new Text("token"), token1);
doReturn(true).when(token1).isManaged();
doAnswer(new Answer<Long>() {
public Long answer(InvocationOnMock invocation)
throws InterruptedException, BrokenBarrierException {
startBarrier.await();
endBarrier.await();
return Long.MAX_VALUE;
}}).when(token1).renew(any(Configuration.class));
// this dummy token fakes renewing
final Credentials creds2 = new Credentials();
final Token<?> token2 = mock(Token.class);
creds2.addToken(new Text("token"), token2);
doReturn(true).when(token2).isManaged();
doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
// fire up the renewer
final DelegationTokenRenewer dtr = new DelegationTokenRenewer();
dtr.init(conf);
dtr.start();
// submit a job that blocks during renewal
Thread submitThread = new Thread() {
@Override
public void run() {
try {
dtr.addApplication(mock(ApplicationId.class), creds1, false);
} catch (IOException e) {}
}
};
submitThread.start();
// wait till 1st submit blocks, then submit another
startBarrier.await();
dtr.addApplication(mock(ApplicationId.class), creds2, false);
// signal 1st to complete
endBarrier.await();
submitThread.join();
}
}