From f8204e241d9271497defd4d42646fb89c61cefe3 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Fri, 1 May 2015 14:49:09 -0700 Subject: [PATCH] YARN-2893. AMLaucher: sporadic job failures due to EOFException in readTokenStorageStream. (Zhihai Xu via gera) --- hadoop-yarn-project/CHANGES.txt | 3 + .../server/resourcemanager/RMAppManager.java | 36 +++++------ .../amlauncher/AMLauncher.java | 11 +++- .../resourcemanager/TestAppManager.java | 60 +++++++++++++++++ .../TestApplicationMasterLauncher.java | 64 +++++++++++++++++++ 5 files changed, 153 insertions(+), 21 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 6f382017ffa..c110f88b28b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -287,6 +287,9 @@ Release 2.8.0 - UNRELEASED YARN-3564. Fix TestContainerAllocation.testAMContainerAllocationWhenDNSUnavailable fails randomly. (Jian He via wangda) + YARN-2893. AMLaucher: sporadic job failures due to EOFException in + readTokenStorageStream. (Zhihai Xu via gera) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index e511ff0489a..ca21f113f52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -281,29 +281,29 @@ public class RMAppManager implements EventHandler, RMAppImpl application = createAndPopulateNewRMApp(submissionContext, submitTime, user, false); ApplicationId appId = submissionContext.getApplicationId(); - - if (UserGroupInformation.isSecurityEnabled()) { - try { + Credentials credentials = null; + try { + credentials = parseCredentials(submissionContext); + if (UserGroupInformation.isSecurityEnabled()) { this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId, - parseCredentials(submissionContext), - submissionContext.getCancelTokensWhenComplete(), + credentials, submissionContext.getCancelTokensWhenComplete(), application.getUser()); - } catch (Exception e) { - LOG.warn("Unable to parse credentials.", e); - // Sending APP_REJECTED is fine, since we assume that the - // RMApp is in NEW state and thus we haven't yet informed the - // scheduler about the existence of the application - assert application.getState() == RMAppState.NEW; + } else { + // Dispatcher is not yet started at this time, so these START events + // enqueued should be guaranteed to be first processed when dispatcher + // gets started. this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, e.getMessage())); - throw RPCUtil.getRemoteException(e); + .handle(new RMAppEvent(applicationId, RMAppEventType.START)); } - } else { - // Dispatcher is not yet started at this time, so these START events - // enqueued should be guaranteed to be first processed when dispatcher - // gets started. + } catch (Exception e) { + LOG.warn("Unable to parse credentials.", e); + // Sending APP_REJECTED is fine, since we assume that the + // RMApp is in NEW state and thus we haven't yet informed the + // scheduler about the existence of the application + assert application.getState() == RMAppState.NEW; this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, RMAppEventType.START)); + .handle(new RMAppRejectedEvent(applicationId, e.getMessage())); + throw RPCUtil.getRemoteException(e); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index 0dd9ba15d24..b44d13bd588 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; @@ -200,7 +201,9 @@ public class AMLauncher implements Runnable { return container; } - private void setupTokens( + @Private + @VisibleForTesting + protected void setupTokens( ContainerLaunchContext container, ContainerId containerID) throws IOException { Map environment = container.getEnvironment(); @@ -220,10 +223,12 @@ public class AMLauncher implements Runnable { Credentials credentials = new Credentials(); DataInputByteBuffer dibb = new DataInputByteBuffer(); - if (container.getTokens() != null) { + ByteBuffer tokens = container.getTokens(); + if (tokens != null) { // TODO: Don't do this kind of checks everywhere. - dibb.reset(container.getTokens()); + dibb.reset(tokens); credentials.readTokenStorageStream(dibb); + tokens.rewind(); } // Add AMRMToken diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 5ebc68ca18f..3db8b7c3394 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; @@ -33,6 +35,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.concurrent.ConcurrentMap; @@ -479,6 +482,63 @@ public class TestAppManager{ getAppEventType()); } + @Test + public void testRMAppSubmitWithInvalidTokens() throws Exception { + // Setup invalid security tokens + DataOutputBuffer dob = new DataOutputBuffer(); + ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, + dob.getLength()); + asContext.getAMContainerSpec().setTokens(securityTokens); + try { + appMonitor.submitApplication(asContext, "test"); + Assert.fail("Application submission should fail because" + + " Tokens are invalid."); + } catch (YarnException e) { + // Exception is expected + Assert.assertTrue("The thrown exception is not" + + " java.io.EOFException", + e.getMessage().contains("java.io.EOFException")); + } + int timeoutSecs = 0; + while ((getAppEventType() == RMAppEventType.KILL) && + timeoutSecs++ < 20) { + Thread.sleep(1000); + } + Assert.assertEquals("app event type sent is wrong", + RMAppEventType.APP_REJECTED, getAppEventType()); + asContext.getAMContainerSpec().setTokens(null); + } + + @Test + public void testRMAppSubmitWithValidTokens() throws Exception { + // Setup valid security tokens + DataOutputBuffer dob = new DataOutputBuffer(); + Credentials credentials = new Credentials(); + credentials.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, + dob.getLength()); + asContext.getAMContainerSpec().setTokens(securityTokens); + appMonitor.submitApplication(asContext, "test"); + RMApp app = rmContext.getRMApps().get(appId); + Assert.assertNotNull("app is null", app); + Assert.assertEquals("app id doesn't match", appId, + app.getApplicationId()); + Assert.assertEquals("app state doesn't match", RMAppState.NEW, + app.getState()); + verify(metricsPublisher).appACLsUpdated( + any(RMApp.class), any(String.class), anyLong()); + + // wait for event to be processed + int timeoutSecs = 0; + while ((getAppEventType() == RMAppEventType.KILL) && + timeoutSecs++ < 20) { + Thread.sleep(1000); + } + Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, + getAppEventType()); + asContext.getAMContainerSpec().setTokens(null); + } + @Test (timeout = 30000) public void testRMAppSubmitMaxAppAttempts() throws Exception { int[] globalMaxAppAttempts = new int[] { 10, 1 }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 11cd1fd61a3..9a4395e20fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -26,6 +26,9 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -38,6 +41,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SerializedException; @@ -47,7 +51,10 @@ import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -238,4 +245,61 @@ public class TestApplicationMasterLauncher { } catch (ApplicationAttemptNotFoundException e) { } } + + @Test + public void testSetupTokens() throws Exception { + MockRM rm = new MockRM(); + rm.start(); + MockNM nm1 = rm.registerNode("h1:1234", 5000); + RMApp app = rm.submitApp(2000); + /// kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt = app.getCurrentAppAttempt(); + MyAMLauncher launcher = new MyAMLauncher(rm.getRMContext(), + attempt, AMLauncherEventType.LAUNCH, rm.getConfig()); + DataOutputBuffer dob = new DataOutputBuffer(); + Credentials ts = new Credentials(); + ts.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), + 0, dob.getLength()); + ContainerLaunchContext amContainer = + ContainerLaunchContext.newInstance(null, null, + null, null, securityTokens, null); + ContainerId containerId = ContainerId.newContainerId( + attempt.getAppAttemptId(), 0L); + + try { + launcher.setupTokens(amContainer, containerId); + } catch (Exception e) { + // ignore the first fake exception + } + try { + launcher.setupTokens(amContainer, containerId); + } catch (java.io.EOFException e) { + Assert.fail("EOFException should not happen."); + } + } + + static class MyAMLauncher extends AMLauncher { + int count; + public MyAMLauncher(RMContext rmContext, RMAppAttempt application, + AMLauncherEventType eventType, Configuration conf) { + super(rmContext, application, eventType, conf); + count = 0; + } + + protected org.apache.hadoop.security.token.Token + createAndSetAMRMToken() { + count++; + if (count == 1) { + throw new RuntimeException("createAndSetAMRMToken failure"); + } + return null; + } + + protected void setupTokens(ContainerLaunchContext container, + ContainerId containerID) throws IOException { + super.setupTokens(container, containerID); + } + } }