YARN-2893. AMLauncher: sporadic job failures due to EOFException in readTokenStorageStream. (Zhihai Xu via gera)

This commit is contained in:
Gera Shegalov 2015-05-01 14:49:09 -07:00
parent 6f541edce0
commit f8204e241d
5 changed files with 153 additions and 21 deletions

View File

@ -287,6 +287,9 @@ Release 2.8.0 - UNRELEASED
YARN-3564. Fix TestContainerAllocation.testAMContainerAllocationWhenDNSUnavailable
fails randomly. (Jian He via wangda)
YARN-2893. AMLauncher: sporadic job failures due to EOFException in
readTokenStorageStream. (Zhihai Xu via gera)
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -281,29 +281,29 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
RMAppImpl application =
createAndPopulateNewRMApp(submissionContext, submitTime, user, false);
ApplicationId appId = submissionContext.getApplicationId();
if (UserGroupInformation.isSecurityEnabled()) {
try {
Credentials credentials = null;
try {
credentials = parseCredentials(submissionContext);
if (UserGroupInformation.isSecurityEnabled()) {
this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
parseCredentials(submissionContext),
submissionContext.getCancelTokensWhenComplete(),
credentials, submissionContext.getCancelTokensWhenComplete(),
application.getUser());
} catch (Exception e) {
LOG.warn("Unable to parse credentials.", e);
// Sending APP_REJECTED is fine, since we assume that the
// RMApp is in NEW state and thus we haven't yet informed the
// scheduler about the existence of the application
assert application.getState() == RMAppState.NEW;
} else {
// Dispatcher is not yet started at this time, so these START events
// enqueued should be guaranteed to be first processed when dispatcher
// gets started.
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, e.getMessage()));
throw RPCUtil.getRemoteException(e);
.handle(new RMAppEvent(applicationId, RMAppEventType.START));
}
} else {
// Dispatcher is not yet started at this time, so these START events
// enqueued should be guaranteed to be first processed when dispatcher
// gets started.
} catch (Exception e) {
LOG.warn("Unable to parse credentials.", e);
// Sending APP_REJECTED is fine, since we assume that the
// RMApp is in NEW state and thus we haven't yet informed the
// scheduler about the existence of the application
assert application.getState() == RMAppState.NEW;
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.START));
.handle(new RMAppRejectedEvent(applicationId, e.getMessage()));
throw RPCUtil.getRemoteException(e);
}
}

View File

