YARN-562. Modified NM to reject any containers allocated by a previous ResourceManager. Contributed by Jian He.
MAPREDUCE-5167. Update MR App after YARN-562 to use the new builder API for the container. Contributed by Jian He. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1476034 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
06fb184d4d
commit
fbb55784d9
|
@ -356,6 +356,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
MAPREDUCE-5178. Update MR App to set progress in ApplicationReport after
|
MAPREDUCE-5178. Update MR App to set progress in ApplicationReport after
|
||||||
YARN-577. (Hitesh Shah via vinodkv)
|
YARN-577. (Hitesh Shah via vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-5167. Update MR App after YARN-562 to use the new builder API
|
||||||
|
for the container. (Jian He via vinodkv)
|
||||||
|
|
||||||
Release 2.0.4-alpha - UNRELEASED
|
Release 2.0.4-alpha - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -1094,12 +1094,12 @@ public abstract class TaskAttemptImpl implements
|
||||||
+ taInfo.getPort());
|
+ taInfo.getPort());
|
||||||
String nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
|
String nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
|
||||||
+ taInfo.getHttpPort());
|
+ taInfo.getHttpPort());
|
||||||
// Resource/Priority/Tokens are only needed while launching the
|
// Resource/Priority/Tokens and RMIdentifier are only needed while
|
||||||
// container on an NM, these are already completed tasks, so setting them to
|
// launching the container on an NM, these are already completed tasks, so
|
||||||
// null
|
// setting them to null and RMIdentifier as 0
|
||||||
container =
|
container =
|
||||||
BuilderUtils.newContainer(containerId, containerNodeId,
|
BuilderUtils.newContainer(containerId, containerNodeId,
|
||||||
nodeHttpAddress, null, null, null);
|
nodeHttpAddress, null, null, null, 0);
|
||||||
computeRackAndLocality();
|
computeRackAndLocality();
|
||||||
launchTime = taInfo.getStartTime();
|
launchTime = taInfo.getStartTime();
|
||||||
finishTime = (taInfo.getFinishTime() != -1) ?
|
finishTime = (taInfo.getFinishTime() != -1) ?
|
||||||
|
|
|
@ -519,7 +519,7 @@ public class MRApp extends MRAppMaster {
|
||||||
cId.setId(containerCount++);
|
cId.setId(containerCount++);
|
||||||
NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT);
|
NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT);
|
||||||
Container container = BuilderUtils.newContainer(cId, nodeId,
|
Container container = BuilderUtils.newContainer(cId, nodeId,
|
||||||
NM_HOST + ":" + NM_HTTP_PORT, null, null, null);
|
NM_HOST + ":" + NM_HTTP_PORT, null, null, null, 0);
|
||||||
JobID id = TypeConverter.fromYarn(applicationId);
|
JobID id = TypeConverter.fromYarn(applicationId);
|
||||||
JobId jobId = TypeConverter.toYarn(id);
|
JobId jobId = TypeConverter.toYarn(id);
|
||||||
getContext().getEventHandler().handle(new JobHistoryEvent(jobId,
|
getContext().getEventHandler().handle(new JobHistoryEvent(jobId,
|
||||||
|
|
|
@ -243,7 +243,7 @@ public class MRAppBenchmark {
|
||||||
.newContainer(containerId, BuilderUtils.newNodeId("host"
|
.newContainer(containerId, BuilderUtils.newNodeId("host"
|
||||||
+ containerId.getId(), 2345),
|
+ containerId.getId(), 2345),
|
||||||
"host" + containerId.getId() + ":5678", req
|
"host" + containerId.getId() + ":5678", req
|
||||||
.getCapability(), req.getPriority(), null));
|
.getCapability(), req.getPriority(), null, 0));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -187,6 +187,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
YARN-595. Refactor fair scheduler to use common Resources. (Sandy Ryza
|
YARN-595. Refactor fair scheduler to use common Resources. (Sandy Ryza
|
||||||
via tomwhite)
|
via tomwhite)
|
||||||
|
|
||||||
|
YARN-562. Modified NM to reject any containers allocated by a previous
|
||||||
|
ResourceManager. (Jian He via vinodkv)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -135,4 +135,16 @@ public interface Container extends Comparable<Container> {
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
void setContainerToken(ContainerToken containerToken);
|
void setContainerToken(ContainerToken containerToken);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the RMIdentifier of RM in which containers are allocated
|
||||||
|
* @return RMIdentifier
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
long getRMIdentifer();
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
void setRMIdentifier(long rmIdentifier);
|
||||||
}
|
}
|
||||||
|
|
|
@ -230,6 +230,18 @@ public class ContainerPBImpl extends ProtoBase<ContainerProto> implements Contai
|
||||||
this.containerToken = containerToken;
|
this.containerToken = containerToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getRMIdentifer() {
|
||||||
|
ContainerProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
return p.getRmIdentifier();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRMIdentifier(long rmIdentifier) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.setRmIdentifier((rmIdentifier));
|
||||||
|
}
|
||||||
|
|
||||||
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
|
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
|
||||||
return new ContainerIdPBImpl(p);
|
return new ContainerIdPBImpl(p);
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,6 +68,7 @@ message ContainerProto {
|
||||||
optional ResourceProto resource = 4;
|
optional ResourceProto resource = 4;
|
||||||
optional PriorityProto priority = 5;
|
optional PriorityProto priority = 5;
|
||||||
optional hadoop.common.TokenProto container_token = 6;
|
optional hadoop.common.TokenProto container_token = 6;
|
||||||
|
optional int64 rm_identifier = 7;
|
||||||
}
|
}
|
||||||
|
|
||||||
enum YarnApplicationStateProto {
|
enum YarnApplicationStateProto {
|
||||||
|
|
|
@ -56,7 +56,7 @@ public class TestAMRMClientAsync {
|
||||||
BuilderUtils.newContainerId(0, 0, 0, 0),
|
BuilderUtils.newContainerId(0, 0, 0, 0),
|
||||||
ContainerState.COMPLETE, "", 0));
|
ContainerState.COMPLETE, "", 0));
|
||||||
List<Container> allocated1 = Arrays.asList(
|
List<Container> allocated1 = Arrays.asList(
|
||||||
BuilderUtils.newContainer(null, null, null, null, null, null));
|
BuilderUtils.newContainer(null, null, null, null, null, null, 0));
|
||||||
final AllocateResponse response1 = createAllocateResponse(
|
final AllocateResponse response1 = createAllocateResponse(
|
||||||
new ArrayList<ContainerStatus>(), allocated1);
|
new ArrayList<ContainerStatus>(), allocated1);
|
||||||
final AllocateResponse response2 = createAllocateResponse(completed1,
|
final AllocateResponse response2 = createAllocateResponse(completed1,
|
||||||
|
|
|
@ -237,9 +237,9 @@ public class BuilderUtils {
|
||||||
return containerStatus;
|
return containerStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Container newContainer(ContainerId containerId,
|
public static Container newContainer(ContainerId containerId, NodeId nodeId,
|
||||||
NodeId nodeId, String nodeHttpAddress,
|
String nodeHttpAddress, Resource resource, Priority priority,
|
||||||
Resource resource, Priority priority, ContainerToken containerToken) {
|
ContainerToken containerToken, long rmIdentifier) {
|
||||||
Container container = recordFactory.newRecordInstance(Container.class);
|
Container container = recordFactory.newRecordInstance(Container.class);
|
||||||
container.setId(containerId);
|
container.setId(containerId);
|
||||||
container.setNodeId(nodeId);
|
container.setNodeId(nodeId);
|
||||||
|
@ -247,6 +247,7 @@ public class BuilderUtils {
|
||||||
container.setResource(resource);
|
container.setResource(resource);
|
||||||
container.setPriority(priority);
|
container.setPriority(priority);
|
||||||
container.setContainerToken(containerToken);
|
container.setContainerToken(containerToken);
|
||||||
|
container.setRMIdentifier(rmIdentifier);
|
||||||
return container;
|
return container;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -105,7 +105,7 @@ public class TestContainerLaunchRPC {
|
||||||
containerId.setId(100);
|
containerId.setId(100);
|
||||||
Container container =
|
Container container =
|
||||||
BuilderUtils.newContainer(containerId, null, null, recordFactory
|
BuilderUtils.newContainer(containerId, null, null, recordFactory
|
||||||
.newRecordInstance(Resource.class), null, null);
|
.newRecordInstance(Resource.class), null, null, 0);
|
||||||
|
|
||||||
StartContainerRequest scRequest = recordFactory
|
StartContainerRequest scRequest = recordFactory
|
||||||
.newRecordInstance(StartContainerRequest.class);
|
.newRecordInstance(StartContainerRequest.class);
|
||||||
|
|
|
@ -128,7 +128,7 @@ public class TestRPC {
|
||||||
containerId.setId(100);
|
containerId.setId(100);
|
||||||
Container mockContainer =
|
Container mockContainer =
|
||||||
BuilderUtils.newContainer(containerId, null, null, recordFactory
|
BuilderUtils.newContainer(containerId, null, null, recordFactory
|
||||||
.newRecordInstance(Resource.class), null, null);
|
.newRecordInstance(Resource.class), null, null, 0);
|
||||||
// containerLaunchContext.env = new HashMap<CharSequence, CharSequence>();
|
// containerLaunchContext.env = new HashMap<CharSequence, CharSequence>();
|
||||||
// containerLaunchContext.command = new ArrayList<CharSequence>();
|
// containerLaunchContext.command = new ArrayList<CharSequence>();
|
||||||
|
|
||||||
|
|
|
@ -30,4 +30,7 @@ public interface RegisterNodeManagerResponse {
|
||||||
|
|
||||||
void setNodeAction(NodeAction nodeAction);
|
void setNodeAction(NodeAction nodeAction);
|
||||||
|
|
||||||
|
long getRMIdentifier();
|
||||||
|
|
||||||
|
void setRMIdentifier(long rmIdentifier);
|
||||||
}
|
}
|
||||||
|
|
|
@ -121,6 +121,18 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
|
||||||
rebuild = true;
|
rebuild = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getRMIdentifier() {
|
||||||
|
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
return (p.getRmIdentifier());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRMIdentifier(long rmIdentifier) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.setRmIdentifier(rmIdentifier);
|
||||||
|
}
|
||||||
|
|
||||||
private NodeAction convertFromProtoFormat(NodeActionProto p) {
|
private NodeAction convertFromProtoFormat(NodeActionProto p) {
|
||||||
return NodeAction.valueOf(p.name());
|
return NodeAction.valueOf(p.name());
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ message RegisterNodeManagerRequestProto {
|
||||||
message RegisterNodeManagerResponseProto {
|
message RegisterNodeManagerResponseProto {
|
||||||
optional MasterKeyProto master_key = 1;
|
optional MasterKeyProto master_key = 1;
|
||||||
optional NodeActionProto nodeAction = 2;
|
optional NodeActionProto nodeAction = 2;
|
||||||
|
optional int64 rm_identifier = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
message NodeHeartbeatRequestProto {
|
message NodeHeartbeatRequestProto {
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
||||||
|
@ -48,4 +49,6 @@ public interface Context {
|
||||||
NMContainerTokenSecretManager getContainerTokenSecretManager();
|
NMContainerTokenSecretManager getContainerTokenSecretManager();
|
||||||
|
|
||||||
NodeHealthStatus getNodeHealthStatus();
|
NodeHealthStatus getNodeHealthStatus();
|
||||||
|
|
||||||
|
ContainerManager getContainerManager();
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
||||||
|
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
||||||
|
@ -164,6 +165,7 @@ public class NodeManager extends CompositeService
|
||||||
addService(nodeHealthChecker);
|
addService(nodeHealthChecker);
|
||||||
dirsHandler = nodeHealthChecker.getDiskHandler();
|
dirsHandler = nodeHealthChecker.getDiskHandler();
|
||||||
|
|
||||||
|
|
||||||
nodeStatusUpdater =
|
nodeStatusUpdater =
|
||||||
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
|
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
|
||||||
|
|
||||||
|
@ -174,6 +176,7 @@ public class NodeManager extends CompositeService
|
||||||
createContainerManager(context, exec, del, nodeStatusUpdater,
|
createContainerManager(context, exec, del, nodeStatusUpdater,
|
||||||
this.aclsManager, dirsHandler);
|
this.aclsManager, dirsHandler);
|
||||||
addService(containerManager);
|
addService(containerManager);
|
||||||
|
((NMContext) context).setContainerManager(containerManager);
|
||||||
|
|
||||||
Service webServer = createWebServer(context, containerManager
|
Service webServer = createWebServer(context, containerManager
|
||||||
.getContainersMonitor(), this.aclsManager, dirsHandler);
|
.getContainersMonitor(), this.aclsManager, dirsHandler);
|
||||||
|
@ -221,11 +224,13 @@ public class NodeManager extends CompositeService
|
||||||
DefaultMetricsSystem.shutdown();
|
DefaultMetricsSystem.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void cleanupContainersOnResync() {
|
protected void resyncWithRM() {
|
||||||
//we do not want to block dispatcher thread here
|
//we do not want to block dispatcher thread here
|
||||||
new Thread() {
|
new Thread() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
LOG.info("Notifying ContainerManager to block new container-requests");
|
||||||
|
containerManager.setBlockNewContainerRequests(true);
|
||||||
cleanupContainers(NodeManagerEventType.RESYNC);
|
cleanupContainers(NodeManagerEventType.RESYNC);
|
||||||
((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater();
|
((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater();
|
||||||
}
|
}
|
||||||
|
@ -296,7 +301,7 @@ public class NodeManager extends CompositeService
|
||||||
new ConcurrentSkipListMap<ContainerId, Container>();
|
new ConcurrentSkipListMap<ContainerId, Container>();
|
||||||
|
|
||||||
private final NMContainerTokenSecretManager containerTokenSecretManager;
|
private final NMContainerTokenSecretManager containerTokenSecretManager;
|
||||||
|
private ContainerManager containerManager;
|
||||||
private final NodeHealthStatus nodeHealthStatus = RecordFactoryProvider
|
private final NodeHealthStatus nodeHealthStatus = RecordFactoryProvider
|
||||||
.getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
|
.getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
|
||||||
|
|
||||||
|
@ -333,6 +338,15 @@ public class NodeManager extends CompositeService
|
||||||
public NodeHealthStatus getNodeHealthStatus() {
|
public NodeHealthStatus getNodeHealthStatus() {
|
||||||
return this.nodeHealthStatus;
|
return this.nodeHealthStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ContainerManager getContainerManager() {
|
||||||
|
return this.containerManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setContainerManager(ContainerManager containerManager) {
|
||||||
|
this.containerManager = containerManager;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -376,7 +390,7 @@ public class NodeManager extends CompositeService
|
||||||
stop();
|
stop();
|
||||||
break;
|
break;
|
||||||
case RESYNC:
|
case RESYNC:
|
||||||
cleanupContainersOnResync();
|
resyncWithRM();
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
|
LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
|
||||||
|
|
|
@ -24,5 +24,8 @@ import org.apache.hadoop.yarn.service.Service;
|
||||||
public interface NodeStatusUpdater extends Service {
|
public interface NodeStatusUpdater extends Service {
|
||||||
|
|
||||||
void sendOutofBandHeartBeat();
|
void sendOutofBandHeartBeat();
|
||||||
|
|
||||||
NodeStatus getNodeStatusAndUpdateContainersInContext();
|
NodeStatus getNodeStatusAndUpdateContainersInContext();
|
||||||
|
|
||||||
|
long getRMIdentifier();
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
|
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
|
||||||
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
|
@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
import org.apache.hadoop.yarn.service.AbstractService;
|
import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
|
@ -95,6 +97,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
|
|
||||||
private Runnable statusUpdaterRunnable;
|
private Runnable statusUpdaterRunnable;
|
||||||
private Thread statusUpdater;
|
private Thread statusUpdater;
|
||||||
|
private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
|
||||||
|
|
||||||
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
|
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
|
||||||
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
||||||
|
@ -267,6 +270,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
this.resourceTracker = getRMClient();
|
this.resourceTracker = getRMClient();
|
||||||
regNMResponse =
|
regNMResponse =
|
||||||
this.resourceTracker.registerNodeManager(request);
|
this.resourceTracker.registerNodeManager(request);
|
||||||
|
this.rmIdentifier = regNMResponse.getRMIdentifier();
|
||||||
break;
|
break;
|
||||||
} catch(Throwable e) {
|
} catch(Throwable e) {
|
||||||
LOG.warn("Trying to connect to ResourceManager, " +
|
LOG.warn("Trying to connect to ResourceManager, " +
|
||||||
|
@ -308,7 +312,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
|
|
||||||
LOG.info("Registered with ResourceManager as " + this.nodeId
|
LOG.info("Registered with ResourceManager as " + this.nodeId
|
||||||
+ " with total resource of " + this.totalResource);
|
+ " with total resource of " + this.totalResource);
|
||||||
|
LOG.info("Notifying ContainerManager to unblock new container-requests");
|
||||||
|
((ContainerManagerImpl) this.context.getContainerManager())
|
||||||
|
.setBlockNewContainerRequests(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<ApplicationId> createKeepAliveApplicationList() {
|
private List<ApplicationId> createKeepAliveApplicationList() {
|
||||||
|
@ -334,6 +340,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
return appList;
|
return appList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public NodeStatus getNodeStatusAndUpdateContainersInContext() {
|
public NodeStatus getNodeStatusAndUpdateContainersInContext() {
|
||||||
|
|
||||||
NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
|
NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
|
||||||
|
@ -407,6 +414,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getRMIdentifier() {
|
||||||
|
return this.rmIdentifier;
|
||||||
|
}
|
||||||
|
|
||||||
protected void startStatusUpdater() {
|
protected void startStatusUpdater() {
|
||||||
|
|
||||||
statusUpdaterRunnable = new Runnable() {
|
statusUpdaterRunnable = new Runnable() {
|
||||||
|
@ -478,6 +490,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
if (response.getNodeAction() == NodeAction.RESYNC) {
|
if (response.getNodeAction() == NodeAction.RESYNC) {
|
||||||
LOG.info("Node is out of sync with ResourceManager,"
|
LOG.info("Node is out of sync with ResourceManager,"
|
||||||
+ " hence rebooting.");
|
+ " hence rebooting.");
|
||||||
|
// Invalidate the RMIdentifier while resync
|
||||||
|
NodeStatusUpdaterImpl.this.rmIdentifier =
|
||||||
|
ResourceManagerConstants.RM_INVALID_IDENTIFIER;
|
||||||
dispatcher.getEventHandler().handle(
|
dispatcher.getEventHandler().handle(
|
||||||
new NodeManagerEvent(NodeManagerEventType.RESYNC));
|
new NodeManagerEvent(NodeManagerEventType.RESYNC));
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -23,10 +23,9 @@ import static org.apache.hadoop.yarn.service.Service.STATE.STARTED;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -125,6 +124,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
private final ApplicationACLsManager aclsManager;
|
private final ApplicationACLsManager aclsManager;
|
||||||
|
|
||||||
private final DeletionService deletionService;
|
private final DeletionService deletionService;
|
||||||
|
private AtomicBoolean blockNewContainerRequests = new AtomicBoolean(false);
|
||||||
|
|
||||||
public ContainerManagerImpl(Context context, ContainerExecutor exec,
|
public ContainerManagerImpl(Context context, ContainerExecutor exec,
|
||||||
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
|
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
|
||||||
|
@ -240,6 +240,9 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
refreshServiceAcls(conf, new NMPolicyProvider());
|
refreshServiceAcls(conf, new NMPolicyProvider());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LOG.info("Blocking new container-requests as container manager rpc" +
|
||||||
|
" server is still starting.");
|
||||||
|
this.setBlockNewContainerRequests(true);
|
||||||
server.start();
|
server.start();
|
||||||
InetSocketAddress connectAddress = NetUtils.getConnectAddress(server);
|
InetSocketAddress connectAddress = NetUtils.getConnectAddress(server);
|
||||||
this.context.getNodeId().setHost(connectAddress.getHostName());
|
this.context.getNodeId().setHost(connectAddress.getHostName());
|
||||||
|
@ -393,6 +396,13 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
@Override
|
@Override
|
||||||
public StartContainerResponse startContainer(StartContainerRequest request)
|
public StartContainerResponse startContainer(StartContainerRequest request)
|
||||||
throws YarnRemoteException {
|
throws YarnRemoteException {
|
||||||
|
|
||||||
|
if (blockNewContainerRequests.get()) {
|
||||||
|
throw RPCUtil.getRemoteException(new NMNotYetReadyException(
|
||||||
|
"Rejecting new containers as NodeManager has not" +
|
||||||
|
" yet connected with ResourceManager"));
|
||||||
|
}
|
||||||
|
|
||||||
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
|
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
|
||||||
org.apache.hadoop.yarn.api.records.Container lauchContainer =
|
org.apache.hadoop.yarn.api.records.Container lauchContainer =
|
||||||
request.getContainer();
|
request.getContainer();
|
||||||
|
@ -402,6 +412,16 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
|
UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
|
||||||
authorizeRequest(containerIDStr, launchContext, lauchContainer, remoteUgi);
|
authorizeRequest(containerIDStr, launchContext, lauchContainer, remoteUgi);
|
||||||
|
|
||||||
|
// Is the container coming from unknown RM
|
||||||
|
if (lauchContainer.getRMIdentifer() != nodeStatusUpdater
|
||||||
|
.getRMIdentifier()) {
|
||||||
|
String msg = "\nContainer "+ containerIDStr
|
||||||
|
+ " rejected as it is allocated by a previous RM";
|
||||||
|
LOG.error(msg);
|
||||||
|
throw RPCUtil
|
||||||
|
.getRemoteException(new InvalidContainerException(msg));
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info("Start request for " + containerIDStr + " by user "
|
LOG.info("Start request for " + containerIDStr + " by user "
|
||||||
+ launchContext.getUser());
|
+ launchContext.getUser());
|
||||||
|
|
||||||
|
@ -615,6 +635,10 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
|
||||||
|
this.blockNewContainerRequests.set(blockNewContainerRequests);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stateChanged(Service service) {
|
public void stateChanged(Service service) {
|
||||||
// TODO Auto-generated method stub
|
// TODO Auto-generated method stub
|
||||||
|
|
|
@ -168,4 +168,9 @@ public class DummyContainerManager extends ContainerManagerImpl {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -142,6 +142,17 @@ public class TestContainerManagerWithLCE extends TestContainerManager {
|
||||||
super.testLocalFilesCleanup();
|
super.testLocalFilesCleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void testContainerLaunchFromPreviousRM() throws InterruptedException,
|
||||||
|
IOException {
|
||||||
|
// Don't run the test if the binary is not available.
|
||||||
|
if (!shouldRunTest()) {
|
||||||
|
LOG.info("LCE binary path is not passed. Not running the test");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG.info("Running testContainerLaunchFromPreviousRM");
|
||||||
|
super.testContainerLaunchFromPreviousRM();
|
||||||
|
}
|
||||||
private boolean shouldRunTest() {
|
private boolean shouldRunTest() {
|
||||||
return System
|
return System
|
||||||
.getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;
|
.getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;
|
||||||
|
|
|
@ -24,17 +24,12 @@ import static org.mockito.Mockito.when;
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileReader;
|
import java.io.FileReader;
|
||||||
import java.io.FileWriter;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.BrokenBarrierException;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
import java.util.concurrent.CyclicBarrier;
|
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
@ -59,12 +54,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.URL;
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -86,7 +78,6 @@ public class TestNodeManagerShutdown {
|
||||||
static final String user = "nobody";
|
static final String user = "nobody";
|
||||||
private FileContext localFS;
|
private FileContext localFS;
|
||||||
private ContainerId cId;
|
private ContainerId cId;
|
||||||
private CyclicBarrier syncBarrier = new CyclicBarrier(2);
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws UnsupportedFileSystemException {
|
public void setup() throws UnsupportedFileSystemException {
|
||||||
|
@ -110,7 +101,7 @@ public class TestNodeManagerShutdown {
|
||||||
NodeManager nm = getNodeManager();
|
NodeManager nm = getNodeManager();
|
||||||
nm.init(createNMConfig());
|
nm.init(createNMConfig());
|
||||||
nm.start();
|
nm.start();
|
||||||
startContainers(nm);
|
startContainer(nm, cId, localFS, tmpDir, processStartFile);
|
||||||
|
|
||||||
final int MAX_TRIES=20;
|
final int MAX_TRIES=20;
|
||||||
int numTries = 0;
|
int numTries = 0;
|
||||||
|
@ -151,28 +142,12 @@ public class TestNodeManagerShutdown {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
public static void startContainer(NodeManager nm, ContainerId cId,
|
||||||
@Test
|
FileContext localFS, File scriptFileDir, File processStartFile)
|
||||||
public void testKillContainersOnResync() throws IOException, InterruptedException {
|
throws IOException {
|
||||||
NodeManager nm = new TestNodeManager();
|
|
||||||
YarnConfiguration conf = createNMConfig();
|
|
||||||
nm.init(conf);
|
|
||||||
nm.start();
|
|
||||||
startContainers(nm);
|
|
||||||
|
|
||||||
assert ((TestNodeManager) nm).getNMRegistrationCount() == 1;
|
|
||||||
nm.getNMDispatcher().getEventHandler().
|
|
||||||
handle( new NodeManagerEvent(NodeManagerEventType.RESYNC));
|
|
||||||
try {
|
|
||||||
syncBarrier.await();
|
|
||||||
} catch (BrokenBarrierException e) {
|
|
||||||
}
|
|
||||||
assert ((TestNodeManager) nm).getNMRegistrationCount() == 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void startContainers(NodeManager nm) throws IOException {
|
|
||||||
ContainerManagerImpl containerManager = nm.getContainerManager();
|
ContainerManagerImpl containerManager = nm.getContainerManager();
|
||||||
File scriptFile = createUnhaltingScriptFile();
|
File scriptFile =
|
||||||
|
createUnhaltingScriptFile(cId, scriptFileDir, processStartFile);
|
||||||
|
|
||||||
ContainerLaunchContext containerLaunchContext =
|
ContainerLaunchContext containerLaunchContext =
|
||||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||||
|
@ -218,7 +193,7 @@ public class TestNodeManagerShutdown {
|
||||||
Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
|
Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
|
||||||
}
|
}
|
||||||
|
|
||||||
private ContainerId createContainerId() {
|
public static ContainerId createContainerId() {
|
||||||
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
|
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
|
||||||
appId.setClusterTimestamp(0);
|
appId.setClusterTimestamp(0);
|
||||||
appId.setId(0);
|
appId.setId(0);
|
||||||
|
@ -247,8 +222,9 @@ public class TestNodeManagerShutdown {
|
||||||
* Creates a script to run a container that will run forever unless
|
* Creates a script to run a container that will run forever unless
|
||||||
* stopped by external means.
|
* stopped by external means.
|
||||||
*/
|
*/
|
||||||
private File createUnhaltingScriptFile() throws IOException {
|
private static File createUnhaltingScriptFile(ContainerId cId,
|
||||||
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
|
File scriptFileDir, File processStartFile) throws IOException {
|
||||||
|
File scriptFile = Shell.appendScriptExtension(scriptFileDir, "scriptFile");
|
||||||
PrintWriter fileWriter = new PrintWriter(scriptFile);
|
PrintWriter fileWriter = new PrintWriter(scriptFile);
|
||||||
if (Shell.WINDOWS) {
|
if (Shell.WINDOWS) {
|
||||||
fileWriter.println("@echo \"Running testscript for delayed kill\"");
|
fileWriter.println("@echo \"Running testscript for delayed kill\"");
|
||||||
|
@ -282,48 +258,4 @@ public class TestNodeManagerShutdown {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
class TestNodeManager extends NodeManager {
|
|
||||||
|
|
||||||
private int registrationCount = 0;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
|
||||||
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
|
||||||
return new TestNodeStatusUpdaterImpl(context, dispatcher,
|
|
||||||
healthChecker, metrics);
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getNMRegistrationCount() {
|
|
||||||
return registrationCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
class TestNodeStatusUpdaterImpl extends MockNodeStatusUpdater {
|
|
||||||
|
|
||||||
public TestNodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
|
|
||||||
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
|
||||||
super(context, dispatcher, healthChecker, metrics);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void registerWithRM() throws YarnRemoteException {
|
|
||||||
super.registerWithRM();
|
|
||||||
registrationCount++;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void rebootNodeStatusUpdater() {
|
|
||||||
ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container> containers =
|
|
||||||
getNMContext().getContainers();
|
|
||||||
// ensure that containers are empty before restart nodeStatusUpdater
|
|
||||||
Assert.assertTrue(containers.isEmpty());
|
|
||||||
super.rebootNodeStatusUpdater();
|
|
||||||
try {
|
|
||||||
syncBarrier.await();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
} catch (BrokenBarrierException e) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
|
|
@ -156,7 +156,13 @@ public abstract class BaseContainerManagerTest {
|
||||||
dirsHandler = nodeHealthChecker.getDiskHandler();
|
dirsHandler = nodeHealthChecker.getDiskHandler();
|
||||||
containerManager =
|
containerManager =
|
||||||
new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
|
new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
|
||||||
metrics, new ApplicationACLsManager(conf), dirsHandler);
|
metrics, new ApplicationACLsManager(conf), dirsHandler) {
|
||||||
|
@Override
|
||||||
|
public void setBlockNewContainerRequests(
|
||||||
|
boolean blockNewContainerRequests) {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
};
|
||||||
containerManager.init(conf);
|
containerManager.init(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileReader;
|
import java.io.FileReader;
|
||||||
|
@ -49,13 +52,18 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.URL;
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
|
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.LocalRMInterface;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||||
|
@ -63,7 +71,6 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import static org.mockito.Mockito.*;
|
|
||||||
|
|
||||||
public class TestContainerManager extends BaseContainerManagerTest {
|
public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
|
|
||||||
|
@ -411,7 +418,13 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
|
|
||||||
containerManager =
|
containerManager =
|
||||||
new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
|
new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
|
||||||
metrics, new ApplicationACLsManager(conf), dirsHandler);
|
metrics, new ApplicationACLsManager(conf), dirsHandler) {
|
||||||
|
@Override
|
||||||
|
public void setBlockNewContainerRequests(
|
||||||
|
boolean blockNewContainerRequests) {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
};
|
||||||
containerManager.init(conf);
|
containerManager.init(conf);
|
||||||
containerManager.start();
|
containerManager.start();
|
||||||
|
|
||||||
|
@ -524,4 +537,77 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
Assert.assertFalse(targetFile.getAbsolutePath() + " exists!!",
|
Assert.assertFalse(targetFile.getAbsolutePath() + " exists!!",
|
||||||
targetFile.exists());
|
targetFile.exists());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerLaunchFromPreviousRM() throws IOException,
|
||||||
|
InterruptedException {
|
||||||
|
// There is no real RM registration, simulate and set RMIdentifier
|
||||||
|
NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class);
|
||||||
|
when(nodeStatusUpdater.getRMIdentifier()).thenReturn((long) 1234);
|
||||||
|
containerManager =
|
||||||
|
new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
|
||||||
|
metrics, new ApplicationACLsManager(conf), dirsHandler) {
|
||||||
|
@Override
|
||||||
|
public void setBlockNewContainerRequests(
|
||||||
|
boolean blockNewContainerRequests) {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
};
|
||||||
|
containerManager.init(conf);
|
||||||
|
containerManager.start();
|
||||||
|
|
||||||
|
ContainerLaunchContext containerLaunchContext =
|
||||||
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||||
|
|
||||||
|
ContainerId cId1 = createContainerId();
|
||||||
|
ContainerId cId2 = createContainerId();
|
||||||
|
containerLaunchContext.setUser(user);
|
||||||
|
containerLaunchContext
|
||||||
|
.setLocalResources(new HashMap<String, LocalResource>());
|
||||||
|
containerLaunchContext.setUser(containerLaunchContext.getUser());
|
||||||
|
Resource mockResource = mock(Resource.class);
|
||||||
|
|
||||||
|
Container mockContainer1 = mock(Container.class);
|
||||||
|
when(mockContainer1.getId()).thenReturn(cId1);
|
||||||
|
// Construct the Container with Invalid RMIdentifier
|
||||||
|
when(mockContainer1.getRMIdentifer()).thenReturn(
|
||||||
|
(long) ResourceManagerConstants.RM_INVALID_IDENTIFIER);
|
||||||
|
StartContainerRequest startRequest1 =
|
||||||
|
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||||
|
startRequest1.setContainerLaunchContext(containerLaunchContext);
|
||||||
|
startRequest1.setContainer(mockContainer1);
|
||||||
|
boolean catchException = false;
|
||||||
|
try {
|
||||||
|
containerManager.startContainer(startRequest1);
|
||||||
|
} catch (YarnRemoteException e) {
|
||||||
|
catchException = true;
|
||||||
|
Assert.assertTrue(e.getMessage().contains(
|
||||||
|
"Container " + cId1 + " rejected as it is allocated by a previous RM"));
|
||||||
|
// TO DO: This should be replaced to explicitly check exception
|
||||||
|
// class name after YARN-142
|
||||||
|
Assert.assertTrue(e.getRemoteTrace().contains(
|
||||||
|
InvalidContainerException.class.getName()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify that startContainer fail because of invalid container request
|
||||||
|
Assert.assertTrue(catchException);
|
||||||
|
|
||||||
|
// Construct the Container with a RMIdentifier within current RM
|
||||||
|
Container mockContainer2 = mock(Container.class);
|
||||||
|
when(mockContainer2.getId()).thenReturn(cId2);
|
||||||
|
when(mockContainer2.getRMIdentifer()).thenReturn((long) 1234);
|
||||||
|
when(mockContainer2.getResource()).thenReturn(mockResource);
|
||||||
|
StartContainerRequest startRequest2 =
|
||||||
|
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||||
|
startRequest2.setContainerLaunchContext(containerLaunchContext);
|
||||||
|
startRequest2.setContainer(mockContainer2);
|
||||||
|
boolean noException = true;
|
||||||
|
try {
|
||||||
|
containerManager.startContainer(startRequest2);
|
||||||
|
} catch (YarnRemoteException e) {
|
||||||
|
noException = false;
|
||||||
|
}
|
||||||
|
// Verify that startContainer get no YarnRemoteException
|
||||||
|
Assert.assertTrue(noException);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -196,6 +196,7 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
+ capability + ", assigned nodeId " + nodeId);
|
+ capability + ", assigned nodeId " + nodeId);
|
||||||
|
|
||||||
response.setNodeAction(NodeAction.NORMAL);
|
response.setNodeAction(NodeAction.NORMAL);
|
||||||
|
response.setRMIdentifier(ResourceManager.clusterTimeStamp);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
@ -1243,7 +1244,7 @@ public class LeafQueue implements CSQueue {
|
||||||
// Create the container
|
// Create the container
|
||||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
||||||
node.getRMNode().getHttpAddress(), capability, priority,
|
node.getRMNode().getHttpAddress(), capability, priority,
|
||||||
null);
|
null, ResourceManager.clusterTimeStamp);
|
||||||
|
|
||||||
return container;
|
return container;
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
|
@ -173,7 +174,7 @@ public class AppSchedulable extends Schedulable {
|
||||||
// Create the container
|
// Create the container
|
||||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
||||||
node.getRMNode().getHttpAddress(), capability, priority,
|
node.getRMNode().getHttpAddress(), capability, priority,
|
||||||
containerToken);
|
containerToken, ResourceManager.clusterTimeStamp);
|
||||||
|
|
||||||
return container;
|
return container;
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
|
||||||
|
@ -565,7 +566,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
|
||||||
// Create the container
|
// Create the container
|
||||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
||||||
node.getRMNode().getHttpAddress(), capability, priority,
|
node.getRMNode().getHttpAddress(), capability, priority,
|
||||||
containerToken);
|
containerToken, ResourceManager.clusterTimeStamp);
|
||||||
|
|
||||||
// Allocate!
|
// Allocate!
|
||||||
|
|
||||||
|
|
|
@ -188,6 +188,7 @@ public class NodeManager implements ContainerManager {
|
||||||
this.nodeId, nodeHttpAddress,
|
this.nodeId, nodeHttpAddress,
|
||||||
requestContainer.getResource(),
|
requestContainer.getResource(),
|
||||||
null, null // DKDC - Doesn't matter
|
null, null // DKDC - Doesn't matter
|
||||||
|
, 0
|
||||||
);
|
);
|
||||||
|
|
||||||
ContainerStatus containerStatus =
|
ContainerStatus containerStatus =
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||||
|
@ -267,6 +268,21 @@ public class TestResourceTrackerService {
|
||||||
Assert.assertEquals(NodeAction.SHUTDOWN,response.getNodeAction());
|
Assert.assertEquals(NodeAction.SHUTDOWN,response.getNodeAction());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetRMIdentifierInRegistration() throws Exception {
|
||||||
|
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
MockNM nm = new MockNM("host1:1234", 5120, rm.getResourceTrackerService());
|
||||||
|
RegisterNodeManagerResponse response = nm.registerNode();
|
||||||
|
|
||||||
|
// Verify the RMIdentifier is correctly set in RegisterNodeManagerResponse
|
||||||
|
Assert.assertEquals(ResourceManager.clusterTimeStamp,
|
||||||
|
response.getRMIdentifier());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReboot() throws Exception {
|
public void testReboot() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
|
|
@ -69,7 +69,7 @@ public class TestRMContainerImpl {
|
||||||
Priority priority = BuilderUtils.newPriority(5);
|
Priority priority = BuilderUtils.newPriority(5);
|
||||||
|
|
||||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
||||||
"host:3465", resource, priority, null);
|
"host:3465", resource, priority, null, 0);
|
||||||
|
|
||||||
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
|
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
|
||||||
nodeId, eventHandler, expirer);
|
nodeId, eventHandler, expirer);
|
||||||
|
@ -139,7 +139,7 @@ public class TestRMContainerImpl {
|
||||||
Priority priority = BuilderUtils.newPriority(5);
|
Priority priority = BuilderUtils.newPriority(5);
|
||||||
|
|
||||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
||||||
"host:3465", resource, priority, null);
|
"host:3465", resource, priority, null, 0);
|
||||||
|
|
||||||
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
|
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
|
||||||
nodeId, eventHandler, expirer);
|
nodeId, eventHandler, expirer);
|
||||||
|
|
|
@ -360,7 +360,7 @@ public class TestContainerManagerSecurity {
|
||||||
Container container =
|
Container container =
|
||||||
BuilderUtils.newContainer(newTokenId.getContainerID(), null, null,
|
BuilderUtils.newContainer(newTokenId.getContainerID(), null, null,
|
||||||
BuilderUtils.newResource(newTokenId.getResource().getMemory(),
|
BuilderUtils.newResource(newTokenId.getResource().getMemory(),
|
||||||
newTokenId.getResource().getVirtualCores()), null, null);
|
newTokenId.getResource().getVirtualCores()), null, null, 0);
|
||||||
StartContainerRequest request = Records.newRecord(StartContainerRequest.class);
|
StartContainerRequest request = Records.newRecord(StartContainerRequest.class);
|
||||||
request.setContainerLaunchContext(context);
|
request.setContainerLaunchContext(context);
|
||||||
request.setContainer(container);
|
request.setContainer(container);
|
||||||
|
@ -547,7 +547,7 @@ public class TestContainerManagerSecurity {
|
||||||
createContainerLaunchContextForTest(tokenId);
|
createContainerLaunchContextForTest(tokenId);
|
||||||
Container container =
|
Container container =
|
||||||
BuilderUtils.newContainer(tokenId.getContainerID(), null, null,
|
BuilderUtils.newContainer(tokenId.getContainerID(), null, null,
|
||||||
BuilderUtils.newResource(2048, 1), null, null);
|
BuilderUtils.newResource(2048, 1), null, null, 0);
|
||||||
request.setContainerLaunchContext(context);
|
request.setContainerLaunchContext(context);
|
||||||
request.setContainer(container);
|
request.setContainer(container);
|
||||||
try {
|
try {
|
||||||
|
@ -575,7 +575,7 @@ public class TestContainerManagerSecurity {
|
||||||
Container container =
|
Container container =
|
||||||
BuilderUtils.newContainer(tokenId.getContainerID(), null, null,
|
BuilderUtils.newContainer(tokenId.getContainerID(), null, null,
|
||||||
BuilderUtils.newResource(tokenId.getResource().getMemory(), tokenId
|
BuilderUtils.newResource(tokenId.getResource().getMemory(), tokenId
|
||||||
.getResource().getVirtualCores()), null, null);
|
.getResource().getVirtualCores()), null, null, 0);
|
||||||
request.setContainerLaunchContext(context);
|
request.setContainerLaunchContext(context);
|
||||||
request.setContainer(container);
|
request.setContainer(container);
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue