From 9b4ead92c811b02bdfc62acf00fc364533eecab8 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Wed, 11 Jul 2018 12:26:32 +0530 Subject: [PATCH] YARN-8512. ATSv2 entities are not published to HBase from second attempt onwards. Contributed by Rohith Sharma K S. (cherry picked from commit 7f1d3d0e9dbe328fae0d43421665e0b6907b33fe) --- .../ContainerManagerImpl.java | 69 +++++++++--- .../application/ApplicationImpl.java | 7 +- .../BaseContainerManagerTest.java | 25 +++++ .../TestContainerManagerRecovery.java | 106 ++++++++++++++++-- 4 files changed, 180 insertions(+), 27 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 34709104264..ad63720d9fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -1102,24 +1102,8 @@ public class ContainerManagerImpl extends CompositeService implements // Create the application // populate the flow context from the launch context if the timeline // service v.2 is enabled - FlowContext flowContext = null; - if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { - String flowName = launchContext.getEnvironment() - .get(TimelineUtils.FLOW_NAME_TAG_PREFIX); - String flowVersion = launchContext.getEnvironment() - .get(TimelineUtils.FLOW_VERSION_TAG_PREFIX); - String flowRunIdStr = launchContext.getEnvironment() - .get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX); - long flowRunId = 0L; - if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) { - flowRunId = Long.parseLong(flowRunIdStr); - } - flowContext = new FlowContext(flowName, flowVersion, flowRunId); - if (LOG.isDebugEnabled()) { - LOG.debug("Flow context: " + flowContext - + " created for an application " + applicationID); - } - } + FlowContext flowContext = + getFlowContext(launchContext, applicationID); Application application = new ApplicationImpl(dispatcher, user, flowContext, @@ -1138,6 +1122,31 @@ public class ContainerManagerImpl extends CompositeService implements dispatcher.getEventHandler().handle(new ApplicationInitEvent( applicationID, appAcls, logAggregationContext)); } + } else if (containerTokenIdentifier.getContainerType() + == ContainerType.APPLICATION_MASTER) { + FlowContext flowContext = + getFlowContext(launchContext, applicationID); + if (flowContext != null) { + ApplicationImpl application = + (ApplicationImpl) context.getApplications().get(applicationID); + + // update flowContext reference in ApplicationImpl + application.setFlowContext(flowContext); + + // Required to update state store for recovery. + context.getNMStateStore().storeApplication(applicationID, + buildAppProto(applicationID, user, credentials, + container.getLaunchContext().getApplicationACLs(), + containerTokenIdentifier.getLogAggregationContext(), + flowContext)); + + LOG.info( + "Updated application reference with flowContext " + flowContext + + " for app " + applicationID); + } else { + LOG.info("TimelineService V2.0 is not enabled. Skipping updating " + + "flowContext for application " + applicationID); + } } this.context.getNMStateStore().storeContainer(containerId, @@ -1163,6 +1172,30 @@ public class ContainerManagerImpl extends CompositeService implements } } + private FlowContext getFlowContext(ContainerLaunchContext launchContext, + ApplicationId applicationID) { + FlowContext flowContext = null; + if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { + String flowName = launchContext.getEnvironment() + .get(TimelineUtils.FLOW_NAME_TAG_PREFIX); + String flowVersion = launchContext.getEnvironment() + .get(TimelineUtils.FLOW_VERSION_TAG_PREFIX); + String flowRunIdStr = launchContext.getEnvironment() + .get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX); + long flowRunId = 0L; + if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) { + flowRunId = Long.parseLong(flowRunIdStr); + } + flowContext = new FlowContext(flowName, flowVersion, flowRunId); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Flow context: " + flowContext + " created for an application " + + applicationID); + } + } + return flowContext; + } + protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier( org.apache.hadoop.yarn.api.records.Token token, ContainerTokenIdentifier containerTokenIdentifier) throws YarnException, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 6d84fb29076..ad995fb3a28 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -25,6 +25,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +68,6 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; -import com.google.common.annotations.VisibleForTesting; /** * The state machine for the representation of an Application @@ -688,4 +689,8 @@ public class ApplicationImpl implements Application { public long getFlowRunId() { return flowContext == null ? 0L : flowContext.getFlowRunId(); } + + public void setFlowContext(FlowContext fc) { + this.flowContext = fc; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 93d0afb1185..b31601c8c96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -428,6 +428,16 @@ public abstract class BaseContainerManagerTest { containerTokenSecretManager, logAggregationContext); } + public static Token createContainerToken(ContainerId cId, long rmIdentifier, + NodeId nodeId, String user, + NMContainerTokenSecretManager containerTokenSecretManager, + LogAggregationContext logAggregationContext, ContainerType containerType) + throws IOException { + Resource r = BuilderUtils.newResource(1024, 1); + return createContainerToken(cId, rmIdentifier, nodeId, user, r, + containerTokenSecretManager, logAggregationContext, containerType); + } + public static Token createContainerToken(ContainerId cId, long rmIdentifier, NodeId nodeId, String user, Resource resource, NMContainerTokenSecretManager containerTokenSecretManager, @@ -442,6 +452,21 @@ public abstract class BaseContainerManagerTest { containerTokenIdentifier); } + public static Token createContainerToken(ContainerId cId, long rmIdentifier, + NodeId nodeId, String user, Resource resource, + NMContainerTokenSecretManager containerTokenSecretManager, + LogAggregationContext logAggregationContext, ContainerType continerType) + throws IOException { + ContainerTokenIdentifier containerTokenIdentifier = + new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource, + System.currentTimeMillis() + 100000L, 123, rmIdentifier, + Priority.newInstance(0), 0, logAggregationContext, null, + continerType); + return BuilderUtils.newContainerToken(nodeId, + containerTokenSecretManager.retrievePassword(containerTokenIdentifier), + containerTokenIdentifier); + } + public static Token createContainerToken(ContainerId cId, int version, long rmIdentifier, NodeId nodeId, String user, Resource resource, NMContainerTokenSecretManager containerTokenSecretManager, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index bf8b500b87f..0a834af84a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; @@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; @@ -205,7 +207,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { "includePatternInRollingAggregation", "excludePatternInRollingAggregation"); StartContainersResponse startResponse = startContainer(context, cm, cid, - clc, logAggregationContext); + clc, logAggregationContext, ContainerType.TASK); assertTrue(startResponse.getFailedRequests().isEmpty()); assertEquals(1, context.getApplications().size()); Application app = context.getApplications().get(appId); @@ -342,7 +344,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { null, null); StartContainersResponse startResponse = startContainer(context, cm, cid, - clc, null); + clc, null, ContainerType.TASK); assertTrue(startResponse.getFailedRequests().isEmpty()); assertEquals(1, context.getApplications().size()); Application app = context.getApplications().get(appId); @@ -579,7 +581,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { cm.init(conf); cm.start(); StartContainersResponse startResponse = startContainer(context, cm, cid, - clc, logAggregationContext); + clc, logAggregationContext, ContainerType.TASK); assertEquals(1, startResponse.getSuccessfullyStartedContainers().size()); cm.stop(); verify(cm).handle(isA(CMgrCompletedAppsEvent.class)); @@ -595,7 +597,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { cm.init(conf); cm.start(); startResponse = startContainer(context, cm, cid, - clc, logAggregationContext); + clc, logAggregationContext, ContainerType.TASK); assertEquals(1, startResponse.getSuccessfullyStartedContainers().size()); cm.stop(); memStore.close(); @@ -612,7 +614,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { cm.init(conf); cm.start(); startResponse = startContainer(context, cm, cid, - clc, logAggregationContext); + clc, logAggregationContext, ContainerType.TASK); assertEquals(1, startResponse.getSuccessfullyStartedContainers().size()); cm.stop(); memStore.close(); @@ -661,7 +663,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { localResources, containerEnv, commands, serviceData, containerTokens, acls); StartContainersResponse startResponse = startContainer( - context, cm, cid, clc, null); + context, cm, cid, clc, null, ContainerType.TASK); assertTrue(startResponse.getFailedRequests().isEmpty()); assertEquals(1, context.getApplications().size()); // make sure the container reaches RUNNING state @@ -736,14 +738,15 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { private StartContainersResponse startContainer(Context context, final ContainerManagerImpl cm, ContainerId cid, - ContainerLaunchContext clc, LogAggregationContext logAggregationContext) + ContainerLaunchContext clc, LogAggregationContext logAggregationContext, + ContainerType containerType) throws Exception { UserGroupInformation user = UserGroupInformation.createRemoteUser( cid.getApplicationAttemptId().toString()); StartContainerRequest scReq = StartContainerRequest.newInstance( clc, TestContainerManager.createContainerToken(cid, 0, context.getNodeId(), user.getShortUserName(), - context.getContainerTokenSecretManager(), logAggregationContext)); + context.getContainerTokenSecretManager(), logAggregationContext, containerType)); final List scReqList = new ArrayList(); scReqList.add(scReq); @@ -910,4 +913,91 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { } } + @Test + public void testApplicationRecoveryAfterFlowContextUpdated() + throws Exception { + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true); + conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); + conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user"); + NMStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + Context context = createContext(conf, stateStore); + ContainerManagerImpl cm = createContainerManager(context); + cm.init(conf); + cm.start(); + + // add an application by starting a container + String appName = "app_name1"; + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + + // create 1nd attempt container with containerId 2 + ContainerId cid = ContainerId.newContainerId(attemptId, 2); + Map localResources = Collections.emptyMap(); + Map containerEnv = new HashMap<>(); + + List containerCmds = Collections.emptyList(); + Map serviceData = Collections.emptyMap(); + Credentials containerCreds = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + containerCreds.writeTokenStorageToStream(dob); + ByteBuffer containerTokens = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + Map acls = + new HashMap(); + ContainerLaunchContext clc = ContainerLaunchContext + .newInstance(localResources, containerEnv, containerCmds, serviceData, + containerTokens, acls); + // create the logAggregationContext + LogAggregationContext logAggregationContext = LogAggregationContext + .newInstance("includePattern", "excludePattern", + "includePatternInRollingAggregation", + "excludePatternInRollingAggregation"); + + StartContainersResponse startResponse = + startContainer(context, cm, cid, clc, logAggregationContext, + ContainerType.TASK); + assertTrue(startResponse.getFailedRequests().isEmpty()); + assertEquals(1, context.getApplications().size()); + ApplicationImpl app = + (ApplicationImpl) context.getApplications().get(appId); + assertNotNull(app); + waitForAppState(app, ApplicationState.INITING); + assertNull(app.getFlowName()); + + // 2nd attempt + ApplicationAttemptId attemptId2 = + ApplicationAttemptId.newInstance(appId, 2); + // create 2nd attempt master container + ContainerId cid2 = ContainerId.newContainerId(attemptId, 1); + setFlowContext(containerEnv, appName, appId); + // once again create for updating launch context + clc = ContainerLaunchContext + .newInstance(localResources, containerEnv, containerCmds, serviceData, + containerTokens, acls); + // start container with container type AM. + startResponse = + startContainer(context, cm, cid2, clc, logAggregationContext, + ContainerType.APPLICATION_MASTER); + assertTrue(startResponse.getFailedRequests().isEmpty()); + assertEquals(1, context.getApplications().size()); + waitForAppState(app, ApplicationState.INITING); + assertEquals(appName, app.getFlowName()); + + // reset container manager and verify flow context information + cm.stop(); + context = createContext(conf, stateStore); + cm = createContainerManager(context); + cm.init(conf); + cm.start(); + assertEquals(1, context.getApplications().size()); + app = (ApplicationImpl) context.getApplications().get(appId); + assertNotNull(app); + assertEquals(appName, app.getFlowName()); + waitForAppState(app, ApplicationState.INITING); + + cm.stop(); + } }