Merge r1603028 from trunk. YARN-1885. Fixed a bug that RM may not send application-clean-up signal to NMs where the completed applications previously ran in case of RM restart. Contributed by Wangda Tan
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1603030 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a18dce22b9
commit
d79919f175
|
@ -233,6 +233,10 @@ Release 2.5.0 - UNRELEASED
|
||||||
YARN-2155. FairScheduler: Incorrect threshold check for preemption.
|
YARN-2155. FairScheduler: Incorrect threshold check for preemption.
|
||||||
(Wei Yan via kasha)
|
(Wei Yan via kasha)
|
||||||
|
|
||||||
|
YARN-1885. Fixed a bug that RM may not send application-clean-up signal
|
||||||
|
to NMs where the completed applications previously ran in case of RM restart.
|
||||||
|
(Wangda Tan via jianhe)
|
||||||
|
|
||||||
Release 2.4.1 - 2014-06-23
|
Release 2.4.1 - 2014-06-23
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -60,7 +60,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{
|
||||||
// make sure registerNodeManager works when failover happens
|
// make sure registerNodeManager works when failover happens
|
||||||
RegisterNodeManagerRequest request =
|
RegisterNodeManagerRequest request =
|
||||||
RegisterNodeManagerRequest.newInstance(nodeId, 0, resource,
|
RegisterNodeManagerRequest.newInstance(nodeId, 0, resource,
|
||||||
YarnVersionInfo.getVersion(), null);
|
YarnVersionInfo.getVersion(), null, null);
|
||||||
resourceTracker.registerNodeManager(request);
|
resourceTracker.registerNodeManager(request);
|
||||||
Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId));
|
Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId));
|
||||||
|
|
||||||
|
|
|
@ -20,15 +20,17 @@ package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
public abstract class RegisterNodeManagerRequest {
|
public abstract class RegisterNodeManagerRequest {
|
||||||
|
|
||||||
public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
|
public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
|
||||||
int httpPort, Resource resource, String nodeManagerVersionId,
|
int httpPort, Resource resource, String nodeManagerVersionId,
|
||||||
List<NMContainerStatus> containerStatuses) {
|
List<NMContainerStatus> containerStatuses,
|
||||||
|
List<ApplicationId> runningApplications) {
|
||||||
RegisterNodeManagerRequest request =
|
RegisterNodeManagerRequest request =
|
||||||
Records.newRecord(RegisterNodeManagerRequest.class);
|
Records.newRecord(RegisterNodeManagerRequest.class);
|
||||||
request.setHttpPort(httpPort);
|
request.setHttpPort(httpPort);
|
||||||
|
@ -36,6 +38,7 @@ public abstract class RegisterNodeManagerRequest {
|
||||||
request.setNodeId(nodeId);
|
request.setNodeId(nodeId);
|
||||||
request.setNMVersion(nodeManagerVersionId);
|
request.setNMVersion(nodeManagerVersionId);
|
||||||
request.setContainerStatuses(containerStatuses);
|
request.setContainerStatuses(containerStatuses);
|
||||||
|
request.setRunningApplications(runningApplications);
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,10 +48,30 @@ public abstract class RegisterNodeManagerRequest {
|
||||||
public abstract String getNMVersion();
|
public abstract String getNMVersion();
|
||||||
public abstract List<NMContainerStatus> getNMContainerStatuses();
|
public abstract List<NMContainerStatus> getNMContainerStatuses();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We introduce this here because currently YARN RM doesn't persist nodes info
|
||||||
|
* for application running. When RM restart happened, we cannot determinate if
|
||||||
|
* a node should do application cleanup (like log-aggregation, status update,
|
||||||
|
* etc.) or not. <p/>
|
||||||
|
* When we have this running application list in node manager register
|
||||||
|
* request, we can recover nodes info for running applications. And then we
|
||||||
|
* can take actions accordingly
|
||||||
|
*
|
||||||
|
* @return running application list in this node
|
||||||
|
*/
|
||||||
|
public abstract List<ApplicationId> getRunningApplications();
|
||||||
|
|
||||||
public abstract void setNodeId(NodeId nodeId);
|
public abstract void setNodeId(NodeId nodeId);
|
||||||
public abstract void setHttpPort(int port);
|
public abstract void setHttpPort(int port);
|
||||||
public abstract void setResource(Resource resource);
|
public abstract void setResource(Resource resource);
|
||||||
public abstract void setNMVersion(String version);
|
public abstract void setNMVersion(String version);
|
||||||
public abstract void setContainerStatuses(
|
public abstract void setContainerStatuses(
|
||||||
List<NMContainerStatus> containerStatuses);
|
List<NMContainerStatus> containerStatuses);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Setter for {@link RegisterNodeManagerRequest#getRunningApplications()}
|
||||||
|
* @param runningApplications running application in this node
|
||||||
|
*/
|
||||||
|
public abstract void setRunningApplications(
|
||||||
|
List<ApplicationId> runningApplications);
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,12 +20,23 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||||
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
|
||||||
|
@ -44,6 +55,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
|
||||||
private Resource resource = null;
|
private Resource resource = null;
|
||||||
private NodeId nodeId = null;
|
private NodeId nodeId = null;
|
||||||
private List<NMContainerStatus> containerStatuses = null;
|
private List<NMContainerStatus> containerStatuses = null;
|
||||||
|
private List<ApplicationId> runningApplications = null;
|
||||||
|
|
||||||
public RegisterNodeManagerRequestPBImpl() {
|
public RegisterNodeManagerRequestPBImpl() {
|
||||||
builder = RegisterNodeManagerRequestProto.newBuilder();
|
builder = RegisterNodeManagerRequestProto.newBuilder();
|
||||||
|
@ -65,6 +77,9 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
|
||||||
if (this.containerStatuses != null) {
|
if (this.containerStatuses != null) {
|
||||||
addNMContainerStatusesToProto();
|
addNMContainerStatusesToProto();
|
||||||
}
|
}
|
||||||
|
if (this.runningApplications != null) {
|
||||||
|
addRunningApplicationsToProto();
|
||||||
|
}
|
||||||
if (this.resource != null) {
|
if (this.resource != null) {
|
||||||
builder.setResource(convertToProtoFormat(this.resource));
|
builder.setResource(convertToProtoFormat(this.resource));
|
||||||
}
|
}
|
||||||
|
@ -158,6 +173,66 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
|
||||||
maybeInitBuilder();
|
maybeInitBuilder();
|
||||||
builder.setHttpPort(httpPort);
|
builder.setHttpPort(httpPort);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ApplicationId> getRunningApplications() {
|
||||||
|
initRunningApplications();
|
||||||
|
return runningApplications;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initRunningApplications() {
|
||||||
|
if (this.runningApplications != null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
List<ApplicationIdProto> list = p.getRunningApplicationsList();
|
||||||
|
this.runningApplications = new ArrayList<ApplicationId>();
|
||||||
|
for (ApplicationIdProto c : list) {
|
||||||
|
this.runningApplications.add(convertFromProtoFormat(c));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRunningApplications(List<ApplicationId> apps) {
|
||||||
|
if (apps == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
initRunningApplications();
|
||||||
|
this.runningApplications.addAll(apps);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addRunningApplicationsToProto() {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.clearRunningApplications();
|
||||||
|
if (runningApplications == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Iterable<ApplicationIdProto> it = new Iterable<ApplicationIdProto>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<ApplicationIdProto> iterator() {
|
||||||
|
return new Iterator<ApplicationIdProto>() {
|
||||||
|
Iterator<ApplicationId> iter = runningApplications.iterator();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
return iter.hasNext();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ApplicationIdProto next() {
|
||||||
|
return convertToProtoFormat(iter.next());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
builder.addAllRunningApplications(it);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<NMContainerStatus> getNMContainerStatuses() {
|
public List<NMContainerStatus> getNMContainerStatuses() {
|
||||||
|
@ -216,6 +291,14 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
|
||||||
maybeInitBuilder();
|
maybeInitBuilder();
|
||||||
builder.setNmVersion(version);
|
builder.setNmVersion(version);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
|
||||||
|
return new ApplicationIdPBImpl(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
|
||||||
|
return ((ApplicationIdPBImpl)t).getProto();
|
||||||
|
}
|
||||||
|
|
||||||
private NodeIdPBImpl convertFromProtoFormat(NodeIdProto p) {
|
private NodeIdPBImpl convertFromProtoFormat(NodeIdProto p) {
|
||||||
return new NodeIdPBImpl(p);
|
return new NodeIdPBImpl(p);
|
||||||
|
|
|
@ -31,6 +31,7 @@ message RegisterNodeManagerRequestProto {
|
||||||
optional ResourceProto resource = 4;
|
optional ResourceProto resource = 4;
|
||||||
optional string nm_version = 5;
|
optional string nm_version = 5;
|
||||||
repeated NMContainerStatusProto container_statuses = 6;
|
repeated NMContainerStatusProto container_statuses = 6;
|
||||||
|
repeated ApplicationIdProto runningApplications = 7;
|
||||||
}
|
}
|
||||||
|
|
||||||
message RegisterNodeManagerResponseProto {
|
message RegisterNodeManagerResponseProto {
|
||||||
|
@ -66,4 +67,4 @@ message NMContainerStatusProto {
|
||||||
optional PriorityProto priority = 4;
|
optional PriorityProto priority = 4;
|
||||||
optional string diagnostics = 5 [default = "N/A"];
|
optional string diagnostics = 5 [default = "N/A"];
|
||||||
optional int32 container_exit_status = 6;
|
optional int32 container_exit_status = 6;
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,7 +83,8 @@ public class TestProtocolRecords {
|
||||||
RegisterNodeManagerRequest request =
|
RegisterNodeManagerRequest request =
|
||||||
RegisterNodeManagerRequest.newInstance(
|
RegisterNodeManagerRequest.newInstance(
|
||||||
NodeId.newInstance("1.1.1.1", 1000), 8080,
|
NodeId.newInstance("1.1.1.1", 1000), 8080,
|
||||||
Resource.newInstance(1024, 1), "NM-version-id", reports);
|
Resource.newInstance(1024, 1), "NM-version-id", reports,
|
||||||
|
Arrays.asList(appId));
|
||||||
RegisterNodeManagerRequest requestProto =
|
RegisterNodeManagerRequest requestProto =
|
||||||
new RegisterNodeManagerRequestPBImpl(
|
new RegisterNodeManagerRequestPBImpl(
|
||||||
((RegisterNodeManagerRequestPBImpl) request).getProto());
|
((RegisterNodeManagerRequestPBImpl) request).getProto());
|
||||||
|
@ -95,5 +96,7 @@ public class TestProtocolRecords {
|
||||||
requestProto.getNodeId());
|
requestProto.getNodeId());
|
||||||
Assert.assertEquals(Resource.newInstance(1024, 1),
|
Assert.assertEquals(Resource.newInstance(1024, 1),
|
||||||
requestProto.getResource());
|
requestProto.getResource());
|
||||||
|
Assert.assertEquals(1, requestProto.getRunningApplications().size());
|
||||||
|
Assert.assertEquals(appId, requestProto.getRunningApplications().get(0));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,81 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestRegisterNodeManagerRequest {
|
||||||
|
@Test
|
||||||
|
public void testRegisterNodeManagerRequest() {
|
||||||
|
RegisterNodeManagerRequest request =
|
||||||
|
RegisterNodeManagerRequest.newInstance(
|
||||||
|
NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0),
|
||||||
|
"version", Arrays.asList(NMContainerStatus.newInstance(
|
||||||
|
ContainerId.newInstance(
|
||||||
|
ApplicationAttemptId.newInstance(
|
||||||
|
ApplicationId.newInstance(1234L, 1), 1), 1),
|
||||||
|
ContainerState.RUNNING, Resource.newInstance(1024, 1), "good",
|
||||||
|
-1)), Arrays.asList(ApplicationId.newInstance(1234L, 1),
|
||||||
|
ApplicationId.newInstance(1234L, 2)));
|
||||||
|
|
||||||
|
// serialze to proto, and get request from proto
|
||||||
|
RegisterNodeManagerRequest request1 =
|
||||||
|
new RegisterNodeManagerRequestPBImpl(
|
||||||
|
((RegisterNodeManagerRequestPBImpl) request).getProto());
|
||||||
|
|
||||||
|
// check values
|
||||||
|
Assert.assertEquals(request1.getNMContainerStatuses().size(), request
|
||||||
|
.getNMContainerStatuses().size());
|
||||||
|
Assert.assertEquals(request1.getNMContainerStatuses().get(0).getContainerId(),
|
||||||
|
request.getNMContainerStatuses().get(0).getContainerId());
|
||||||
|
Assert.assertEquals(request1.getRunningApplications().size(), request
|
||||||
|
.getRunningApplications().size());
|
||||||
|
Assert.assertEquals(request1.getRunningApplications().get(0), request
|
||||||
|
.getRunningApplications().get(0));
|
||||||
|
Assert.assertEquals(request1.getRunningApplications().get(1), request
|
||||||
|
.getRunningApplications().get(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRegisterNodeManagerRequestWithNullArrays() {
|
||||||
|
RegisterNodeManagerRequest request =
|
||||||
|
RegisterNodeManagerRequest.newInstance(NodeId.newInstance("host", 1234),
|
||||||
|
1234, Resource.newInstance(0, 0), "version", null, null);
|
||||||
|
|
||||||
|
// serialze to proto, and get request from proto
|
||||||
|
RegisterNodeManagerRequest request1 =
|
||||||
|
new RegisterNodeManagerRequestPBImpl(
|
||||||
|
((RegisterNodeManagerRequestPBImpl) request).getProto());
|
||||||
|
|
||||||
|
// check values
|
||||||
|
Assert.assertEquals(0, request1.getNMContainerStatuses().size());
|
||||||
|
Assert.assertEquals(0, request1.getRunningApplications().size());
|
||||||
|
}
|
||||||
|
}
|
|
@ -250,7 +250,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
List<NMContainerStatus> containerReports = getNMContainerStatuses();
|
List<NMContainerStatus> containerReports = getNMContainerStatuses();
|
||||||
RegisterNodeManagerRequest request =
|
RegisterNodeManagerRequest request =
|
||||||
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
|
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
|
||||||
nodeManagerVersionId, containerReports);
|
nodeManagerVersionId, containerReports, getRunningApplications());
|
||||||
if (containerReports != null) {
|
if (containerReports != null) {
|
||||||
LOG.info("Registering with RM using containers :" + containerReports);
|
LOG.info("Registering with RM using containers :" + containerReports);
|
||||||
}
|
}
|
||||||
|
@ -374,6 +374,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
}
|
}
|
||||||
return containerStatuses;
|
return containerStatuses;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<ApplicationId> getRunningApplications() {
|
||||||
|
List<ApplicationId> runningApplications = new ArrayList<ApplicationId>();
|
||||||
|
runningApplications.addAll(this.context.getApplications().keySet());
|
||||||
|
return runningApplications;
|
||||||
|
}
|
||||||
|
|
||||||
// These NMContainerStatus are sent on NM registration and used by YARN only.
|
// These NMContainerStatus are sent on NM registration and used by YARN only.
|
||||||
private List<NMContainerStatus> getNMContainerStatuses() {
|
private List<NMContainerStatus> getNMContainerStatuses() {
|
||||||
|
|
|
@ -244,15 +244,6 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
Resource capability = request.getResource();
|
Resource capability = request.getResource();
|
||||||
String nodeManagerVersion = request.getNMVersion();
|
String nodeManagerVersion = request.getNMVersion();
|
||||||
|
|
||||||
if (!rmContext.isWorkPreservingRecoveryEnabled()) {
|
|
||||||
if (!request.getNMContainerStatuses().isEmpty()) {
|
|
||||||
LOG.info("received container statuses on node manager register :"
|
|
||||||
+ request.getNMContainerStatuses());
|
|
||||||
for (NMContainerStatus status : request.getNMContainerStatuses()) {
|
|
||||||
handleNMContainerStatus(status);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
RegisterNodeManagerResponse response = recordFactory
|
RegisterNodeManagerResponse response = recordFactory
|
||||||
.newRecordInstance(RegisterNodeManagerResponse.class);
|
.newRecordInstance(RegisterNodeManagerResponse.class);
|
||||||
|
|
||||||
|
@ -311,7 +302,8 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
|
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
|
||||||
if (oldNode == null) {
|
if (oldNode == null) {
|
||||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses()));
|
new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(),
|
||||||
|
request.getRunningApplications()));
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Reconnect from the node at: " + host);
|
LOG.info("Reconnect from the node at: " + host);
|
||||||
this.nmLivelinessMonitor.unregister(nodeId);
|
this.nmLivelinessMonitor.unregister(nodeId);
|
||||||
|
@ -322,6 +314,18 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
// present for any running application.
|
// present for any running application.
|
||||||
this.nmTokenSecretManager.removeNodeKey(nodeId);
|
this.nmTokenSecretManager.removeNodeKey(nodeId);
|
||||||
this.nmLivelinessMonitor.register(nodeId);
|
this.nmLivelinessMonitor.register(nodeId);
|
||||||
|
|
||||||
|
// Handle received container status, this should be processed after new
|
||||||
|
// RMNode inserted
|
||||||
|
if (!rmContext.isWorkPreservingRecoveryEnabled()) {
|
||||||
|
if (!request.getNMContainerStatuses().isEmpty()) {
|
||||||
|
LOG.info("received container statuses on node manager register :"
|
||||||
|
+ request.getNMContainerStatuses());
|
||||||
|
for (NMContainerStatus status : request.getNMContainerStatuses()) {
|
||||||
|
handleNMContainerStatus(status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
String message =
|
String message =
|
||||||
"NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: "
|
"NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: "
|
||||||
|
|
|
@ -19,16 +19,16 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
@ -208,6 +208,14 @@ public interface RMApp extends EventHandler<RMAppEvent> {
|
||||||
* @return the flag indicating whether the applications's state is stored.
|
* @return the flag indicating whether the applications's state is stored.
|
||||||
*/
|
*/
|
||||||
boolean isAppFinalStateStored();
|
boolean isAppFinalStateStored();
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Nodes on which the containers for this {@link RMApp} ran.
|
||||||
|
* @return the set of nodes that ran any containers from this {@link RMApp}
|
||||||
|
* Add more node on which containers for this {@link RMApp} ran
|
||||||
|
*/
|
||||||
|
Set<NodeId> getRanNodes();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create the external user-facing state of ApplicationMaster from the
|
* Create the external user-facing state of ApplicationMaster from the
|
||||||
|
|
|
@ -38,6 +38,9 @@ public enum RMAppEventType {
|
||||||
ATTEMPT_FAILED,
|
ATTEMPT_FAILED,
|
||||||
ATTEMPT_KILLED,
|
ATTEMPT_KILLED,
|
||||||
NODE_UPDATE,
|
NODE_UPDATE,
|
||||||
|
|
||||||
|
// Source: Container and ResourceTracker
|
||||||
|
APP_RUNNING_ON_NODE,
|
||||||
|
|
||||||
// Source: RMStateStore
|
// Source: RMStateStore
|
||||||
APP_NEW_SAVED,
|
APP_NEW_SAVED,
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.util.HashSet;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentSkipListSet;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||||
|
@ -71,7 +72,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
|
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
|
||||||
import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
||||||
|
@ -116,6 +116,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
private EventHandler handler;
|
private EventHandler handler;
|
||||||
private static final AppFinishedTransition FINISHED_TRANSITION =
|
private static final AppFinishedTransition FINISHED_TRANSITION =
|
||||||
new AppFinishedTransition();
|
new AppFinishedTransition();
|
||||||
|
private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();
|
||||||
|
|
||||||
// These states stored are only valid when app is at killing or final_saving.
|
// These states stored are only valid when app is at killing or final_saving.
|
||||||
private RMAppState stateBeforeKilling;
|
private RMAppState stateBeforeKilling;
|
||||||
|
@ -180,7 +181,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
new FinalSavingTransition(
|
new FinalSavingTransition(
|
||||||
new AppKilledTransition(), RMAppState.KILLED))
|
new AppKilledTransition(), RMAppState.KILLED))
|
||||||
|
|
||||||
|
|
||||||
// Transitions from ACCEPTED state
|
// Transitions from ACCEPTED state
|
||||||
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
|
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
|
||||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||||
|
@ -200,6 +200,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED))
|
new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED))
|
||||||
.addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
|
.addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
|
||||||
RMAppEventType.KILL, new KillAttemptTransition())
|
RMAppEventType.KILL, new KillAttemptTransition())
|
||||||
|
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
|
||||||
|
RMAppEventType.APP_RUNNING_ON_NODE,
|
||||||
|
new AppRunningOnNodeTransition())
|
||||||
// ACCECPTED state can once again receive APP_ACCEPTED event, because on
|
// ACCECPTED state can once again receive APP_ACCEPTED event, because on
|
||||||
// recovery the app returns ACCEPTED state and the app once again go
|
// recovery the app returns ACCEPTED state and the app once again go
|
||||||
// through the scheduler and triggers one more APP_ACCEPTED event at
|
// through the scheduler and triggers one more APP_ACCEPTED event at
|
||||||
|
@ -220,6 +223,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
|
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
|
||||||
// UnManagedAM directly jumps to finished
|
// UnManagedAM directly jumps to finished
|
||||||
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
|
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
|
||||||
|
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
||||||
|
RMAppEventType.APP_RUNNING_ON_NODE,
|
||||||
|
new AppRunningOnNodeTransition())
|
||||||
.addTransition(RMAppState.RUNNING,
|
.addTransition(RMAppState.RUNNING,
|
||||||
EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
|
EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
|
||||||
RMAppEventType.ATTEMPT_FAILED,
|
RMAppEventType.ATTEMPT_FAILED,
|
||||||
|
@ -235,6 +241,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
|
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
|
||||||
RMAppEventType.ATTEMPT_FINISHED,
|
RMAppEventType.ATTEMPT_FINISHED,
|
||||||
new AttemptFinishedAtFinalSavingTransition())
|
new AttemptFinishedAtFinalSavingTransition())
|
||||||
|
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
|
||||||
|
RMAppEventType.APP_RUNNING_ON_NODE,
|
||||||
|
new AppRunningOnNodeTransition())
|
||||||
// ignorable transitions
|
// ignorable transitions
|
||||||
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
|
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
|
||||||
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
|
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
|
||||||
|
@ -243,6 +252,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
// Transitions from FINISHING state
|
// Transitions from FINISHING state
|
||||||
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
|
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
|
||||||
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
|
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
|
||||||
|
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
|
||||||
|
RMAppEventType.APP_RUNNING_ON_NODE,
|
||||||
|
new AppRunningOnNodeTransition())
|
||||||
// ignorable transitions
|
// ignorable transitions
|
||||||
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
|
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
|
||||||
EnumSet.of(RMAppEventType.NODE_UPDATE,
|
EnumSet.of(RMAppEventType.NODE_UPDATE,
|
||||||
|
@ -251,6 +263,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
RMAppEventType.KILL))
|
RMAppEventType.KILL))
|
||||||
|
|
||||||
// Transitions from KILLING state
|
// Transitions from KILLING state
|
||||||
|
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
|
||||||
|
RMAppEventType.APP_RUNNING_ON_NODE,
|
||||||
|
new AppRunningOnNodeTransition())
|
||||||
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
|
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
|
||||||
RMAppEventType.ATTEMPT_KILLED,
|
RMAppEventType.ATTEMPT_KILLED,
|
||||||
new FinalSavingTransition(
|
new FinalSavingTransition(
|
||||||
|
@ -267,6 +282,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
|
|
||||||
// Transitions from FINISHED state
|
// Transitions from FINISHED state
|
||||||
// ignorable transitions
|
// ignorable transitions
|
||||||
|
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
|
||||||
|
RMAppEventType.APP_RUNNING_ON_NODE,
|
||||||
|
new AppRunningOnNodeTransition())
|
||||||
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
|
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
|
||||||
EnumSet.of(
|
EnumSet.of(
|
||||||
RMAppEventType.NODE_UPDATE,
|
RMAppEventType.NODE_UPDATE,
|
||||||
|
@ -276,11 +294,17 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
|
|
||||||
// Transitions from FAILED state
|
// Transitions from FAILED state
|
||||||
// ignorable transitions
|
// ignorable transitions
|
||||||
|
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
|
||||||
|
RMAppEventType.APP_RUNNING_ON_NODE,
|
||||||
|
new AppRunningOnNodeTransition())
|
||||||
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
|
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
|
||||||
EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))
|
EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))
|
||||||
|
|
||||||
// Transitions from KILLED state
|
// Transitions from KILLED state
|
||||||
// ignorable transitions
|
// ignorable transitions
|
||||||
|
.addTransition(RMAppState.KILLED, RMAppState.KILLED,
|
||||||
|
RMAppEventType.APP_RUNNING_ON_NODE,
|
||||||
|
new AppRunningOnNodeTransition())
|
||||||
.addTransition(
|
.addTransition(
|
||||||
RMAppState.KILLED,
|
RMAppState.KILLED,
|
||||||
RMAppState.KILLED,
|
RMAppState.KILLED,
|
||||||
|
@ -695,6 +719,23 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
nodeUpdateEvent.getNode());
|
nodeUpdateEvent.getNode());
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final class AppRunningOnNodeTransition extends RMAppTransition {
|
||||||
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||||
|
RMAppRunningOnNodeEvent nodeAddedEvent = (RMAppRunningOnNodeEvent) event;
|
||||||
|
|
||||||
|
// if final state already stored, notify RMNode
|
||||||
|
if (isAppInFinalState(app)) {
|
||||||
|
app.handler.handle(
|
||||||
|
new RMNodeCleanAppEvent(nodeAddedEvent.getNodeId(), nodeAddedEvent
|
||||||
|
.getApplicationId()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// otherwise, add it to ranNodes for further process
|
||||||
|
app.ranNodes.add(nodeAddedEvent.getNodeId());
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Move an app to a new queue.
|
* Move an app to a new queue.
|
||||||
|
@ -1037,17 +1078,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
this.finalState = finalState;
|
this.finalState = finalState;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Set<NodeId> getNodesOnWhichAttemptRan(RMAppImpl app) {
|
|
||||||
Set<NodeId> nodes = new HashSet<NodeId>();
|
|
||||||
for (RMAppAttempt attempt : app.attempts.values()) {
|
|
||||||
nodes.addAll(attempt.getRanNodes());
|
|
||||||
}
|
|
||||||
return nodes;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void transition(RMAppImpl app, RMAppEvent event) {
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||||
Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
|
for (NodeId nodeId : app.getRanNodes()) {
|
||||||
for (NodeId nodeId : nodes) {
|
|
||||||
app.handler.handle(
|
app.handler.handle(
|
||||||
new RMNodeCleanAppEvent(nodeId, app.applicationId));
|
new RMNodeCleanAppEvent(nodeId, app.applicationId));
|
||||||
}
|
}
|
||||||
|
@ -1148,4 +1180,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
private RMAppState getRecoveredFinalState() {
|
private RMAppState getRecoveredFinalState() {
|
||||||
return this.recoveredFinalState;
|
return this.recoveredFinalState;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<NodeId> getRanNodes() {
|
||||||
|
return ranNodes;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,25 +16,20 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
|
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
|
||||||
|
|
||||||
public class RMAppAttemptContainerAcquiredEvent extends RMAppAttemptEvent {
|
public class RMAppRunningOnNodeEvent extends RMAppEvent {
|
||||||
|
private final NodeId node;
|
||||||
|
|
||||||
private final Container container;
|
public RMAppRunningOnNodeEvent(ApplicationId appId, NodeId node) {
|
||||||
|
super(appId, RMAppEventType.APP_RUNNING_ON_NODE);
|
||||||
public RMAppAttemptContainerAcquiredEvent(ApplicationAttemptId appAttemptId,
|
this.node = node;
|
||||||
Container container) {
|
|
||||||
super(appAttemptId, RMAppAttemptEventType.CONTAINER_ACQUIRED);
|
|
||||||
this.container = container;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Container getContainer() {
|
public NodeId getNodeId() {
|
||||||
return this.container;
|
return node;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -19,7 +19,6 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
|
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import javax.crypto.SecretKey;
|
import javax.crypto.SecretKey;
|
||||||
|
|
||||||
|
@ -32,7 +31,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
@ -114,12 +112,6 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
|
||||||
*/
|
*/
|
||||||
FinalApplicationStatus getFinalApplicationStatus();
|
FinalApplicationStatus getFinalApplicationStatus();
|
||||||
|
|
||||||
/**
|
|
||||||
* Nodes on which the containers for this {@link RMAppAttempt} ran.
|
|
||||||
* @return the set of nodes that ran any containers from this {@link RMAppAttempt}
|
|
||||||
*/
|
|
||||||
Set<NodeId> getRanNodes();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return a list of the last set of finished containers, resetting the
|
* Return a list of the last set of finished containers, resetting the
|
||||||
* finished containers to empty.
|
* finished containers to empty.
|
||||||
|
|
|
@ -36,7 +36,6 @@ public enum RMAppAttemptEventType {
|
||||||
UNREGISTERED,
|
UNREGISTERED,
|
||||||
|
|
||||||
// Source: Containers
|
// Source: Containers
|
||||||
CONTAINER_ACQUIRED,
|
|
||||||
CONTAINER_ALLOCATED,
|
CONTAINER_ALLOCATED,
|
||||||
CONTAINER_FINISHED,
|
CONTAINER_FINISHED,
|
||||||
|
|
||||||
|
|
|
@ -26,16 +26,13 @@ import java.net.URISyntaxException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||||
|
|
||||||
import javax.crypto.SecretKey;
|
import javax.crypto.SecretKey;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -54,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
@ -80,7 +76,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
||||||
|
@ -103,6 +98,8 @@ import org.apache.hadoop.yarn.state.StateMachine;
|
||||||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||||
public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
|
|
||||||
|
@ -133,10 +130,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
private final ApplicationSubmissionContext submissionContext;
|
private final ApplicationSubmissionContext submissionContext;
|
||||||
private Token<AMRMTokenIdentifier> amrmToken = null;
|
private Token<AMRMTokenIdentifier> amrmToken = null;
|
||||||
private SecretKey clientTokenMasterKey = null;
|
private SecretKey clientTokenMasterKey = null;
|
||||||
|
|
||||||
//nodes on while this attempt's containers ran
|
|
||||||
private Set<NodeId> ranNodes =
|
|
||||||
new HashSet<NodeId>();
|
|
||||||
private List<ContainerStatus> justFinishedContainers =
|
private List<ContainerStatus> justFinishedContainers =
|
||||||
new ArrayList<ContainerStatus>();
|
new ArrayList<ContainerStatus>();
|
||||||
private Container masterContainer;
|
private Container masterContainer;
|
||||||
|
@ -219,10 +213,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
|
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
|
||||||
RMAppAttemptState.ALLOCATED,
|
RMAppAttemptState.ALLOCATED,
|
||||||
RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
|
RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
|
||||||
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
|
|
||||||
RMAppAttemptState.ALLOCATED_SAVING,
|
|
||||||
RMAppAttemptEventType.CONTAINER_ACQUIRED,
|
|
||||||
new ContainerAcquiredTransition())
|
|
||||||
// App could be killed by the client. So need to handle this.
|
// App could be killed by the client. So need to handle this.
|
||||||
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
|
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
|
||||||
RMAppAttemptState.FINAL_SAVING,
|
RMAppAttemptState.FINAL_SAVING,
|
||||||
|
@ -249,10 +240,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
|
RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
|
||||||
|
|
||||||
// Transitions from ALLOCATED State
|
// Transitions from ALLOCATED State
|
||||||
.addTransition(RMAppAttemptState.ALLOCATED,
|
|
||||||
RMAppAttemptState.ALLOCATED,
|
|
||||||
RMAppAttemptEventType.CONTAINER_ACQUIRED,
|
|
||||||
new ContainerAcquiredTransition())
|
|
||||||
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
|
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
|
||||||
RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
|
RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
|
||||||
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
|
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
|
||||||
|
@ -296,10 +283,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
|
RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
|
||||||
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
|
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
|
||||||
RMAppAttemptEventType.CONTAINER_ALLOCATED)
|
RMAppAttemptEventType.CONTAINER_ALLOCATED)
|
||||||
.addTransition(
|
|
||||||
RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
|
|
||||||
RMAppAttemptEventType.CONTAINER_ACQUIRED,
|
|
||||||
new ContainerAcquiredTransition())
|
|
||||||
.addTransition(
|
.addTransition(
|
||||||
RMAppAttemptState.RUNNING,
|
RMAppAttemptState.RUNNING,
|
||||||
EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING),
|
EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING),
|
||||||
|
@ -337,7 +320,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
// should be fixed to reject container allocate request at Final
|
// should be fixed to reject container allocate request at Final
|
||||||
// Saving in scheduler
|
// Saving in scheduler
|
||||||
RMAppAttemptEventType.CONTAINER_ALLOCATED,
|
RMAppAttemptEventType.CONTAINER_ALLOCATED,
|
||||||
RMAppAttemptEventType.CONTAINER_ACQUIRED,
|
|
||||||
RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
|
RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
|
||||||
RMAppAttemptEventType.KILL))
|
RMAppAttemptEventType.KILL))
|
||||||
|
|
||||||
|
@ -619,11 +601,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public Set<NodeId> getRanNodes() {
|
|
||||||
return ranNodes;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Container getMasterContainer() {
|
public Container getMasterContainer() {
|
||||||
this.readLock.lock();
|
this.readLock.lock();
|
||||||
|
@ -705,7 +682,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
|
|
||||||
public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
|
public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
|
||||||
this.justFinishedContainers = attempt.getJustFinishedContainers();
|
this.justFinishedContainers = attempt.getJustFinishedContainers();
|
||||||
this.ranNodes = attempt.getRanNodes();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
|
private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
|
||||||
|
@ -1402,17 +1378,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
finalStatus = unregisterEvent.getFinalApplicationStatus();
|
finalStatus = unregisterEvent.getFinalApplicationStatus();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class ContainerAcquiredTransition extends
|
|
||||||
BaseTransition {
|
|
||||||
@Override
|
|
||||||
public void transition(RMAppAttemptImpl appAttempt,
|
|
||||||
RMAppAttemptEvent event) {
|
|
||||||
RMAppAttemptContainerAcquiredEvent acquiredEvent
|
|
||||||
= (RMAppAttemptContainerAcquiredEvent) event;
|
|
||||||
appAttempt.ranNodes.add(acquiredEvent.getContainer().getNodeId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final class ContainerFinishedTransition
|
private static final class ContainerFinishedTransition
|
||||||
implements
|
implements
|
||||||
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
|
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
|
||||||
|
|
|
@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||||
|
@ -365,9 +365,9 @@ public class RMContainerImpl implements RMContainer {
|
||||||
RMContainerEventType.FINISHED));
|
RMContainerEventType.FINISHED));
|
||||||
return RMContainerState.COMPLETED;
|
return RMContainerState.COMPLETED;
|
||||||
} else if (report.getContainerState().equals(ContainerState.RUNNING)) {
|
} else if (report.getContainerState().equals(ContainerState.RUNNING)) {
|
||||||
// Tell the appAttempt
|
// Tell the app
|
||||||
container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent(
|
container.eventHandler.handle(new RMAppRunningOnNodeEvent(container
|
||||||
container.getApplicationAttemptId(), container.getContainer()));
|
.getApplicationAttemptId().getApplicationId(), container.nodeId));
|
||||||
return RMContainerState.RUNNING;
|
return RMContainerState.RUNNING;
|
||||||
} else {
|
} else {
|
||||||
// This can never happen.
|
// This can never happen.
|
||||||
|
@ -408,9 +408,9 @@ public class RMContainerImpl implements RMContainer {
|
||||||
// Register with containerAllocationExpirer.
|
// Register with containerAllocationExpirer.
|
||||||
container.containerAllocationExpirer.register(container.getContainerId());
|
container.containerAllocationExpirer.register(container.getContainerId());
|
||||||
|
|
||||||
// Tell the appAttempt
|
// Tell the app
|
||||||
container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent(
|
container.eventHandler.handle(new RMAppRunningOnNodeEvent(container
|
||||||
container.getApplicationAttemptId(), container.getContainer()));
|
.getApplicationAttemptId().getApplicationId(), container.nodeId));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -55,6 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
|
@ -473,7 +475,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
} else {
|
} else {
|
||||||
// Increment activeNodes explicitly because this is a new node.
|
// Increment activeNodes explicitly because this is a new node.
|
||||||
ClusterMetrics.getMetrics().incrNumActiveNodes();
|
ClusterMetrics.getMetrics().incrNumActiveNodes();
|
||||||
containers = startEvent.getContainerRecoveryReports();
|
containers = startEvent.getNMContainerStatuses();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (null != startEvent.getRunningApplications()) {
|
||||||
|
for (ApplicationId appId : startEvent.getRunningApplications()) {
|
||||||
|
handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rmNode.context.getDispatcher().getEventHandler()
|
rmNode.context.getDispatcher().getEventHandler()
|
||||||
|
@ -482,6 +490,24 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
new NodesListManagerEvent(
|
new NodesListManagerEvent(
|
||||||
NodesListManagerEventType.NODE_USABLE, rmNode));
|
NodesListManagerEventType.NODE_USABLE, rmNode));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void handleRunningAppOnNode(RMNodeImpl rmNode, RMContext context,
|
||||||
|
ApplicationId appId, NodeId nodeId) {
|
||||||
|
RMApp app = context.getRMApps().get(appId);
|
||||||
|
|
||||||
|
// if we failed getting app by appId, maybe something wrong happened, just
|
||||||
|
// add the app to the finishedApplications list so that the app can be
|
||||||
|
// cleaned up on the NM
|
||||||
|
if (null == app) {
|
||||||
|
LOG.warn("Cannot get RMApp by appId=" + appId
|
||||||
|
+ ", just added it to finishedApplications list for cleanup");
|
||||||
|
rmNode.finishedApplications.add(appId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
context.getDispatcher().getEventHandler()
|
||||||
|
.handle(new RMAppRunningOnNodeEvent(appId, nodeId));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class ReconnectNodeTransition implements
|
public static class ReconnectNodeTransition implements
|
||||||
|
@ -517,7 +543,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
}
|
}
|
||||||
rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
|
rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
|
||||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||||
new RMNodeStartedEvent(newNode.getNodeID(), null));
|
new RMNodeStartedEvent(newNode.getNodeID(), null, null));
|
||||||
}
|
}
|
||||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||||
new NodesListManagerEvent(
|
new NodesListManagerEvent(
|
||||||
|
|
|
@ -20,19 +20,28 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
|
|
||||||
public class RMNodeStartedEvent extends RMNodeEvent {
|
public class RMNodeStartedEvent extends RMNodeEvent {
|
||||||
|
|
||||||
private List<NMContainerStatus> containerReports;
|
private List<NMContainerStatus> containerStatuses;
|
||||||
|
private List<ApplicationId> runningApplications;
|
||||||
|
|
||||||
public RMNodeStartedEvent(NodeId nodeId, List<NMContainerStatus> containerReports) {
|
public RMNodeStartedEvent(NodeId nodeId,
|
||||||
|
List<NMContainerStatus> containerReports,
|
||||||
|
List<ApplicationId> runningApplications) {
|
||||||
super(nodeId, RMNodeEventType.STARTED);
|
super(nodeId, RMNodeEventType.STARTED);
|
||||||
this.containerReports = containerReports;
|
this.containerStatuses = containerReports;
|
||||||
|
this.runningApplications = runningApplications;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<NMContainerStatus> getContainerRecoveryReports() {
|
public List<NMContainerStatus> getNMContainerStatuses() {
|
||||||
return this.containerReports;
|
return this.containerStatuses;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<ApplicationId> getRunningApplications() {
|
||||||
|
return runningApplications;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,11 +100,17 @@ public class MockNM {
|
||||||
}
|
}
|
||||||
|
|
||||||
public RegisterNodeManagerResponse registerNode() throws Exception {
|
public RegisterNodeManagerResponse registerNode() throws Exception {
|
||||||
return registerNode(null);
|
return registerNode(null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RegisterNodeManagerResponse registerNode(
|
||||||
|
List<ApplicationId> runningApplications) throws Exception {
|
||||||
|
return registerNode(null, runningApplications);
|
||||||
}
|
}
|
||||||
|
|
||||||
public RegisterNodeManagerResponse registerNode(
|
public RegisterNodeManagerResponse registerNode(
|
||||||
List<NMContainerStatus> containerReports) throws Exception{
|
List<NMContainerStatus> containerReports,
|
||||||
|
List<ApplicationId> runningApplications) throws Exception {
|
||||||
RegisterNodeManagerRequest req = Records.newRecord(
|
RegisterNodeManagerRequest req = Records.newRecord(
|
||||||
RegisterNodeManagerRequest.class);
|
RegisterNodeManagerRequest.class);
|
||||||
req.setNodeId(nodeId);
|
req.setNodeId(nodeId);
|
||||||
|
@ -113,6 +119,7 @@ public class MockNM {
|
||||||
req.setResource(resource);
|
req.setResource(resource);
|
||||||
req.setContainerStatuses(containerReports);
|
req.setContainerStatuses(containerReports);
|
||||||
req.setNMVersion(version);
|
req.setNMVersion(version);
|
||||||
|
req.setRunningApplications(runningApplications);
|
||||||
RegisterNodeManagerResponse registrationResponse =
|
RegisterNodeManagerResponse registrationResponse =
|
||||||
resourceTracker.registerNodeManager(req);
|
resourceTracker.registerNodeManager(req);
|
||||||
this.currentContainerTokenMasterKey =
|
this.currentContainerTokenMasterKey =
|
||||||
|
|
|
@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSec
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
@ -350,11 +351,20 @@ public class MockRM extends ResourceManager {
|
||||||
nm.registerNode();
|
nm.registerNode();
|
||||||
return nm;
|
return nm;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public MockNM registerNode(String nodeIdStr, int memory, int vCores,
|
||||||
|
List<ApplicationId> runningApplications) throws Exception {
|
||||||
|
MockNM nm =
|
||||||
|
new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService(),
|
||||||
|
YarnVersionInfo.getVersion());
|
||||||
|
nm.registerNode(runningApplications);
|
||||||
|
return nm;
|
||||||
|
}
|
||||||
|
|
||||||
public void sendNodeStarted(MockNM nm) throws Exception {
|
public void sendNodeStarted(MockNM nm) throws Exception {
|
||||||
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
|
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
|
||||||
nm.getNodeId());
|
nm.getNodeId());
|
||||||
node.handle(new RMNodeStartedEvent(nm.getNodeId(), null));
|
node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendNodeLost(MockNM nm) throws Exception {
|
public void sendNodeLost(MockNM nm) throws Exception {
|
||||||
|
|
|
@ -18,26 +18,30 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import java.net.UnknownHostException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.junit.Assert;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
|
@ -45,13 +49,29 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestApplicationCleanup {
|
public class TestApplicationCleanup {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory
|
private static final Log LOG = LogFactory
|
||||||
.getLog(TestApplicationCleanup.class);
|
.getLog(TestApplicationCleanup.class);
|
||||||
|
|
||||||
|
private YarnConfiguration conf;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws UnknownHostException {
|
||||||
|
Logger rootLogger = LogManager.getRootLogger();
|
||||||
|
rootLogger.setLevel(Level.DEBUG);
|
||||||
|
conf = new YarnConfiguration();
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
|
||||||
|
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||||
|
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("resource")
|
||||||
@Test
|
@Test
|
||||||
public void testAppCleanup() throws Exception {
|
public void testAppCleanup() throws Exception {
|
||||||
Logger rootLogger = LogManager.getRootLogger();
|
Logger rootLogger = LogManager.getRootLogger();
|
||||||
|
@ -130,6 +150,7 @@ public class TestApplicationCleanup {
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("resource")
|
||||||
@Test
|
@Test
|
||||||
public void testContainerCleanup() throws Exception {
|
public void testContainerCleanup() throws Exception {
|
||||||
|
|
||||||
|
@ -252,6 +273,69 @@ public class TestApplicationCleanup {
|
||||||
|
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId)
|
||||||
|
throws Exception {
|
||||||
|
while (true) {
|
||||||
|
NodeHeartbeatResponse response = nm.nodeHeartbeat(true);
|
||||||
|
if (response.getApplicationsToCleanup() != null
|
||||||
|
&& response.getApplicationsToCleanup().size() == 1
|
||||||
|
&& appId.equals(response.getApplicationsToCleanup().get(0))) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Haven't got application=" + appId.toString()
|
||||||
|
+ " in cleanup list from node heartbeat response, "
|
||||||
|
+ "sleep for a while before next heartbeat");
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
|
||||||
|
throws Exception {
|
||||||
|
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||||
|
nm.nodeHeartbeat(true);
|
||||||
|
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|
||||||
|
am.registerAppAttempt();
|
||||||
|
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
|
||||||
|
return am;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("resource")
|
||||||
|
@Test (timeout = 60000)
|
||||||
|
public void testAppCleanupWhenRestartedAfterAppFinished() throws Exception {
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(conf);
|
||||||
|
|
||||||
|
// start RM
|
||||||
|
MockRM rm1 = new MockRM(conf, memStore);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
|
||||||
|
// create app and launch the AM
|
||||||
|
RMApp app0 = rm1.submitApp(200);
|
||||||
|
MockAM am0 = launchAM(app0, rm1, nm1);
|
||||||
|
nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
|
rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
|
||||||
|
|
||||||
|
// start new RM
|
||||||
|
MockRM rm2 = new MockRM(conf, memStore);
|
||||||
|
rm2.start();
|
||||||
|
|
||||||
|
// nm1 register to rm2, and do a heartbeat
|
||||||
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||||
|
nm1.registerNode(Arrays.asList(app0.getApplicationId()));
|
||||||
|
rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED);
|
||||||
|
|
||||||
|
// wait for application cleanup message received
|
||||||
|
waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
|
||||||
|
|
||||||
|
rm1.stop();
|
||||||
|
rm2.stop();
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
TestApplicationCleanup t = new TestApplicationCleanup();
|
TestApplicationCleanup t = new TestApplicationCleanup();
|
||||||
|
|
|
@ -161,7 +161,7 @@ public class TestRMNodeTransitions {
|
||||||
@Test (timeout = 5000)
|
@Test (timeout = 5000)
|
||||||
public void testExpiredContainer() {
|
public void testExpiredContainer() {
|
||||||
// Start the node
|
// Start the node
|
||||||
node.handle(new RMNodeStartedEvent(null, null));
|
node.handle(new RMNodeStartedEvent(null, null, null));
|
||||||
verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
|
verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
|
||||||
|
|
||||||
// Expire a container
|
// Expire a container
|
||||||
|
@ -189,11 +189,11 @@ public class TestRMNodeTransitions {
|
||||||
@Test (timeout = 5000)
|
@Test (timeout = 5000)
|
||||||
public void testContainerUpdate() throws InterruptedException{
|
public void testContainerUpdate() throws InterruptedException{
|
||||||
//Start the node
|
//Start the node
|
||||||
node.handle(new RMNodeStartedEvent(null, null));
|
node.handle(new RMNodeStartedEvent(null, null, null));
|
||||||
|
|
||||||
NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
|
NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
|
||||||
RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
|
RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
|
||||||
node2.handle(new RMNodeStartedEvent(null, null));
|
node2.handle(new RMNodeStartedEvent(null, null, null));
|
||||||
|
|
||||||
ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
|
ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
|
||||||
BuilderUtils.newApplicationAttemptId(
|
BuilderUtils.newApplicationAttemptId(
|
||||||
|
@ -249,7 +249,7 @@ public class TestRMNodeTransitions {
|
||||||
@Test (timeout = 5000)
|
@Test (timeout = 5000)
|
||||||
public void testStatusChange(){
|
public void testStatusChange(){
|
||||||
//Start the node
|
//Start the node
|
||||||
node.handle(new RMNodeStartedEvent(null, null));
|
node.handle(new RMNodeStartedEvent(null, null, null));
|
||||||
//Add info to the queue first
|
//Add info to the queue first
|
||||||
node.setNextHeartBeat(false);
|
node.setNextHeartBeat(false);
|
||||||
|
|
||||||
|
@ -465,7 +465,7 @@ public class TestRMNodeTransitions {
|
||||||
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
|
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
|
||||||
null, ResourceOption.newInstance(capability,
|
null, ResourceOption.newInstance(capability,
|
||||||
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion);
|
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion);
|
||||||
node.handle(new RMNodeStartedEvent(node.getNodeID(), null));
|
node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
|
||||||
Assert.assertEquals(NodeState.RUNNING, node.getState());
|
Assert.assertEquals(NodeState.RUNNING, node.getState());
|
||||||
return node;
|
return node;
|
||||||
}
|
}
|
||||||
|
@ -496,7 +496,7 @@ public class TestRMNodeTransitions {
|
||||||
int initialUnhealthy = cm.getUnhealthyNMs();
|
int initialUnhealthy = cm.getUnhealthyNMs();
|
||||||
int initialDecommissioned = cm.getNumDecommisionedNMs();
|
int initialDecommissioned = cm.getNumDecommisionedNMs();
|
||||||
int initialRebooted = cm.getNumRebootedNMs();
|
int initialRebooted = cm.getNumRebootedNMs();
|
||||||
node.handle(new RMNodeStartedEvent(node.getNodeID(), null));
|
node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
|
||||||
Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
|
Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
|
||||||
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
|
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
|
||||||
Assert.assertEquals("Unhealthy Nodes",
|
Assert.assertEquals("Unhealthy Nodes",
|
||||||
|
|
|
@ -102,7 +102,6 @@ import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestRMRestart {
|
public class TestRMRestart {
|
||||||
|
|
||||||
private final static File TEMP_DIR = new File(System.getProperty(
|
private final static File TEMP_DIR = new File(System.getProperty(
|
||||||
"test.build.data", "/tmp"), "decommision");
|
"test.build.data", "/tmp"), "decommision");
|
||||||
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
|
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
|
||||||
|
@ -309,7 +308,7 @@ public class TestRMRestart {
|
||||||
TestRMRestart
|
TestRMRestart
|
||||||
.createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
|
.createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
|
||||||
.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
nm1.registerNode(Arrays.asList(status));
|
nm1.registerNode(Arrays.asList(status), null);
|
||||||
nm2.registerNode();
|
nm2.registerNode();
|
||||||
|
|
||||||
rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
|
rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
|
@ -392,7 +391,7 @@ public class TestRMRestart {
|
||||||
// completed apps are not removed immediately after app finish
|
// completed apps are not removed immediately after app finish
|
||||||
// And finished app is also loaded back.
|
// And finished app is also loaded back.
|
||||||
Assert.assertEquals(4, rmAppState.size());
|
Assert.assertEquals(4, rmAppState.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
public void testRMRestartAppRunningAMFailed() throws Exception {
|
public void testRMRestartAppRunningAMFailed() throws Exception {
|
||||||
|
@ -514,7 +513,7 @@ public class TestRMRestart {
|
||||||
NMContainerStatus status =
|
NMContainerStatus status =
|
||||||
TestRMRestart.createNMContainerStatus(
|
TestRMRestart.createNMContainerStatus(
|
||||||
am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
nm1.registerNode(Arrays.asList(status));
|
nm1.registerNode(Arrays.asList(status), null);
|
||||||
rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||||
launchAM(rmApp, rm2, nm1);
|
launchAM(rmApp, rm2, nm1);
|
||||||
Assert.assertEquals(3, rmApp.getAppAttempts().size());
|
Assert.assertEquals(3, rmApp.getAppAttempts().size());
|
||||||
|
@ -1680,7 +1679,8 @@ public class TestRMRestart {
|
||||||
TestRMRestart
|
TestRMRestart
|
||||||
.createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
|
.createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
|
||||||
.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
nm1.registerNode(Arrays.asList(status));
|
nm1.registerNode(Arrays.asList(status), null);
|
||||||
|
|
||||||
while (loadedApp1.getAppAttempts().size() != 2) {
|
while (loadedApp1.getAppAttempts().size() != 2) {
|
||||||
Thread.sleep(200);
|
Thread.sleep(200);
|
||||||
}
|
}
|
||||||
|
@ -1807,7 +1807,7 @@ public class TestRMRestart {
|
||||||
NMContainerStatus status =
|
NMContainerStatus status =
|
||||||
TestRMRestart.createNMContainerStatus(
|
TestRMRestart.createNMContainerStatus(
|
||||||
am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
nm1.registerNode(Arrays.asList(status));
|
nm1.registerNode(Arrays.asList(status), null);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -159,7 +159,7 @@ public class TestWorkPreservingRMRestart {
|
||||||
ContainerState.COMPLETE);
|
ContainerState.COMPLETE);
|
||||||
|
|
||||||
nm1.registerNode(Arrays.asList(amContainer, runningContainer,
|
nm1.registerNode(Arrays.asList(amContainer, runningContainer,
|
||||||
completedContainer));
|
completedContainer), null);
|
||||||
|
|
||||||
// Wait for RM to settle down on recovering containers;
|
// Wait for RM to settle down on recovering containers;
|
||||||
waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
|
waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
|
||||||
|
@ -383,11 +383,11 @@ public class TestWorkPreservingRMRestart {
|
||||||
List<NMContainerStatus> am1_2Containers =
|
List<NMContainerStatus> am1_2Containers =
|
||||||
createNMContainerStatusForApp(am1_2);
|
createNMContainerStatusForApp(am1_2);
|
||||||
am1_1Containers.addAll(am1_2Containers);
|
am1_1Containers.addAll(am1_2Containers);
|
||||||
nm1.registerNode(am1_1Containers);
|
nm1.registerNode(am1_1Containers, null);
|
||||||
|
|
||||||
List<NMContainerStatus> am2Containers =
|
List<NMContainerStatus> am2Containers =
|
||||||
createNMContainerStatusForApp(am2);
|
createNMContainerStatusForApp(am2);
|
||||||
nm2.registerNode(am2Containers);
|
nm2.registerNode(am2Containers, null);
|
||||||
|
|
||||||
// Wait for RM to settle down on recovering containers;
|
// Wait for RM to settle down on recovering containers;
|
||||||
waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId());
|
waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId());
|
||||||
|
@ -482,7 +482,7 @@ public class TestWorkPreservingRMRestart {
|
||||||
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
|
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
|
||||||
ContainerState.COMPLETE);
|
ContainerState.COMPLETE);
|
||||||
nm1.registerNode(Arrays.asList(amContainer, runningContainer,
|
nm1.registerNode(Arrays.asList(amContainer, runningContainer,
|
||||||
completedContainer));
|
completedContainer), null);
|
||||||
rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||||
// Wait for RM to settle down on recovering containers;
|
// Wait for RM to settle down on recovering containers;
|
||||||
Thread.sleep(3000);
|
Thread.sleep(3000);
|
||||||
|
@ -519,7 +519,7 @@ public class TestWorkPreservingRMRestart {
|
||||||
NMContainerStatus completedContainer =
|
NMContainerStatus completedContainer =
|
||||||
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
|
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
|
||||||
ContainerState.COMPLETE);
|
ContainerState.COMPLETE);
|
||||||
nm1.registerNode(Arrays.asList(runningContainer, completedContainer));
|
nm1.registerNode(Arrays.asList(runningContainer, completedContainer), null);
|
||||||
RMApp recoveredApp1 =
|
RMApp recoveredApp1 =
|
||||||
rm2.getRMContext().getRMApps().get(app1.getApplicationId());
|
rm2.getRMContext().getRMApps().get(app1.getApplicationId());
|
||||||
assertEquals(RMAppState.FINISHED, recoveredApp1.getState());
|
assertEquals(RMAppState.FINISHED, recoveredApp1.getState());
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
@ -159,6 +160,11 @@ public abstract class MockAsm extends MockApps {
|
||||||
public YarnApplicationState createApplicationState() {
|
public YarnApplicationState createApplicationState() {
|
||||||
throw new UnsupportedOperationException("Not supported yet.");
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<NodeId> getRanNodes() {
|
||||||
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static RMApp newApplication(int i) {
|
public static RMApp newApplication(int i) {
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -232,4 +233,9 @@ public class MockRMApp implements RMApp {
|
||||||
public YarnApplicationState createApplicationState() {
|
public YarnApplicationState createApplicationState() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<NodeId> getRanNodes() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
|
@ -75,8 +76,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
||||||
|
@ -315,7 +316,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
||||||
assertNull(applicationAttempt.getMasterContainer());
|
assertNull(applicationAttempt.getMasterContainer());
|
||||||
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
|
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
|
||||||
assertEquals(0, applicationAttempt.getRanNodes().size());
|
assertEquals(0, application.getRanNodes().size());
|
||||||
assertNull(applicationAttempt.getFinalApplicationStatus());
|
assertNull(applicationAttempt.getFinalApplicationStatus());
|
||||||
assertNotNull(applicationAttempt.getTrackingUrl());
|
assertNotNull(applicationAttempt.getTrackingUrl());
|
||||||
assertFalse("N/A".equals(applicationAttempt.getTrackingUrl()));
|
assertFalse("N/A".equals(applicationAttempt.getTrackingUrl()));
|
||||||
|
@ -331,7 +332,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
||||||
assertNull(applicationAttempt.getMasterContainer());
|
assertNull(applicationAttempt.getMasterContainer());
|
||||||
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
|
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
|
||||||
assertEquals(0, applicationAttempt.getRanNodes().size());
|
assertEquals(0, application.getRanNodes().size());
|
||||||
assertNull(applicationAttempt.getFinalApplicationStatus());
|
assertNull(applicationAttempt.getFinalApplicationStatus());
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
verify(clientToAMTokenManager).createMasterKey(
|
verify(clientToAMTokenManager).createMasterKey(
|
||||||
|
@ -359,7 +360,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
||||||
assertNull(applicationAttempt.getMasterContainer());
|
assertNull(applicationAttempt.getMasterContainer());
|
||||||
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
|
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
|
||||||
assertEquals(0, applicationAttempt.getRanNodes().size());
|
assertEquals(0, application.getRanNodes().size());
|
||||||
assertNull(applicationAttempt.getFinalApplicationStatus());
|
assertNull(applicationAttempt.getFinalApplicationStatus());
|
||||||
|
|
||||||
// Check events
|
// Check events
|
||||||
|
@ -385,7 +386,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
||||||
assertEquals(amContainer, applicationAttempt.getMasterContainer());
|
assertEquals(amContainer, applicationAttempt.getMasterContainer());
|
||||||
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
|
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
|
||||||
assertEquals(0, applicationAttempt.getRanNodes().size());
|
assertEquals(0, application.getRanNodes().size());
|
||||||
assertNull(applicationAttempt.getFinalApplicationStatus());
|
assertNull(applicationAttempt.getFinalApplicationStatus());
|
||||||
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
||||||
verifyAttemptFinalStateSaved();
|
verifyAttemptFinalStateSaved();
|
||||||
|
@ -425,7 +426,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
||||||
assertNull(applicationAttempt.getMasterContainer());
|
assertNull(applicationAttempt.getMasterContainer());
|
||||||
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
|
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
|
||||||
assertEquals(0, applicationAttempt.getRanNodes().size());
|
assertEquals(0, application.getRanNodes().size());
|
||||||
assertNull(applicationAttempt.getFinalApplicationStatus());
|
assertNull(applicationAttempt.getFinalApplicationStatus());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -461,7 +462,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
||||||
assertEquals(container, applicationAttempt.getMasterContainer());
|
assertEquals(container, applicationAttempt.getMasterContainer());
|
||||||
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
|
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
|
||||||
assertEquals(0, applicationAttempt.getRanNodes().size());
|
assertEquals(0, application.getRanNodes().size());
|
||||||
|
|
||||||
// Check events
|
// Check events
|
||||||
verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class));
|
verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class));
|
||||||
|
@ -666,8 +667,10 @@ public class TestRMAppAttemptTransitions {
|
||||||
runApplicationAttempt(null, "host", 8042, url, true);
|
runApplicationAttempt(null, "host", 8042, url, true);
|
||||||
|
|
||||||
// complete a container
|
// complete a container
|
||||||
applicationAttempt.handle(new RMAppAttemptContainerAcquiredEvent(
|
Container container = mock(Container.class);
|
||||||
applicationAttempt.getAppAttemptId(), mock(Container.class)));
|
when(container.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
|
||||||
|
application.handle(new RMAppRunningOnNodeEvent(application.getApplicationId(),
|
||||||
|
container.getNodeId()));
|
||||||
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
|
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
|
||||||
applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class)));
|
applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class)));
|
||||||
// complete AM
|
// complete AM
|
||||||
|
@ -845,7 +848,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
applicationAttempt.getAppAttemptState());
|
applicationAttempt.getAppAttemptState());
|
||||||
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
||||||
assertEquals(amContainer, applicationAttempt.getMasterContainer());
|
assertEquals(amContainer, applicationAttempt.getMasterContainer());
|
||||||
assertEquals(0, applicationAttempt.getRanNodes().size());
|
assertEquals(0, application.getRanNodes().size());
|
||||||
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
|
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
|
||||||
applicationAttempt.getAppAttemptId().getApplicationId());
|
applicationAttempt.getAppAttemptId().getApplicationId());
|
||||||
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
|
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
|
||||||
|
@ -882,7 +885,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
applicationAttempt.getAppAttemptState());
|
applicationAttempt.getAppAttemptState());
|
||||||
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
||||||
assertEquals(amContainer, applicationAttempt.getMasterContainer());
|
assertEquals(amContainer, applicationAttempt.getMasterContainer());
|
||||||
assertEquals(0, applicationAttempt.getRanNodes().size());
|
assertEquals(0, application.getRanNodes().size());
|
||||||
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
|
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
|
||||||
applicationAttempt.getAppAttemptId().getApplicationId());
|
applicationAttempt.getAppAttemptId().getApplicationId());
|
||||||
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
|
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
|
||||||
|
|
Loading…
Reference in New Issue