YARN-2834. Fixed ResourceManager to ignore token-renewal failures on recovery consistent with the (somewhat incorrect) behaviour in the non-recovery case. Contributed by Jian He.

(cherry picked from commit e76faebc95)
This commit is contained in:
Vinod Kumar Vavilapalli 2014-11-09 18:56:06 -08:00
parent cff08dab5e
commit d76fc94b21
5 changed files with 70 additions and 60 deletions

View File

@ -898,6 +898,10 @@ Release 2.6.0 - 2014-11-15
YARN-2830. Add backwards compatible ContainerId.newInstance constructor. YARN-2830. Add backwards compatible ContainerId.newInstance constructor.
(jeagles via acmurthy) (jeagles via acmurthy)
YARN-2834. Fixed ResourceManager to ignore token-renewal failures on recovery
consistent with the (somewhat incorrect) behaviour in the non-recovery case.
(Jian He via vinodkv)
Release 2.5.2 - UNRELEASED Release 2.5.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -714,7 +714,7 @@ public class RMAppImpl implements RMApp, Recoverable {
} }
@Override @Override
public void recover(RMState state) throws Exception{ public void recover(RMState state) {
ApplicationState appState = state.getApplicationState().get(getApplicationId()); ApplicationState appState = state.getApplicationState().get(getApplicationId());
this.recoveredFinalState = appState.getState(); this.recoveredFinalState = appState.getState();
LOG.info("Recovering app: " + getApplicationId() + " with " + LOG.info("Recovering app: " + getApplicationId() + " with " +
@ -830,14 +830,7 @@ public class RMAppImpl implements RMApp, Recoverable {
public RMAppState transition(RMAppImpl app, RMAppEvent event) { public RMAppState transition(RMAppImpl app, RMAppEvent event) {
RMAppRecoverEvent recoverEvent = (RMAppRecoverEvent) event; RMAppRecoverEvent recoverEvent = (RMAppRecoverEvent) event;
try { app.recover(recoverEvent.getRMState());
app.recover(recoverEvent.getRMState());
} catch (Exception e) {
String msg = app.applicationId + " failed to recover. " + e.getMessage();
failToRecoverApp(app, event, msg, e);
return RMAppState.FINAL_SAVING;
}
// The app has completed. // The app has completed.
if (app.recoveredFinalState != null) { if (app.recoveredFinalState != null) {
app.recoverAppAttempts(); app.recoverAppAttempts();
@ -852,10 +845,10 @@ public class RMAppImpl implements RMApp, Recoverable {
app.getApplicationId(), app.parseCredentials(), app.getApplicationId(), app.parseCredentials(),
app.submissionContext.getCancelTokensWhenComplete(), app.getUser()); app.submissionContext.getCancelTokensWhenComplete(), app.getUser());
} catch (Exception e) { } catch (Exception e) {
String msg = "Failed to renew delegation token on recovery for " String msg = "Failed to renew token for " + app.applicationId
+ app.applicationId + e.getMessage(); + " on recovery : " + e.getMessage();
failToRecoverApp(app, event, msg, e); app.diagnostics.append(msg);
return RMAppState.FINAL_SAVING; LOG.error(msg, e);
} }
} }
@ -892,14 +885,6 @@ public class RMAppImpl implements RMApp, Recoverable {
// Thus we return ACCEPTED state on recovery. // Thus we return ACCEPTED state on recovery.
return RMAppState.ACCEPTED; return RMAppState.ACCEPTED;
} }
private void failToRecoverApp(RMAppImpl app, RMAppEvent event, String msg,
Exception e) {
app.diagnostics.append(msg);
LOG.error(msg, e);
app.rememberTargetTransitionsAndStoreState(event, new FinalTransition(
RMAppState.FAILED), RMAppState.FAILED, RMAppState.FAILED);
}
} }
private static final class AddApplicationToSchedulerTransition extends private static final class AddApplicationToSchedulerTransition extends

View File

@ -789,7 +789,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
} }
@Override @Override
public void recover(RMState state) throws Exception { public void recover(RMState state) {
ApplicationState appState = ApplicationState appState =
state.getApplicationState().get(getAppAttemptId().getApplicationId()); state.getApplicationState().get(getAppAttemptId().getApplicationId());
ApplicationAttemptState attemptState = ApplicationAttemptState attemptState =
@ -823,7 +823,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
} }
private void recoverAppAttemptCredentials(Credentials appAttemptTokens, private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
RMAppAttemptState state) throws IOException { RMAppAttemptState state) {
if (appAttemptTokens == null || state == RMAppAttemptState.FAILED if (appAttemptTokens == null || state == RMAppAttemptState.FAILED
|| state == RMAppAttemptState.FINISHED || state == RMAppAttemptState.FINISHED
|| state == RMAppAttemptState.KILLED) { || state == RMAppAttemptState.KILLED) {

View File

@ -18,15 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -35,9 +35,10 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -50,9 +51,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -71,9 +70,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
@ -1011,4 +1014,50 @@ public class TestWorkPreservingRMRestart {
am0.unregisterAppAttempt(false); am0.unregisterAppAttempt(false);
} }
/**
 * Verifies that a token-renewal failure during ResourceManager recovery does
 * not fail the application at recovery time.
 *
 * <p>Scenario:
 * <ol>
 *   <li>Start a first RM backed by an in-memory state store, submit an
 *       application and launch/register its AM.</li>
 *   <li>Start a second RM on the same store whose
 *       {@code DelegationTokenRenewer#addApplicationSync} always throws an
 *       {@link IOException}.</li>
 *   <li>Re-register the NM (reporting the AM container as running) and the
 *       AM against the second RM; the application is expected to reach
 *       {@code ACCEPTED} and then {@code RUNNING} despite the renewal
 *       failure.</li>
 *   <li>Report the AM container as complete; the attempt and then the
 *       application are expected to end up {@code FAILED}.</li>
 * </ol>
 *
 * @throws Exception if any step of the mock cluster setup or a state wait
 *         fails
 */
@Test (timeout = 30000)
public void testAppFailedToRenewTokenOnRecovery() throws Exception {
  // Presumably the token-renewal path is only exercised when security is
  // enabled, hence the kerberos setting -- confirm against RMAppImpl.
  conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
      "kerberos");
  // A single allowed attempt: the test expects the app to become FAILED
  // right after its only attempt fails (see the final waits below).
  conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
  UserGroupInformation.setConfiguration(conf);

  MemoryRMStateStore memStore = new MemoryRMStateStore();
  memStore.init(conf);

  // First RM: run an application up to a registered AM so that there is
  // state in memStore for the second RM to recover.
  MockRM rm1 = new TestSecurityMockRM(conf, memStore);
  rm1.start();
  MockNM nm1 =
      new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
  nm1.registerNode();
  RMApp app1 = rm1.submitApp(200);
  MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

  // Second RM: shares the state store, but every synchronous token
  // registration fails, simulating a renewal failure on recovery.
  MockRM rm2 = new TestSecurityMockRM(conf, memStore) {
    // @Override makes the compiler reject this test if the factory method
    // is ever renamed, instead of silently using the real renewer.
    @Override
    protected DelegationTokenRenewer createDelegationTokenRenewer() {
      return new DelegationTokenRenewer() {
        @Override
        public void addApplicationSync(ApplicationId applicationId,
            Credentials ts, boolean shouldCancelAtEnd, String user)
            throws IOException {
          throw new IOException("Token renew failed !!");
        }
      };
    }
  };
  nm1.setResourceTrackerService(rm2.getResourceTrackerService());
  rm2.start();

  // NM re-registers with the new RM and reports the AM container (id 1)
  // as still running.
  NMContainerStatus containerStatus =
      TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 1,
          ContainerState.RUNNING);
  nm1.registerNode(Arrays.asList(containerStatus), null);

  // AM re-registers: the app must have been recovered (ACCEPTED) rather
  // than failed, and must reach RUNNING once the AM is back.
  rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
  am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
  am1.registerAppAttempt(true);
  rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);

  // Since the tokens could not be renewed, the AM may crash later on;
  // simulate that by reporting its container as completed.
  nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
  rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
  rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
}
} }

