YARN-7371. Added allocateRequestId in NMContainerStatus for recovery. Contributed by Chandni Singh

This commit is contained in:
Jian He 2017-11-04 23:15:21 -07:00
parent a127f7b0fb
commit a55d0738f1
16 changed files with 177 additions and 61 deletions

View File

@ -131,12 +131,16 @@ public class ServiceTestUtils {
return fs;
}
protected MiniYARNCluster getYarnCluster() {
return yarnCluster;
}
protected void setupInternal(int numNodeManager)
throws Exception {
LOG.info("Starting up YARN cluster");
// Logger rootLogger = LogManager.getRootLogger();
// rootLogger.setLevel(Level.DEBUG);
setConf(new YarnConfiguration());
if (conf == null) {
setConf(new YarnConfiguration());
}
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
// reduce the teardown waiting time
conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000);

View File

@ -18,12 +18,13 @@
package org.apache.hadoop.yarn.service;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.Component;
@ -41,17 +42,12 @@ import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.*;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.yarn.api.records.YarnApplicationState.FINISHED;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.*;
/**
* End to end tests to test deploying services with MiniYarnCluster and a in-JVM
@ -67,7 +63,8 @@ public class TestYarnNativeServices extends ServiceTestUtils {
@Before
public void setup() throws Exception {
setupInternal(NUM_NMS);
File tmpYarnDir = new File("target", "tmp");
FileUtils.deleteQuietly(tmpYarnDir);
}
@After
@ -84,6 +81,7 @@ public class TestYarnNativeServices extends ServiceTestUtils {
// 6. Destroy the service
@Test (timeout = 200000)
public void testCreateFlexStopDestroyService() throws Exception {
setupInternal(NUM_NMS);
ServiceClient client = createClient();
Service exampleApp = createExampleApplication();
client.actionCreate(exampleApp);
@ -135,6 +133,7 @@ public class TestYarnNativeServices extends ServiceTestUtils {
// Check containers for compa started before containers for compb
@Test (timeout = 200000)
public void testComponentStartOrder() throws Exception {
setupInternal(NUM_NMS);
ServiceClient client = createClient();
Service exampleApp = new Service();
exampleApp.setName("teststartorder");
@ -155,6 +154,70 @@ public class TestYarnNativeServices extends ServiceTestUtils {
client.actionDestroy(exampleApp.getName());
}
// Test to verify recovery of SeviceMaster after RM is restarted.
// 1. Create an example service.
// 2. Restart RM.
// 3. Fail the application attempt.
// 4. Verify ServiceMaster recovers.
@Test(timeout = 200000)
public void testRecoverComponentsAfterRMRestart() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
conf.setLong(YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
500L);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
setConf(conf);
setupInternal(NUM_NMS);
ServiceClient client = createClient();
Service exampleApp = createExampleApplication();
client.actionCreate(exampleApp);
waitForAllCompToBeReady(client, exampleApp);
LOG.info("Restart the resource manager");
getYarnCluster().restartResourceManager(
getYarnCluster().getActiveRMIndex());
GenericTestUtils.waitFor(() ->
getYarnCluster().getResourceManager().getServiceState() ==
org.apache.hadoop.service.Service.STATE.STARTED, 2000, 200000);
Assert.assertTrue("node managers connected",
getYarnCluster().waitForNodeManagersToConnect(5000));
ApplicationId exampleAppId = ApplicationId.fromString(exampleApp.getId());
ApplicationAttemptId applicationAttemptId = client.getYarnClient()
.getApplicationReport(exampleAppId).getCurrentApplicationAttemptId();
Multimap<String, String> containersBeforeFailure = getContainersForAllComp(
client, exampleApp);
LOG.info("Fail the application attempt {}", applicationAttemptId);
client.getYarnClient().failApplicationAttempt(applicationAttemptId);
//wait until attempt 2 is running
GenericTestUtils.waitFor(() -> {
try {
ApplicationReport ar = client.getYarnClient()
.getApplicationReport(exampleAppId);
return ar.getCurrentApplicationAttemptId().getAttemptId() == 2 &&
ar.getYarnApplicationState() == YarnApplicationState.RUNNING;
} catch (YarnException | IOException e) {
throw new RuntimeException("while waiting", e);
}
}, 2000, 200000);
Multimap<String, String> containersAfterFailure = getContainersForAllComp(
client, exampleApp);
Assert.assertEquals("component container affected by restart",
containersBeforeFailure, containersAfterFailure);
LOG.info("Stop/destroy service {}", exampleApp);
client.actionStop(exampleApp.getName(), true);
client.actionDestroy(exampleApp.getName());
}
// Check containers launched are in dependency order
// Get all containers into a list and sort based on container launch time e.g.
// compa-c1, compa-c2, compb-c1, compb-c2;
@ -297,6 +360,24 @@ public class TestYarnNativeServices extends ServiceTestUtils {
}, 2000, 200000);
}
/**
* Get all containers of a service.
*/
private Multimap<String, String> getContainersForAllComp(ServiceClient client,
Service example) throws IOException, YarnException {
Multimap<String, String> allContainers = HashMultimap.create();
Service retrievedApp = client.getStatus(example.getName());
retrievedApp.getComponents().forEach(component -> {
if (component.getContainers() != null) {
component.getContainers().forEach(container -> {
allContainers.put(component.getName(), container.getId());
});
}
});
return allContainers;
}
private ServiceClient createClient() throws Exception {
ServiceClient client = new ServiceClient() {
@Override protected Path addJarResource(String appName,

View File

@ -115,7 +115,7 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
this(containerID, 0, hostName, appSubmitter, r, expiryTimeStamp,
masterKeyId, rmIdentifier, priority, creationTime,
logAggregationContext, nodeLabelExpression, containerType,
ExecutionType.GUARANTEED);
ExecutionType.GUARANTEED, -1);
}
public ContainerTokenIdentifier(ContainerId containerID, int containerVersion,
@ -123,6 +123,19 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
int masterKeyId, long rmIdentifier, Priority priority, long creationTime,
LogAggregationContext logAggregationContext, String nodeLabelExpression,
ContainerType containerType, ExecutionType executionType) {
this(containerID, containerVersion, hostName, appSubmitter, r,
expiryTimeStamp, masterKeyId, rmIdentifier, priority, creationTime,
logAggregationContext, nodeLabelExpression, containerType,
executionType, -1);
}
public ContainerTokenIdentifier(ContainerId containerID, int containerVersion,
String hostName, String appSubmitter, Resource r, long expiryTimeStamp,
int masterKeyId, long rmIdentifier, Priority priority, long creationTime,
LogAggregationContext logAggregationContext, String nodeLabelExpression,
ContainerType containerType, ExecutionType executionType,
long allocationRequestId) {
ContainerTokenIdentifierProto.Builder builder =
ContainerTokenIdentifierProto.newBuilder();
if (containerID != null) {
@ -152,6 +165,7 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
}
builder.setContainerType(convertToProtoFormat(containerType));
builder.setExecutionType(convertToProtoFormat(executionType));
builder.setAllocationRequestId(allocationRequestId);
proto = builder.build();
}
@ -243,6 +257,10 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
return new LogAggregationContextPBImpl(proto.getLogAggregationContext());
}
public long getAllocationRequestId() {
return proto.getAllocationRequestId();
}
@Override
public void write(DataOutput out) throws IOException {
LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this);

