YARN-2010. Handle app-recovery failures gracefully. (Jian He and Karthik Kambatla via kasha)

(cherry picked from commit b2cd269802)
This commit is contained in:
Karthik Kambatla 2014-11-04 17:44:59 -08:00
parent bfcf0c83a2
commit 3e4b280de7
9 changed files with 196 additions and 73 deletions

View File

@ -829,6 +829,9 @@ Release 2.6.0 - UNRELEASED
YARN-2752. Made ContainerExecutor append "nice -n" arg only when priority
adjustment flag is set. (Xuan Gong via zjshen)
YARN-2010. Handle app-recovery failures gracefully.
(Jian He and Karthik Kambatla via kasha)
Release 2.5.2 - UNRELEASED

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRecoverEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -274,12 +275,11 @@ protected void submitApplication(
ApplicationId appId = submissionContext.getApplicationId();
if (UserGroupInformation.isSecurityEnabled()) {
Credentials credentials = null;
try {
credentials = parseCredentials(submissionContext);
this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
credentials, submissionContext.getCancelTokensWhenComplete(),
application.getUser());
parseCredentials(submissionContext),
submissionContext.getCancelTokensWhenComplete(),
application.getUser());
} catch (Exception e) {
LOG.warn("Unable to parse credentials.", e);
// Sending APP_REJECTED is fine, since we assume that the
@ -299,10 +299,8 @@ protected void submitApplication(
}
}
@SuppressWarnings("unchecked")
protected void
recoverApplication(ApplicationState appState, RMState rmState)
throws Exception {
protected void recoverApplication(ApplicationState appState, RMState rmState)
throws Exception {
ApplicationSubmissionContext appContext =
appState.getApplicationSubmissionContext();
ApplicationId appId = appState.getAppId();
@ -311,33 +309,7 @@ protected void submitApplication(
RMAppImpl application =
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
appState.getUser());
application.recover(rmState);
if (isApplicationInFinalState(appState.getState())) {
// We are synchronously moving the application into final state so that
// momentarily client will not see this application in NEW state. Also
// for finished applications we will avoid renewing tokens.
application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
return;
}
if (UserGroupInformation.isSecurityEnabled()) {
Credentials credentials = null;
try {
credentials = parseCredentials(appContext);
// synchronously renew delegation token on recovery.
rmContext.getDelegationTokenRenewer().addApplicationSync(appId,
credentials, appContext.getCancelTokensWhenComplete(),
application.getUser());
application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
} catch (Exception e) {
LOG.warn("Unable to parse and renew delegation tokens.", e);
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(appId, e.getMessage()));
throw e;
}
} else {
application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
}
application.handle(new RMAppRecoverEvent(appId, rmState));
}
private RMAppImpl createAndPopulateNewRMApp(
@ -416,18 +388,9 @@ private ResourceRequest validateAndCreateResourceRequest(
return null;
}
private boolean isApplicationInFinalState(RMAppState rmAppState) {
if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED
|| rmAppState == RMAppState.KILLED) {
return true;
} else {
return false;
}
}
protected Credentials parseCredentials(ApplicationSubmissionContext application)
throws IOException {
protected Credentials parseCredentials(
ApplicationSubmissionContext application) throws IOException {
Credentials credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
ByteBuffer tokens = application.getAMContainerSpec().getTokens();

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@ -36,6 +38,8 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -825,6 +829,15 @@ private static final class RMAppRecoveredTransition implements
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
RMAppRecoverEvent recoverEvent = (RMAppRecoverEvent) event;
try {
app.recover(recoverEvent.getRMState());
} catch (Exception e) {
String msg = app.applicationId + " failed to recover. " + e.getMessage();
failToRecoverApp(app, event, msg, e);
return RMAppState.FINAL_SAVING;
}
// The app has completed.
if (app.recoveredFinalState != null) {
app.recoverAppAttempts();
@ -832,6 +845,20 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
return app.recoveredFinalState;
}
if (UserGroupInformation.isSecurityEnabled()) {
// synchronously renew delegation token on recovery.
try {
app.rmContext.getDelegationTokenRenewer().addApplicationSync(
app.getApplicationId(), app.parseCredentials(),
app.submissionContext.getCancelTokensWhenComplete(), app.getUser());
} catch (Exception e) {
String msg = "Failed to renew delegation token on recovery for "
+ app.applicationId + e.getMessage();
failToRecoverApp(app, event, msg, e);
return RMAppState.FINAL_SAVING;
}
}
// No existent attempts means the attempt associated with this app was not
// started or started but not yet saved.
if (app.attempts.isEmpty()) {
@ -865,6 +892,14 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
// Thus we return ACCECPTED state on recovery.
return RMAppState.ACCEPTED;
}
private void failToRecoverApp(RMAppImpl app, RMAppEvent event, String msg,
Exception e) {
app.diagnostics.append(msg);
LOG.error(msg, e);
app.rememberTargetTransitionsAndStoreState(event, new FinalTransition(
RMAppState.FAILED), RMAppState.FAILED, RMAppState.FAILED);
}
}
private static final class AddApplicationToSchedulerTransition extends
@ -1296,4 +1331,16 @@ public void setSystemClock(Clock clock) {
public ReservationId getReservationId() {
return submissionContext.getReservationID();
}
protected Credentials parseCredentials() throws IOException {
Credentials credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
ByteBuffer tokens = submissionContext.getAMContainerSpec().getTokens();
if (tokens != null) {
dibb.reset(tokens);
credentials.readTokenStorageStream(dibb);
tokens.rewind();
}
return credentials;
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
public class RMAppRecoverEvent extends RMAppEvent {
private final RMState state;
public RMAppRecoverEvent(ApplicationId appId, RMState state) {
super(appId, RMAppEventType.RECOVER);
this.state = state;
}
public RMState getRMState() {
return state;
}
}

View File

@ -833,8 +833,10 @@ private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
if (UserGroupInformation.isSecurityEnabled()) {
byte[] clientTokenMasterKeyBytes = appAttemptTokens.getSecretKey(
RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME);
clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager()
.registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes);
if (clientTokenMasterKeyBytes != null) {
clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager()
.registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes);
}
}
this.amrmToken =

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@Private
public class QueueNotFoundException extends YarnRuntimeException {
private static final long serialVersionUID = 187239430L;
public QueueNotFoundException(String message) {
super(message);
}
}

View File

@ -80,6 +80,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
@ -676,15 +677,13 @@ private synchronized void addApplication(ApplicationId applicationId,
//During a restart, this indicates a queue was removed, which is
//not presently supported
if (isAppRecovering) {
//throwing RuntimeException because some other exceptions are caught
//(including YarnRuntimeException) and we want this to force an exit
String queueErrorMsg = "Queue named " + queueName
String queueErrorMsg = "Queue named " + queueName
+ " missing during application recovery."
+ " Queue removal during recovery is not presently supported by the"
+ " capacity scheduler, please restart with all queues configured"
+ " which were present before shutdown/restart.";
LOG.fatal(queueErrorMsg);
throw new RuntimeException(queueErrorMsg);
throw new QueueNotFoundException(queueErrorMsg);
}
String message = "Application " + applicationId +
" submitted by user " + user + " to unknown queue: " + queueName;

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -61,6 +62,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@ -570,10 +572,10 @@ public void testCapacitySchedulerRecovery() throws Exception {
// submission
//2. Remove one of the queues, restart the RM
//3. Verify that the expected exception was thrown
@Test (timeout = 30000)
@Test (timeout = 30000, expected = QueueNotFoundException.class)
public void testCapacitySchedulerQueueRemovedRecovery() throws Exception {
if (!schedulerClass.equals(CapacityScheduler.class)) {
return;
throw new QueueNotFoundException("Dummy");
}
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
@ -614,17 +616,7 @@ public void testCapacitySchedulerQueueRemovedRecovery() throws Exception {
new CapacitySchedulerConfiguration(conf);
setupQueueConfigurationOnlyA(csConf);
rm2 = new MockRM(csConf, memStore);
boolean runtimeThrown = false;
try {
rm2.start();
} catch (RuntimeException e) {
//we're catching it because we want to verify the message
//and we don't want to set it as an expected exception for the
//test because we only want it to happen here
assertTrue(e.getMessage().contains(B + " missing"));
runtimeThrown = true;
}
assertTrue(runtimeThrown);
rm2.start();
}
private void checkParentQueue(ParentQueue parentQueue, int numContainers,

View File

@ -28,6 +28,7 @@
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
@ -35,6 +36,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@ -43,6 +46,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
@ -73,9 +77,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -199,10 +205,11 @@ public void setUp() throws Exception {
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
store = mock(RMStateStore.class);
writer = mock(RMApplicationHistoryWriter.class);
DelegationTokenRenewer renewer = mock(DelegationTokenRenewer.class);
RMContext realRMContext =
new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, new AMRMTokenSecretManager(conf, this.rmContext),
renewer, new AMRMTokenSecretManager(conf, this.rmContext),
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(),
@ -387,8 +394,12 @@ protected RMApp testCreateAppSubmittedRecovery(
ApplicationSubmissionContext submissionContext) throws IOException {
RMApp application = createNewTestApp(submissionContext);
// NEW => SUBMITTED event RMAppEventType.RECOVER
RMState state = new RMState();
ApplicationState appState = new ApplicationState(123, 123, null, "user");
state.getApplicationState().put(application.getApplicationId(), appState);
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.RECOVER);
new RMAppRecoverEvent(application.getApplicationId(), state);
application.handle(event);
assertStartTimeSet(application);
assertAppState(RMAppState.SUBMITTED, application);
@ -514,7 +525,46 @@ public void testAppSuccessPath() throws IOException {
@Test (timeout = 30000)
public void testAppRecoverPath() throws IOException {
LOG.info("--- START: testAppRecoverPath ---");
testCreateAppSubmittedRecovery(null);
ApplicationSubmissionContext sub =
Records.newRecord(ApplicationSubmissionContext.class);
ContainerLaunchContext clc =
Records.newRecord(ContainerLaunchContext.class);
Credentials credentials = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
ByteBuffer securityTokens =
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
clc.setTokens(securityTokens);
sub.setAMContainerSpec(clc);
testCreateAppSubmittedRecovery(sub);
}
@Test (timeout = 30000)
public void testAppRecoverToFailed() throws IOException {
LOG.info("--- START: testAppRecoverToFailed ---");
ApplicationSubmissionContext sub =
Records.newRecord(ApplicationSubmissionContext.class);
ContainerLaunchContext clc =
Records.newRecord(ContainerLaunchContext.class);
Credentials credentials = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
ByteBuffer securityTokens =
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
clc.setTokens(securityTokens);
sub.setAMContainerSpec(clc);
RMApp application = createNewTestApp(sub);
// NEW => FINAL_SAVING, event RMAppEventType.RECOVER
RMState state = new RMState();
RMAppEvent event =
new RMAppRecoverEvent(application.getApplicationId(), state);
// NPE will throw on recovery.
application.handle(event);
assertAppState(RMAppState.FINAL_SAVING, application);
sendAppUpdateSavedEvent(application);
rmDispatcher.await();
assertAppState(RMAppState.FAILED, application);
}
@Test (timeout = 30000)
@ -917,7 +967,6 @@ public void testAppsRecoveringStates() throws Exception {
}
}
@SuppressWarnings("deprecation")
public void testRecoverApplication(ApplicationState appState, RMState rmState)
throws Exception {
ApplicationSubmissionContext submissionContext =
@ -932,15 +981,15 @@ public void testRecoverApplication(ApplicationState appState, RMState rmState)
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1));
Assert.assertEquals(RMAppState.NEW, application.getState());
application.recover(rmState);
RMAppEvent recoverEvent =
new RMAppRecoverEvent(application.getApplicationId(), rmState);
// Trigger RECOVER event.
application.handle(recoverEvent);
// Application final status looked from recoveredFinalStatus
Assert.assertTrue("Application is not in recoveredFinalStatus.",
RMAppImpl.isAppInFinalState(application));
// Trigger RECOVER event.
application.handle(new RMAppEvent(appState.getAppId(),
RMAppEventType.RECOVER));
rmDispatcher.await();
RMAppState finalState = appState.getState();
Assert.assertEquals("Application is not in finalState.", finalState,