View File

@ -539,34 +539,6 @@ public class TestRMAppTransitions {
testCreateAppSubmittedRecovery(sub); testCreateAppSubmittedRecovery(sub);
} }
@Test (timeout = 30000)
public void testAppRecoverToFailed() throws IOException {
LOG.info("--- START: testAppRecoverToFailed ---");
ApplicationSubmissionContext sub =
Records.newRecord(ApplicationSubmissionContext.class);
ContainerLaunchContext clc =
Records.newRecord(ContainerLaunchContext.class);
Credentials credentials = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
ByteBuffer securityTokens =
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
clc.setTokens(securityTokens);
sub.setAMContainerSpec(clc);
RMApp application = createNewTestApp(sub);
// NEW => FINAL_SAVING, event RMAppEventType.RECOVER
RMState state = new RMState();
RMAppEvent event =
new RMAppRecoverEvent(application.getApplicationId(), state);
// NPE will throw on recovery.
application.handle(event);
assertAppState(RMAppState.FINAL_SAVING, application);
sendAppUpdateSavedEvent(application);
rmDispatcher.await();
assertAppState(RMAppState.FAILED, application);
}
@Test (timeout = 30000) @Test (timeout = 30000)
public void testAppNewKill() throws IOException { public void testAppNewKill() throws IOException {
LOG.info("--- START: testAppNewKill ---"); LOG.info("--- START: testAppNewKill ---");