View File

@ -53,6 +53,7 @@ message ContainerTokenIdentifierProto {
optional ContainerTypeProto containerType = 12;
optional ExecutionTypeProto executionType = 13 [default = GUARANTEED];
optional int32 version = 14 [default = 0];
optional int64 allocation_request_id = 15 [default = -1];
}
message ClientToAMTokenIdentifierProto {

View File

@ -41,14 +41,14 @@ public abstract class NMContainerStatus {
long creationTime) {
return newInstance(containerId, version, containerState, allocatedResource,
diagnostics, containerExitStatus, priority, creationTime,
CommonNodeLabelsManager.NO_LABEL, ExecutionType.GUARANTEED);
CommonNodeLabelsManager.NO_LABEL, ExecutionType.GUARANTEED, -1);
}
public static NMContainerStatus newInstance(ContainerId containerId,
int version, ContainerState containerState, Resource allocatedResource,
String diagnostics, int containerExitStatus, Priority priority,
long creationTime, String nodeLabelExpression,
ExecutionType executionType) {
ExecutionType executionType, long allocationRequestId) {
NMContainerStatus status =
Records.newRecord(NMContainerStatus.class);
status.setContainerId(containerId);
@ -61,6 +61,7 @@ public abstract class NMContainerStatus {
status.setCreationTime(creationTime);
status.setNodeLabelExpression(nodeLabelExpression);
status.setExecutionType(executionType);
status.setAllocationRequestId(allocationRequestId);
return status;
}
@ -130,6 +131,19 @@ public abstract class NMContainerStatus {
public abstract void setNodeLabelExpression(
String nodeLabelExpression);
/**
* @return the <em>ID</em> corresponding to the original allocation request.
*/
public abstract long getAllocationRequestId();
/**
* Set the <em>ID</em> corresponding to the original allocation request.
*
* @param allocationRequestId the <em>ID</em> corresponding to the original
* allocation request.
*/
public abstract void setAllocationRequestId(long allocationRequestId);
public int getVersion() {
return 0;
}

View File

@ -91,7 +91,8 @@ public class NMContainerStatusPBImpl extends NMContainerStatus {
.append("Diagnostics: ").append(getDiagnostics()).append(", ")
.append("ExitStatus: ").append(getContainerExitStatus()).append(", ")
.append("NodeLabelExpression: ").append(getNodeLabelExpression())
.append("Priority: ").append(getPriority())
.append("Priority: ").append(getPriority()).append(", ")
.append("AllocationRequestId: ").append(getAllocationRequestId())
.append("]");
return sb.toString();
}
@ -270,6 +271,18 @@ public class NMContainerStatusPBImpl extends NMContainerStatus {
builder.setExecutionType(convertToProtoFormat(executionType));
}
@Override
public long getAllocationRequestId() {
NMContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
return (p.getAllocationRequestId());
}
@Override
public void setAllocationRequestId(long allocationRequestId) {
maybeInitBuilder();
builder.setAllocationRequestId(allocationRequestId);
}
private void mergeLocalToBuilder() {
if (this.containerId != null
&& !((ContainerIdPBImpl) containerId).getProto().equals(

View File

@ -564,7 +564,7 @@ public class OpportunisticContainerAllocator {
tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier,
schedulerKey.getPriority(), currTime,
null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
ExecutionType.OPPORTUNISTIC);
ExecutionType.OPPORTUNISTIC, schedulerKey.getAllocationRequestId());
byte[] pwd =
tokenSecretManager.createPassword(containerTokenIdentifier);
Token containerToken = newContainerToken(node.getNodeId(), pwd,

View File

@ -176,6 +176,7 @@ message NMContainerStatusProto {
optional string nodeLabelExpression = 8;
optional int32 version = 9;
optional ExecutionTypeProto executionType = 10 [default = GUARANTEED];
optional int64 allocation_request_id = 11 [default = -1];
}
message SCMUploaderNotifyRequestProto {

View File

@ -437,7 +437,7 @@ public class ContainerManagerImpl extends CompositeService implements
throws IOException {
StartContainerRequest req = rcs.getStartRequest();
ContainerLaunchContext launchContext = req.getContainerLaunchContext();
ContainerTokenIdentifier token = null;
ContainerTokenIdentifier token;
if(rcs.getCapability() != null) {
ContainerTokenIdentifier originalToken =
BuilderUtils.newContainerTokenIdentifier(req.getContainerToken());
@ -449,7 +449,8 @@ public class ContainerManagerImpl extends CompositeService implements
originalToken.getCreationTime(),
originalToken.getLogAggregationContext(),
originalToken.getNodeLabelExpression(),
originalToken.getContainerType(), originalToken.getExecutionType());
originalToken.getContainerType(), originalToken.getExecutionType(),
originalToken.getAllocationRequestId());
} else {
token = BuilderUtils.newContainerTokenIdentifier(req.getContainerToken());

View File

@ -844,12 +844,14 @@ public class ContainerImpl implements Container {
public NMContainerStatus getNMContainerStatus() {
this.readLock.lock();
try {
return NMContainerStatus.newInstance(this.containerId, this.version,
getCurrentState(), getResource(), diagnostics.toString(), exitCode,
return NMContainerStatus.newInstance(this.containerId,
this.version, getCurrentState(), getResource(),
diagnostics.toString(), exitCode,
containerTokenIdentifier.getPriority(),
containerTokenIdentifier.getCreationTime(),
containerTokenIdentifier.getNodeLabelExpression(),
containerTokenIdentifier.getExecutionType());
containerTokenIdentifier.getExecutionType(),
containerTokenIdentifier.getAllocationRequestId());
} finally {
this.readLock.unlock();
}

View File

@ -571,6 +571,7 @@ public abstract class AbstractYarnScheduler
status.getPriority(), null);
container.setVersion(status.getVersion());
container.setExecutionType(status.getExecutionType());
container.setAllocationRequestId(status.getAllocationRequestId());
ApplicationAttemptId attemptId =
container.getId().getApplicationAttemptId();
RMContainer rmContainer = new RMContainerImpl(container,

View File

@ -655,7 +655,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
container.getNodeId(), getUser(), container.getResource(),
container.getPriority(), rmContainer.getCreationTime(),
this.logAggregationContext, rmContainer.getNodeLabelExpression(),
containerType, container.getExecutionType()));
containerType, container.getExecutionType(),
container.getAllocationRequestId()));
updateNMToken(container);
} catch (IllegalArgumentException e) {
// DNS might be down, skip returning this container.

View File

@ -167,7 +167,7 @@ public class RMContainerTokenSecretManager extends
}
/**
* Helper function for creating ContainerTokens
* Helper function for creating ContainerTokens.
*
* @param containerId Container Id
* @param containerVersion Container Version
@ -183,37 +183,13 @@ public class RMContainerTokenSecretManager extends
Resource capability, Priority priority, long createTime) {
return createContainerToken(containerId, containerVersion, nodeId,
appSubmitter, capability, priority, createTime,
null, null, ContainerType.TASK);
null, null, ContainerType.TASK,
ExecutionType.GUARANTEED, -1);
}
/**
* Helper function for creating ContainerTokens.
*
* @param containerId containerId.
* @param containerVersion containerVersion.
* @param nodeId nodeId.
* @param appSubmitter appSubmitter.
* @param capability capability.
* @param priority priority.
* @param createTime createTime.
* @param logAggregationContext logAggregationContext.
* @param nodeLabelExpression nodeLabelExpression.
* @param containerType containerType.
* @return the container-token.
*/
public Token createContainerToken(ContainerId containerId,
int containerVersion, NodeId nodeId, String appSubmitter,
Resource capability, Priority priority, long createTime,
LogAggregationContext logAggregationContext, String nodeLabelExpression,
ContainerType containerType) {
return createContainerToken(containerId, containerVersion, nodeId,
appSubmitter, capability, priority, createTime, null, null,
ContainerType.TASK, ExecutionType.GUARANTEED);
}
/**
* Helper function for creating ContainerTokens
*
* @param containerId Container Id
* @param containerVersion Container version
* @param nodeId Node Id
@ -225,13 +201,15 @@ public class RMContainerTokenSecretManager extends
* @param nodeLabelExpression Node Label Expression
* @param containerType Container Type
* @param execType Execution Type
* @param allocationRequestId allocationRequestId
* @return the container-token
*/
public Token createContainerToken(ContainerId containerId,
int containerVersion, NodeId nodeId, String appSubmitter,
Resource capability, Priority priority, long createTime,
LogAggregationContext logAggregationContext, String nodeLabelExpression,
ContainerType containerType, ExecutionType execType) {
ContainerType containerType, ExecutionType execType,
long allocationRequestId) {
byte[] password;
ContainerTokenIdentifier tokenIdentifier;
long expiryTimeStamp =
@ -246,7 +224,7 @@ public class RMContainerTokenSecretManager extends
this.currentMasterKey.getMasterKey().getKeyId(),
ResourceManager.getClusterTimeStamp(), priority, createTime,
logAggregationContext, nodeLabelExpression, containerType,
execType);
execType, allocationRequestId);
password = this.createPassword(tokenIdentifier);
} finally {

View File

@ -2113,7 +2113,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
NMContainerStatus.newInstance(containerId, 0, containerState,
Resource.newInstance(1024, 1), "recover container", 0,
Priority.newInstance(0), 0, nodeLabelExpression,
ExecutionType.GUARANTEED);
ExecutionType.GUARANTEED, -1);
return containerReport;
}

