YARN-1866. Fixed an issue with renewal of RM-delegation tokens on restart or fail-over. Contributed by Jian He.
svn merge --ignore-ancestry -c 1581448 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1581450 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2cc8d2d65b
commit
dd8a9bbadf
|
@ -556,6 +556,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
YARN-1852. Fixed RMAppAttempt to not resend AttemptFailed/AttemptKilled
|
YARN-1852. Fixed RMAppAttempt to not resend AttemptFailed/AttemptKilled
|
||||||
events to already recovered Failed/Killed RMApps. (Rohith via jianhe)
|
events to already recovered Failed/Killed RMApps. (Rohith via jianhe)
|
||||||
|
|
||||||
|
YARN-1866. Fixed an issue with renewal of RM-delegation tokens on restart or
|
||||||
|
fail-over. (Jian He via vinodkv)
|
||||||
|
|
||||||
Release 2.3.1 - UNRELEASED
|
Release 2.3.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -112,6 +112,8 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
this.tokenRemovalDelayMs =
|
this.tokenRemovalDelayMs =
|
||||||
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
|
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
|
||||||
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
|
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
|
||||||
|
|
||||||
|
setLocalSecretManagerAndServiceAddr();
|
||||||
renewerService = createNewThreadPoolService(conf);
|
renewerService = createNewThreadPoolService(conf);
|
||||||
pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
|
pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
|
||||||
renewalTimer = new Timer(true);
|
renewalTimer = new Timer(true);
|
||||||
|
@ -134,6 +136,13 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
return pool;
|
return pool;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// enable RM to short-circuit token operations directly to itself
|
||||||
|
private void setLocalSecretManagerAndServiceAddr() {
|
||||||
|
RMDelegationTokenIdentifier.Renewer.setSecretManager(rmContext
|
||||||
|
.getRMDelegationTokenSecretManager(), rmContext.getClientRMService()
|
||||||
|
.getBindAddress());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
dtCancelThread.start();
|
dtCancelThread.start();
|
||||||
|
@ -143,10 +152,8 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
"DelayedTokenCanceller");
|
"DelayedTokenCanceller");
|
||||||
delayedRemovalThread.start();
|
delayedRemovalThread.start();
|
||||||
}
|
}
|
||||||
// enable RM to short-circuit token operations directly to itself
|
|
||||||
RMDelegationTokenIdentifier.Renewer.setSecretManager(
|
setLocalSecretManagerAndServiceAddr();
|
||||||
rmContext.getRMDelegationTokenSecretManager(),
|
|
||||||
rmContext.getClientRMService().getBindAddress());
|
|
||||||
serviceStateLock.writeLock().lock();
|
serviceStateLock.writeLock().lock();
|
||||||
isServiceStarted = true;
|
isServiceStarted = true;
|
||||||
serviceStateLock.writeLock().unlock();
|
serviceStateLock.writeLock().unlock();
|
||||||
|
|
|
@ -394,7 +394,7 @@ public class TestRMRestart {
|
||||||
Assert.assertEquals(4, rmAppState.size());
|
Assert.assertEquals(4, rmAppState.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testRMRestartAppRunningAMFailed() throws Exception {
|
public void testRMRestartAppRunningAMFailed() throws Exception {
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||||
|
@ -440,7 +440,7 @@ public class TestRMRestart {
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testRMRestartWaitForPreviousAMToFinish() throws Exception {
|
public void testRMRestartWaitForPreviousAMToFinish() throws Exception {
|
||||||
// testing 3 cases
|
// testing 3 cases
|
||||||
// After RM restarts
|
// After RM restarts
|
||||||
|
@ -607,7 +607,7 @@ public class TestRMRestart {
|
||||||
// store but before the RMAppAttempt notifies RMApp that it has succeeded. On
|
// store but before the RMAppAttempt notifies RMApp that it has succeeded. On
|
||||||
// recovery, RMAppAttempt should send the AttemptFinished event to RMApp so
|
// recovery, RMAppAttempt should send the AttemptFinished event to RMApp so
|
||||||
// that RMApp can recover its state.
|
// that RMApp can recover its state.
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception {
|
public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception {
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
||||||
|
@ -660,7 +660,7 @@ public class TestRMRestart {
|
||||||
rmAppState.get(app0.getApplicationId()).getState());
|
rmAppState.get(app0.getApplicationId()).getState());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testRMRestartFailedApp() throws Exception {
|
public void testRMRestartFailedApp() throws Exception {
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
@ -709,7 +709,7 @@ public class TestRMRestart {
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testRMRestartKilledApp() throws Exception{
|
public void testRMRestartKilledApp() throws Exception{
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||||
|
@ -757,7 +757,7 @@ public class TestRMRestart {
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testRMRestartKilledAppWithNoAttempts() throws Exception {
|
public void testRMRestartKilledAppWithNoAttempts() throws Exception {
|
||||||
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -797,7 +797,7 @@ public class TestRMRestart {
|
||||||
Assert.assertTrue(loadedApp0.getAppAttempts().size() == 0);
|
Assert.assertTrue(loadedApp0.getAppAttempts().size() == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testRMRestartSucceededApp() throws Exception {
|
public void testRMRestartSucceededApp() throws Exception {
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||||
|
@ -849,7 +849,7 @@ public class TestRMRestart {
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testRMRestartGetApplicationList() throws Exception {
|
public void testRMRestartGetApplicationList() throws Exception {
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
@ -997,7 +997,7 @@ public class TestRMRestart {
|
||||||
appState.getAttempt(am.getApplicationAttemptId()).getState());
|
appState.getAttempt(am.getApplicationAttemptId()).getState());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testRMRestartOnMaxAppAttempts() throws Exception {
|
public void testRMRestartOnMaxAppAttempts() throws Exception {
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||||
|
@ -1071,7 +1071,7 @@ public class TestRMRestart {
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testDelegationTokenRestoredInDelegationTokenRenewer()
|
public void testDelegationTokenRestoredInDelegationTokenRenewer()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
|
@ -1171,7 +1171,7 @@ public class TestRMRestart {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
|
public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
|
@ -1261,7 +1261,7 @@ public class TestRMRestart {
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
|
public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
conf.set(
|
conf.set(
|
||||||
|
@ -1414,7 +1414,7 @@ public class TestRMRestart {
|
||||||
|
|
||||||
// This is to test submit an application to the new RM with the old delegation
|
// This is to test submit an application to the new RM with the old delegation
|
||||||
// token got from previous RM.
|
// token got from previous RM.
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testAppSubmissionWithOldDelegationTokenAfterRMRestart()
|
public void testAppSubmissionWithOldDelegationTokenAfterRMRestart()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
|
@ -1449,7 +1449,7 @@ public class TestRMRestart {
|
||||||
rm2.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
|
rm2.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception {
|
public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception {
|
||||||
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
||||||
volatile boolean wait = true;
|
volatile boolean wait = true;
|
||||||
|
@ -1508,7 +1508,7 @@ public class TestRMRestart {
|
||||||
Assert.assertTrue(rmAppState.size() == NUM_APPS);
|
Assert.assertTrue(rmAppState.size() == NUM_APPS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testFinishedAppRemovalAfterRMRestart() throws Exception {
|
public void testFinishedAppRemovalAfterRMRestart() throws Exception {
|
||||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1);
|
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1);
|
||||||
|
@ -1580,7 +1580,7 @@ public class TestRMRestart {
|
||||||
// This is to test Killing application should be able to wait until app
|
// This is to test Killing application should be able to wait until app
|
||||||
// reaches killed state and also check that attempt state is saved before app
|
// reaches killed state and also check that attempt state is saved before app
|
||||||
// state is saved.
|
// state is saved.
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testClientRetryOnKillingApplication() throws Exception {
|
public void testClientRetryOnKillingApplication() throws Exception {
|
||||||
MemoryRMStateStore memStore = new TestMemoryRMStateStore();
|
MemoryRMStateStore memStore = new TestMemoryRMStateStore();
|
||||||
memStore.init(conf);
|
memStore.init(conf);
|
||||||
|
@ -1738,7 +1738,7 @@ public class TestRMRestart {
|
||||||
appsCompleted + appsCompletedCarryOn);
|
appsCompleted + appsCompletedCarryOn);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout = 60000)
|
||||||
public void testDecomissionedNMsMetricsOnRMRestart() throws Exception {
|
public void testDecomissionedNMsMetricsOnRMRestart() throws Exception {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
||||||
|
@ -1875,6 +1875,13 @@ public class TestRMRestart {
|
||||||
super(conf, store);
|
super(conf, store);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(Configuration conf) {
|
||||||
|
// reset localServiceAddress.
|
||||||
|
RMDelegationTokenIdentifier.Renewer.setSecretManager(null, null);
|
||||||
|
super.init(conf);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ClientRMService createClientRMService() {
|
protected ClientRMService createClientRMService() {
|
||||||
return new ClientRMService(getRMContext(), getResourceScheduler(),
|
return new ClientRMService(getRMContext(), getResourceScheduler(),
|
||||||
|
|
|
@ -43,8 +43,6 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.junit.Assert;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -82,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -179,7 +178,6 @@ public class TestDelegationTokenRenewer {
|
||||||
dispatcher = new AsyncDispatcher(eventQueue);
|
dispatcher = new AsyncDispatcher(eventQueue);
|
||||||
Renewer.reset();
|
Renewer.reset();
|
||||||
delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter);
|
delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter);
|
||||||
delegationTokenRenewer.init(conf);
|
|
||||||
RMContext mockContext = mock(RMContext.class);
|
RMContext mockContext = mock(RMContext.class);
|
||||||
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||||
when(mockContext.getDelegationTokenRenewer()).thenReturn(
|
when(mockContext.getDelegationTokenRenewer()).thenReturn(
|
||||||
|
@ -190,6 +188,7 @@ public class TestDelegationTokenRenewer {
|
||||||
InetSocketAddress.createUnresolved("localhost", 1234);
|
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||||
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||||
delegationTokenRenewer.setRMContext(mockContext);
|
delegationTokenRenewer.setRMContext(mockContext);
|
||||||
|
delegationTokenRenewer.init(conf);
|
||||||
delegationTokenRenewer.start();
|
delegationTokenRenewer.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -515,7 +514,6 @@ public class TestDelegationTokenRenewer {
|
||||||
1000l);
|
1000l);
|
||||||
DelegationTokenRenewer localDtr =
|
DelegationTokenRenewer localDtr =
|
||||||
createNewDelegationTokenRenewer(lconf, counter);
|
createNewDelegationTokenRenewer(lconf, counter);
|
||||||
localDtr.init(lconf);
|
|
||||||
RMContext mockContext = mock(RMContext.class);
|
RMContext mockContext = mock(RMContext.class);
|
||||||
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||||
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||||
|
@ -526,6 +524,7 @@ public class TestDelegationTokenRenewer {
|
||||||
InetSocketAddress.createUnresolved("localhost", 1234);
|
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||||
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||||
localDtr.setRMContext(mockContext);
|
localDtr.setRMContext(mockContext);
|
||||||
|
localDtr.init(lconf);
|
||||||
localDtr.start();
|
localDtr.start();
|
||||||
|
|
||||||
MyFS dfs = (MyFS)FileSystem.get(lconf);
|
MyFS dfs = (MyFS)FileSystem.get(lconf);
|
||||||
|
@ -592,7 +591,6 @@ public class TestDelegationTokenRenewer {
|
||||||
1000l);
|
1000l);
|
||||||
DelegationTokenRenewer localDtr =
|
DelegationTokenRenewer localDtr =
|
||||||
createNewDelegationTokenRenewer(conf, counter);
|
createNewDelegationTokenRenewer(conf, counter);
|
||||||
localDtr.init(lconf);
|
|
||||||
RMContext mockContext = mock(RMContext.class);
|
RMContext mockContext = mock(RMContext.class);
|
||||||
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||||
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||||
|
@ -603,6 +601,7 @@ public class TestDelegationTokenRenewer {
|
||||||
InetSocketAddress.createUnresolved("localhost", 1234);
|
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||||
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||||
localDtr.setRMContext(mockContext);
|
localDtr.setRMContext(mockContext);
|
||||||
|
localDtr.init(lconf);
|
||||||
localDtr.start();
|
localDtr.start();
|
||||||
|
|
||||||
MyFS dfs = (MyFS)FileSystem.get(lconf);
|
MyFS dfs = (MyFS)FileSystem.get(lconf);
|
||||||
|
@ -704,7 +703,6 @@ public class TestDelegationTokenRenewer {
|
||||||
// fire up the renewer
|
// fire up the renewer
|
||||||
final DelegationTokenRenewer dtr =
|
final DelegationTokenRenewer dtr =
|
||||||
createNewDelegationTokenRenewer(conf, counter);
|
createNewDelegationTokenRenewer(conf, counter);
|
||||||
dtr.init(conf);
|
|
||||||
RMContext mockContext = mock(RMContext.class);
|
RMContext mockContext = mock(RMContext.class);
|
||||||
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||||
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||||
|
@ -713,6 +711,7 @@ public class TestDelegationTokenRenewer {
|
||||||
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||||
dtr.setRMContext(mockContext);
|
dtr.setRMContext(mockContext);
|
||||||
when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
|
when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
|
||||||
|
dtr.init(conf);
|
||||||
dtr.start();
|
dtr.start();
|
||||||
// submit a job that blocks during renewal
|
// submit a job that blocks during renewal
|
||||||
Thread submitThread = new Thread() {
|
Thread submitThread = new Thread() {
|
||||||
|
|
|
@ -19,7 +19,12 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
||||||
|
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -32,9 +37,13 @@ public class TestDelegationTokenRenewerLifecycle {
|
||||||
@Test
|
@Test
|
||||||
public void testStartupFailure() throws Exception {
|
public void testStartupFailure() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
DelegationTokenRenewer delegationTokenRenewer = new DelegationTokenRenewer();
|
DelegationTokenRenewer delegationTokenRenewer =
|
||||||
|
new DelegationTokenRenewer();
|
||||||
|
RMContext mockContext = mock(RMContext.class);
|
||||||
|
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||||
|
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||||
|
delegationTokenRenewer.setRMContext(mockContext);
|
||||||
delegationTokenRenewer.init(conf);
|
delegationTokenRenewer.init(conf);
|
||||||
delegationTokenRenewer.stop();
|
delegationTokenRenewer.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue