YARN-6555. Store application flow context in NM state store for work-preserving restart. (Rohith Sharma K S via Haibo Chen)

This commit is contained in:
Haibo Chen 2017-05-25 21:15:27 -07:00
parent 2b5ad48762
commit 47474fffac
4 changed files with 111 additions and 34 deletions

View File

@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
@ -381,10 +382,20 @@ public class ContainerManagerImpl extends CompositeService implements
new LogAggregationContextPBImpl(p.getLogAggregationContext());
}
FlowContext fc = null;
if (p.getFlowContext() != null) {
FlowContextProto fcp = p.getFlowContext();
fc = new FlowContext(fcp.getFlowName(), fcp.getFlowVersion(),
fcp.getFlowRunId());
if (LOG.isDebugEnabled()) {
LOG.debug(
"Recovering Flow context: " + fc + " for an application " + appId);
}
}
LOG.info("Recovering application " + appId);
//TODO: Recover flow and flow run ID
ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
creds, context, p.getAppLogAggregationInitedTime());
ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), fc,
appId, creds, context, p.getAppLogAggregationInitedTime());
context.getApplications().put(appId, app);
app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
}
@ -936,7 +947,7 @@ public class ContainerManagerImpl extends CompositeService implements
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
String user, Credentials credentials,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext) {
LogAggregationContext logAggregationContext, FlowContext flowContext) {
ContainerManagerApplicationProto.Builder builder =
ContainerManagerApplicationProto.newBuilder();
@ -971,6 +982,16 @@ public class ContainerManagerImpl extends CompositeService implements
}
}
builder.clearFlowContext();
if (flowContext != null && flowContext.getFlowName() != null
&& flowContext.getFlowVersion() != null) {
FlowContextProto fcp =
FlowContextProto.newBuilder().setFlowName(flowContext.getFlowName())
.setFlowVersion(flowContext.getFlowVersion())
.setFlowRunId(flowContext.getFlowRunId()).build();
builder.setFlowContext(fcp);
}
return builder.build();
}
@ -1016,25 +1037,29 @@ public class ContainerManagerImpl extends CompositeService implements
this.readLock.lock();
try {
if (!isServiceStopped()) {
// Create the application
// populate the flow context from the launch context if the timeline
// service v.2 is enabled
FlowContext flowContext = null;
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
String flowName = launchContext.getEnvironment().get(
TimelineUtils.FLOW_NAME_TAG_PREFIX);
String flowVersion = launchContext.getEnvironment().get(
TimelineUtils.FLOW_VERSION_TAG_PREFIX);
String flowRunIdStr = launchContext.getEnvironment().get(
TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
long flowRunId = 0L;
if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
flowRunId = Long.parseLong(flowRunIdStr);
}
flowContext =
new FlowContext(flowName, flowVersion, flowRunId);
}
if (!context.getApplications().containsKey(applicationID)) {
// Create the application
// populate the flow context from the launch context if the timeline
// service v.2 is enabled
FlowContext flowContext = null;
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
String flowName = launchContext.getEnvironment()
.get(TimelineUtils.FLOW_NAME_TAG_PREFIX);
String flowVersion = launchContext.getEnvironment()
.get(TimelineUtils.FLOW_VERSION_TAG_PREFIX);
String flowRunIdStr = launchContext.getEnvironment()
.get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
long flowRunId = 0L;
if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
flowRunId = Long.parseLong(flowRunIdStr);
}
flowContext = new FlowContext(flowName, flowVersion, flowRunId);
if (LOG.isDebugEnabled()) {
LOG.debug("Flow context: " + flowContext
+ " created for an application " + applicationID);
}
}
Application application =
new ApplicationImpl(dispatcher, user, flowContext,
applicationID, credentials, context);
@ -1048,7 +1073,7 @@ public class ContainerManagerImpl extends CompositeService implements
container.getLaunchContext().getApplicationACLs();
context.getNMStateStore().storeApplication(applicationID,
buildAppProto(applicationID, user, credentials, appAcls,
logAggregationContext));
logAggregationContext, flowContext));
dispatcher.getEventHandler().handle(new ApplicationInitEvent(
applicationID, appAcls, logAggregationContext));
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
@ -105,13 +106,6 @@ public class ApplicationImpl implements Application {
this(dispatcher, user, null, appId, credentials, context, -1L);
}
public ApplicationImpl(Dispatcher dispatcher, String user,
ApplicationId appId, Credentials credentials, Context context,
long recoveredLogInitedTime) {
this(dispatcher, user, null, appId, credentials, context,
recoveredLogInitedTime);
}
public ApplicationImpl(Dispatcher dispatcher, String user,
FlowContext flowContext, ApplicationId appId, Credentials credentials,
Context context, long recoveredLogInitedTime) {
@ -171,6 +165,15 @@ public class ApplicationImpl implements Application {
public long getFlowRunId() {
return flowRunId;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("{");
sb.append("Flow Name=").append(getFlowName());
sb.append(" Flow Versioin=").append(getFlowVersion());
sb.append(" Flow Run Id=").append(getFlowRunId()).append(" }");
return sb.toString();
}
}
@Override
@ -390,6 +393,16 @@ public class ApplicationImpl implements Application {
builder.setAppLogAggregationInitedTime(app.applicationLogInitedTimestamp);
builder.clearFlowContext();
if (app.flowContext != null && app.flowContext.getFlowName() != null
&& app.flowContext.getFlowVersion() != null) {
FlowContextProto fcp = FlowContextProto.newBuilder()
.setFlowName(app.flowContext.getFlowName())
.setFlowVersion(app.flowContext.getFlowVersion())
.setFlowRunId(app.flowContext.getFlowRunId()).build();
builder.setFlowContext(fcp);
}
return builder.build();
}

View File

@ -31,6 +31,7 @@ message ContainerManagerApplicationProto {
repeated ApplicationACLMapProto acls = 4;
optional LogAggregationContextProto log_aggregation_context = 5;
optional int64 appLogAggregationInitedTime = 6 [ default = -1 ];
optional FlowContextProto flowContext = 7;
}
message DeletionServiceDeleteTaskProto {
@ -52,3 +53,9 @@ message LogDeleterProto {
optional string user = 1;
optional int64 deletionTime = 2;
}
message FlowContextProto {
optional string flowName = 1;
optional string flowVersion = 2;
optional int64 flowRunId = 3;
}

View File

@ -107,6 +107,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecret
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Before;
import org.junit.Test;
@ -136,6 +137,11 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogDir.getAbsolutePath());
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
// enable atsv2 by default in test
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
// Default delSrvc
delSrvc = createDeletionService();
delSrvc.init(conf);
@ -144,6 +150,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
nodeHealthChecker = new NodeHealthCheckerService(
NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
nodeHealthChecker.init(conf);
}
@Test
@ -161,6 +168,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
cm.start();
// add an application by starting a container
String appName = "app_name1";
String appUser = "app_user1";
String modUser = "modify_user1";
String viewUser = "view_user1";
@ -170,7 +178,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
Map<String, LocalResource> localResources = Collections.emptyMap();
Map<String, String> containerEnv = Collections.emptyMap();
Map<String, String> containerEnv = new HashMap<>();
setFlowContext(containerEnv, appName, appId);
List<String> containerCmds = Collections.emptyList();
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
Credentials containerCreds = new Credentials();
@ -318,7 +327,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
Map<String, LocalResource> localResources = Collections.emptyMap();
Map<String, String> containerEnv = Collections.emptyMap();
Map<String, String> containerEnv = new HashMap<>();
setFlowContext(containerEnv, "app_name1", appId);
List<String> containerCmds = Collections.emptyList();
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
@ -400,7 +410,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
Map<String, String> containerEnv = Collections.emptyMap();
Map<String, String> containerEnv = new HashMap<>();
setFlowContext(containerEnv, "app_name1", appId);
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
Credentials containerCreds = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer();
@ -476,7 +487,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
Map<String, LocalResource> localResources = Collections.emptyMap();
Map<String, String> containerEnv = Collections.emptyMap();
Map<String, String> containerEnv = new HashMap<>();
setFlowContext(containerEnv, "app_name1", appId);
List<String> containerCmds = Collections.emptyList();
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
Credentials containerCreds = new Credentials();
@ -760,4 +772,24 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
containerManager.dispatcher.disableExitOnDispatchException();
return containerManager;
}
private void setFlowContext(Map<String, String> containerEnv, String appName,
ApplicationId appId) {
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
setFlowTags(containerEnv, TimelineUtils.FLOW_NAME_TAG_PREFIX,
TimelineUtils.generateDefaultFlowName(appName, appId));
setFlowTags(containerEnv, TimelineUtils.FLOW_VERSION_TAG_PREFIX,
TimelineUtils.DEFAULT_FLOW_VERSION);
setFlowTags(containerEnv, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX,
String.valueOf(System.currentTimeMillis()));
}
}
private static void setFlowTags(Map<String, String> environment,
String tagPrefix, String value) {
if (!value.isEmpty()) {
environment.put(tagPrefix, value);
}
}
}