@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
@ -200,7 +201,9 @@ public class AMLauncher implements Runnable {
return container;
}
private void setupTokens(
@Private
@VisibleForTesting
protected void setupTokens(
ContainerLaunchContext container, ContainerId containerID)
throws IOException {
Map<String, String> environment = container.getEnvironment();
@ -220,10 +223,12 @@ public class AMLauncher implements Runnable {
Credentials credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
if (container.getTokens() != null) {
ByteBuffer tokens = container.getTokens();
if (tokens != null) {
// TODO: Don't do this kind of checks everywhere.
dibb.reset(container.getTokens());
dibb.reset(tokens);
credentials.readTokenStorageStream(dibb);
tokens.rewind();
}
// Add AMRMToken

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
@ -33,6 +35,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
@ -479,6 +482,63 @@ public class TestAppManager{
getAppEventType());
}
/**
 * Submits an application whose AM container spec carries an empty token
 * buffer and verifies that the submission fails with a YarnException that
 * mentions java.io.EOFException, and that APP_REJECTED is the event type
 * observed afterwards.
 */
@Test
public void testRMAppSubmitWithInvalidTokens() throws Exception {
  // Setup invalid security tokens: nothing is written to the buffer, so the
  // wrapped ByteBuffer has zero readable bytes.
  DataOutputBuffer dob = new DataOutputBuffer();
  ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0,
      dob.getLength());
  asContext.getAMContainerSpec().setTokens(securityTokens);
  try {
    try {
      appMonitor.submitApplication(asContext, "test");
      Assert.fail("Application submission should fail because" +
          " Tokens are invalid.");
    } catch (YarnException e) {
      // Exception is expected
      Assert.assertTrue("The thrown exception is not" +
          " java.io.EOFException",
          e.getMessage().contains("java.io.EOFException"));
    }
    // Poll (at most ~20 one-second sleeps) while the observed event type is
    // still KILL.
    // NOTE(review): presumably KILL is the placeholder value reported before
    // any event has been handled -- confirm against getAppEventType().
    int timeoutSecs = 0;
    while ((getAppEventType() == RMAppEventType.KILL) &&
        timeoutSecs++ < 20) {
      Thread.sleep(1000);
    }
    Assert.assertEquals("app event type sent is wrong",
        RMAppEventType.APP_REJECTED, getAppEventType());
  } finally {
    // Restore the container spec even when an assertion above fails, so the
    // invalid tokens are never left behind on asContext.
    asContext.getAMContainerSpec().setTokens(null);
  }
}
/**
 * Submits an application whose AM container spec carries a well-formed
 * (empty) serialized Credentials object and verifies that the application is
 * registered in NEW state and that START is the event type observed
 * afterwards.
 */
@Test
public void testRMAppSubmitWithValidTokens() throws Exception {
  // Setup valid security tokens: serialize an empty Credentials instance so
  // the buffer contains a complete token-storage stream.
  DataOutputBuffer dob = new DataOutputBuffer();
  Credentials credentials = new Credentials();
  credentials.writeTokenStorageToStream(dob);
  ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0,
      dob.getLength());
  asContext.getAMContainerSpec().setTokens(securityTokens);
  try {
    appMonitor.submitApplication(asContext, "test");
    RMApp app = rmContext.getRMApps().get(appId);
    Assert.assertNotNull("app is null", app);
    Assert.assertEquals("app id doesn't match", appId,
        app.getApplicationId());
    Assert.assertEquals("app state doesn't match", RMAppState.NEW,
        app.getState());
    verify(metricsPublisher).appACLsUpdated(
        any(RMApp.class), any(String.class), anyLong());

    // wait for event to be processed (at most ~20 one-second sleeps while
    // the observed event type is still KILL)
    int timeoutSecs = 0;
    while ((getAppEventType() == RMAppEventType.KILL) &&
        timeoutSecs++ < 20) {
      Thread.sleep(1000);
    }
    Assert.assertEquals("app event type sent is wrong", RMAppEventType.START,
        getAppEventType());
  } finally {
    // Restore the container spec even when an assertion above fails, so the
    // test tokens are never left behind on asContext.
    asContext.getAMContainerSpec().setTokens(null);
  }
}
@Test (timeout = 30000)
public void testRMAppSubmitMaxAppAttempts() throws Exception {
int[] globalMaxAppAttempts = new int[] { 10, 1 };

View File

@ -26,6 +26,9 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -38,6 +41,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SerializedException;
@ -47,7 +51,10 @@ import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@ -238,4 +245,61 @@ public class TestApplicationMasterLauncher {
} catch (ApplicationAttemptNotFoundException e) {
}
}
/**
 * Verifies that AMLauncher#setupTokens can be invoked a second time on the
 * same ContainerLaunchContext after the first invocation failed part-way
 * through, without hitting an EOFException while reading the tokens.
 */
@Test
public void testSetupTokens() throws Exception {
  MockRM rm = new MockRM();
  rm.start();
  try {
    MockNM nm1 = rm.registerNode("h1:1234", 5000);
    RMApp app = rm.submitApp(2000);
    /// kick the scheduling
    nm1.nodeHeartbeat(true);
    RMAppAttempt attempt = app.getCurrentAppAttempt();
    MyAMLauncher launcher = new MyAMLauncher(rm.getRMContext(),
        attempt, AMLauncherEventType.LAUNCH, rm.getConfig());

    // Serialize an empty Credentials object to obtain a well-formed token
    // buffer for the launch context.
    DataOutputBuffer dob = new DataOutputBuffer();
    Credentials ts = new Credentials();
    ts.writeTokenStorageToStream(dob);
    ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(),
        0, dob.getLength());
    ContainerLaunchContext amContainer =
        ContainerLaunchContext.newInstance(null, null,
            null, null, securityTokens, null);
    ContainerId containerId = ContainerId.newContainerId(
        attempt.getAppAttemptId(), 0L);

    try {
      launcher.setupTokens(amContainer, containerId);
    } catch (Exception ignored) {
      // ignore the first fake exception (MyAMLauncher throws on its first
      // createAndSetAMRMToken call)
    }
    // The second attempt reuses the very same token buffer.
    try {
      launcher.setupTokens(amContainer, containerId);
    } catch (java.io.EOFException e) {
      Assert.fail("EOFException should not happen.");
    }
  } finally {
    // Shut the mock ResourceManager down regardless of the test outcome so
    // its services do not outlive this test.
    rm.stop();
  }
}
static class MyAMLauncher extends AMLauncher {
int count;
public MyAMLauncher(RMContext rmContext, RMAppAttempt application,
AMLauncherEventType eventType, Configuration conf) {
super(rmContext, application, eventType, conf);
count = 0;
}
protected org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>
createAndSetAMRMToken() {
count++;
if (count == 1) {
throw new RuntimeException("createAndSetAMRMToken failure");
}
return null;
}
protected void setupTokens(ContainerLaunchContext container,
ContainerId containerID) throws IOException {
super.setupTokens(container, containerID);
}
}
}