View File

@ -2088,17 +2088,17 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
NMContainerStatus.newInstance(c1, 1, ContainerState.RUNNING,
Resource.newInstance(1024, 1), "Dummy Queued OC",
ContainerExitStatus.INVALID, Priority.newInstance(5), 1234, "",
ExecutionType.OPPORTUNISTIC);
ExecutionType.OPPORTUNISTIC, -1);
NMContainerStatus runningOpp =
NMContainerStatus.newInstance(c2, 1, ContainerState.RUNNING,
Resource.newInstance(2048, 1), "Dummy Running OC",
ContainerExitStatus.INVALID, Priority.newInstance(6), 1234, "",
ExecutionType.OPPORTUNISTIC);
ExecutionType.OPPORTUNISTIC, -1);
NMContainerStatus runningGuar =
NMContainerStatus.newInstance(c3, 1, ContainerState.RUNNING,
Resource.newInstance(2048, 1), "Dummy Running GC",
ContainerExitStatus.INVALID, Priority.newInstance(6), 1234, "",
ExecutionType.GUARANTEED);
ExecutionType.GUARANTEED, -1);
req.setContainerStatuses(Arrays.asList(queuedOpp, runningOpp, runningGuar));
// trying to register a invalid node.
RegisterNodeManagerResponse response =

View File

@ -295,12 +295,13 @@ public class TestContainerAllocation {
int containerVersion, NodeId nodeId, String appSubmitter,
Resource capability, Priority priority, long createTime,
LogAggregationContext logAggregationContext, String nodeLabelExp,
ContainerType containerType, ExecutionType executionType) {
ContainerType containerType, ExecutionType executionType,
long allocationRequestId) {
numRetries++;
return super.createContainerToken(containerId, containerVersion,
nodeId, appSubmitter, capability, priority, createTime,
logAggregationContext, nodeLabelExp, containerType,
executionType);
executionType, allocationRequestId);
}
};
}