diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 09cf7a1f562..cf082999617 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -126,6 +126,10 @@ Release 2.5.0 - UNRELEASED YARN-2010. Document yarn.resourcemanager.zk-auth and its scope. (Robert Kanter via kasha) + YARN-2115. Replaced RegisterNodeManagerRequest's ContainerStatus with a new + NMContainerStatus which has more information that is needed for + work-preserving RM-restart. (Jian He via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java new file mode 100644 index 00000000000..d55e5a6f567 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java @@ -0,0 +1,98 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.Records; + +/** + * NMContainerStatus includes the current information of a container. This + * record is used by YARN only, whereas {@link ContainerStatus} is used both + * inside YARN and by end-users. + */ +public abstract class NMContainerStatus { + + public static NMContainerStatus newInstance(ContainerId containerId, + ContainerState containerState, Resource allocatedResource, + String diagnostics, int containerExitStatus) { + NMContainerStatus status = + Records.newRecord(NMContainerStatus.class); + status.setContainerId(containerId); + status.setContainerState(containerState); + status.setAllocatedResource(allocatedResource); + status.setDiagnostics(diagnostics); + status.setContainerExitStatus(containerExitStatus); + return status; + } + + /** + * Get the ContainerId of the container. + * + * @return ContainerId of the container. + */ + public abstract ContainerId getContainerId(); + + public abstract void setContainerId(ContainerId containerId); + + /** + * Get the allocated Resource of the container. + * + * @return allocated Resource of the container. + */ + public abstract Resource getAllocatedResource(); + + + public abstract void setAllocatedResource(Resource resource); + + /** + * Get the DiagnosticsInfo of the container. + * + * @return DiagnosticsInfo of the container + */ + public abstract String getDiagnostics(); + + public abstract void setDiagnostics(String diagnostics); + + + public abstract ContainerState getContainerState(); + + public abstract void setContainerState(ContainerState containerState); + + /** + * Get the final exit status of the container. + * + * @return final exit status of the container. + */ + public abstract int getContainerExitStatus(); + + + public abstract void setContainerExitStatus(int containerExitStatus); + + /** + * Get the Priority of the request. + * @return Priority of the request + */ + public abstract Priority getPriority(); + + public abstract void setPriority(Priority priority); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index 6ca38615f7d..43e892d636f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import java.util.List; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.Records; @@ -29,7 +28,7 @@ public abstract class RegisterNodeManagerRequest { public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, - List containerStatuses) { + List containerStatuses) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -44,11 +43,12 @@ public abstract class RegisterNodeManagerRequest { public abstract int getHttpPort(); public abstract Resource getResource(); public abstract String getNMVersion(); - public abstract List getContainerStatuses(); + public abstract List getNMContainerStatuses(); public abstract void setNodeId(NodeId nodeId); public abstract void setHttpPort(int port); public abstract void setResource(Resource resource); public abstract void setNMVersion(String version); - public abstract void setContainerStatuses(List containerStatuses); + public abstract void setContainerStatuses( + List containerStatuses); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java new file mode 100644 index 00000000000..785bc5d463b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java @@ -0,0 +1,266 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto; +import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; + +import com.google.protobuf.TextFormat; + +public class NMContainerStatusPBImpl extends NMContainerStatus { + + NMContainerStatusProto proto = NMContainerStatusProto + .getDefaultInstance(); + NMContainerStatusProto.Builder builder = null; + boolean viaProto = false; + + private ContainerId containerId = null; + private Resource resource = null; + private Priority priority = null; + + public NMContainerStatusPBImpl() { + builder = NMContainerStatusProto.newBuilder(); + } + + public NMContainerStatusPBImpl(NMContainerStatusProto proto) { + this.proto = proto; + viaProto = true; + } + + public NMContainerStatusProto getProto() { + + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return this.getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public Resource getAllocatedResource() { + if (this.resource != null) { + return this.resource; + } + NMContainerStatusProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasResource()) { + return null; + } + this.resource = convertFromProtoFormat(p.getResource()); + return this.resource; + } + + @Override + public ContainerId getContainerId() { + if (this.containerId != null) { + return this.containerId; + } + NMContainerStatusProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasContainerId()) { + return null; + } + this.containerId = convertFromProtoFormat(p.getContainerId()); + return this.containerId; + } + + @Override + public String getDiagnostics() { + NMContainerStatusProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasDiagnostics()) { + return null; + } + return (p.getDiagnostics()); + } + + @Override + public ContainerState getContainerState() { + NMContainerStatusProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasContainerState()) { + return null; + } + return convertFromProtoFormat(p.getContainerState()); + } + + @Override + public void setAllocatedResource(Resource resource) { + maybeInitBuilder(); + if (resource == null) + builder.clearResource(); + this.resource = resource; + } + + public void setContainerId(ContainerId containerId) { + maybeInitBuilder(); + if (containerId == null) + builder.clearContainerId(); + this.containerId = containerId; + } + + @Override + public void setDiagnostics(String diagnosticsInfo) { + maybeInitBuilder(); + if (diagnosticsInfo == null) { + builder.clearDiagnostics(); + return; + } + builder.setDiagnostics(diagnosticsInfo); + } + + @Override + public void setContainerState(ContainerState containerState) { + maybeInitBuilder(); + if (containerState == null) { + builder.clearContainerState(); + return; + } + builder.setContainerState(convertToProtoFormat(containerState)); + } + + @Override + public int getContainerExitStatus() { + NMContainerStatusProtoOrBuilder p = viaProto ? proto : builder; + return p.getContainerExitStatus(); + } + + @Override + public void setContainerExitStatus(int containerExitStatus) { + maybeInitBuilder(); + builder.setContainerExitStatus(containerExitStatus); + } + + @Override + public Priority getPriority() { + NMContainerStatusProtoOrBuilder p = viaProto ? proto : builder; + if (this.priority != null) { + return this.priority; + } + if (!p.hasPriority()) { + return null; + } + this.priority = convertFromProtoFormat(p.getPriority()); + return this.priority; + } + + @Override + public void setPriority(Priority priority) { + maybeInitBuilder(); + if (priority == null) + builder.clearPriority(); + this.priority = priority; + } + + private void mergeLocalToBuilder() { + if (this.containerId != null + && !((ContainerIdPBImpl) containerId).getProto().equals( + builder.getContainerId())) { + builder.setContainerId(convertToProtoFormat(this.containerId)); + } + + if (this.resource != null + && !((ResourcePBImpl) this.resource).getProto().equals( + builder.getResource())) { + builder.setResource(convertToProtoFormat(this.resource)); + } + + if (this.priority != null) { + builder.setPriority(convertToProtoFormat(this.priority)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = NMContainerStatusProto.newBuilder(proto); + } + viaProto = false; + } + + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl) t).getProto(); + } + + private ResourcePBImpl convertFromProtoFormat(ResourceProto p) { + return new ResourcePBImpl(p); + } + + private ResourceProto convertToProtoFormat(Resource t) { + return ((ResourcePBImpl) t).getProto(); + } + + private ContainerStateProto + convertToProtoFormat(ContainerState containerState) { + return ProtoUtils.convertToProtoFormat(containerState); + } + + private ContainerState convertFromProtoFormat( + ContainerStateProto containerState) { + return ProtoUtils.convertFromProtoFormat(containerState); + } + + private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { + return new PriorityPBImpl(p); + } + + private PriorityProto convertToProtoFormat(Priority t) { + return ((PriorityPBImpl)t).getProto(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index 6f9c43dd98c..5b3d066fe6b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -20,24 +20,18 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; @@ -49,7 +43,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest private Resource resource = null; private NodeId nodeId = null; - private List containerStatuses = null; + private List containerStatuses = null; public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); @@ -69,7 +63,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest private void mergeLocalToBuilder() { if (this.containerStatuses != null) { - addContainerStatusesToProto(); + addNMContainerStatusesToProto(); } if (this.resource != null) { builder.setResource(convertToProtoFormat(this.resource)); @@ -80,6 +74,18 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest } + private synchronized void addNMContainerStatusesToProto() { + maybeInitBuilder(); + builder.clearContainerStatuses(); + List list = + new ArrayList(); + for (NMContainerStatus status : this.containerStatuses) { + list.add(convertToProtoFormat(status)); + } + builder.addAllContainerStatuses(list); + } + + private void mergeLocalToProto() { if (viaProto) maybeInitBuilder(); @@ -154,63 +160,31 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest } @Override - public List getContainerStatuses() { - initContainerStatuses(); + public List getNMContainerStatuses() { + initContainerRecoveryReports(); return containerStatuses; } - private void initContainerStatuses() { + private void initContainerRecoveryReports() { if (this.containerStatuses != null) { return; } RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getContainerStatusesList(); - this.containerStatuses = new ArrayList(); - for (ContainerStatusProto c : list) { + List list = p.getContainerStatusesList(); + this.containerStatuses = new ArrayList(); + for (NMContainerStatusProto c : list) { this.containerStatuses.add(convertFromProtoFormat(c)); } } @Override - public void setContainerStatuses(List containers) { - if (containers == null) { + public void setContainerStatuses( + List containerReports) { + if (containerReports == null) { return; } - initContainerStatuses(); - this.containerStatuses.addAll(containers); - } - - private void addContainerStatusesToProto() { - maybeInitBuilder(); - builder.clearContainerStatuses(); - if (containerStatuses == null) { - return; - } - Iterable it = new Iterable() { - - @Override - public Iterator iterator() { - return new Iterator() { - Iterator iter = containerStatuses.iterator(); - - @Override - public boolean hasNext() { - return iter.hasNext(); - } - - @Override - public ContainerStatusProto next() { - return convertToProtoFormat(iter.next()); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - }; - builder.addAllContainerStatuses(it); + initContainerRecoveryReports(); + this.containerStatuses.addAll(containerReports); } @Override @@ -259,11 +233,11 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest return ((ResourcePBImpl)t).getProto(); } - private ContainerStatusPBImpl convertFromProtoFormat(ContainerStatusProto c) { - return new ContainerStatusPBImpl(c); + private NMContainerStatusPBImpl convertFromProtoFormat(NMContainerStatusProto c) { + return new NMContainerStatusPBImpl(c); } - private ContainerStatusProto convertToProtoFormat(ContainerStatus c) { - return ((ContainerStatusPBImpl)c).getProto(); + private NMContainerStatusProto convertToProtoFormat(NMContainerStatus c) { + return ((NMContainerStatusPBImpl)c).getProto(); } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index c54490526df..ebd752f37aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -30,7 +30,7 @@ message RegisterNodeManagerRequestProto { optional int32 http_port = 3; optional ResourceProto resource = 4; optional string nm_version = 5; - repeated ContainerStatusProto containerStatuses = 6; + repeated NMContainerStatusProto container_statuses = 6; } message RegisterNodeManagerResponseProto { @@ -58,3 +58,12 @@ message NodeHeartbeatResponseProto { optional int64 nextHeartBeatInterval = 7; optional string diagnostics_message = 8; } + +message NMContainerStatusProto { + optional ContainerIdProto container_id = 1; + optional ContainerStateProto container_state = 2; + optional ResourceProto resource = 3; + optional PriorityProto priority = 4; + optional string diagnostics = 5 [default = "N/A"]; + optional int32 container_exit_status = 6; +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java new file mode 100644 index 00000000000..2ffc9c99613 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java @@ -0,0 +1,99 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; +import org.junit.Assert; +import org.junit.Test; + +public class TestProtocolRecords { + + @Test + public void testContainerRecoveryReport() { + ApplicationId appId = ApplicationId.newInstance(123456789, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + ContainerId containerId = ContainerId.newInstance(attemptId, 1); + Resource resource = Resource.newInstance(1000, 200); + + NMContainerStatus report = + NMContainerStatus.newInstance(containerId, + ContainerState.COMPLETE, resource, "diagnostics", + ContainerExitStatus.ABORTED); + NMContainerStatus reportProto = + new NMContainerStatusPBImpl( + ((NMContainerStatusPBImpl) report).getProto()); + Assert.assertEquals("diagnostics", reportProto.getDiagnostics()); + Assert.assertEquals(resource, reportProto.getAllocatedResource()); + Assert.assertEquals(ContainerExitStatus.ABORTED, + reportProto.getContainerExitStatus()); + Assert.assertEquals(ContainerState.COMPLETE, + reportProto.getContainerState()); + Assert.assertEquals(containerId, reportProto.getContainerId()); + } + + public static NMContainerStatus createContainerRecoveryReport( + ApplicationAttemptId appAttemptId, int id, ContainerState containerState) { + ContainerId containerId = ContainerId.newInstance(appAttemptId, id); + NMContainerStatus containerReport = + NMContainerStatus.newInstance(containerId, containerState, + Resource.newInstance(1024, 1), "diagnostics", 0); + return containerReport; + } + + @Test + public void testRegisterNodeManagerRequest() { + ApplicationId appId = ApplicationId.newInstance(123456789, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + ContainerId containerId = ContainerId.newInstance(attemptId, 1); + + NMContainerStatus containerReport = + NMContainerStatus.newInstance(containerId, + ContainerState.RUNNING, Resource.newInstance(1024, 1), "diagnostics", + 0); + List reports = Arrays.asList(containerReport); + RegisterNodeManagerRequest request = + RegisterNodeManagerRequest.newInstance( + NodeId.newInstance("1.1.1.1", 1000), 8080, + Resource.newInstance(1024, 1), "NM-version-id", reports); + RegisterNodeManagerRequest requestProto = + new RegisterNodeManagerRequestPBImpl( + ((RegisterNodeManagerRequestPBImpl) request).getProto()); + Assert.assertEquals(containerReport, requestProto + .getNMContainerStatuses().get(0)); + Assert.assertEquals(8080, requestProto.getHttpPort()); + Assert.assertEquals("NM-version-id", requestProto.getNMVersion()); + Assert.assertEquals(NodeId.newInstance("1.1.1.1", 1000), + requestProto.getNodeId()); + Assert.assertEquals(Resource.newInstance(1024, 1), + requestProto.getResource()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index df99737d4b3..5cdb57458d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ServerRMProxy; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; @@ -246,13 +247,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements @VisibleForTesting protected void registerWithRM() throws YarnException, IOException { - List containerStatuses = getContainerStatuses(); + List containerReports = getNMContainerStatuses(); RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, - nodeManagerVersionId, containerStatuses); - if (containerStatuses != null) { - LOG.info("Registering with RM using finished containers :" - + containerStatuses); + nodeManagerVersionId, containerReports); + if (containerReports != null) { + LOG.info("Registering with RM using containers :" + containerReports); } RegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request); @@ -375,6 +375,27 @@ public class NodeStatusUpdaterImpl extends AbstractService implements return containerStatuses; } + // These NMContainerStatus are sent on NM registration and used by YARN only. + private List getNMContainerStatuses() { + List containerStatuses = + new ArrayList(); + for (Container container : this.context.getContainers().values()) { + NMContainerStatus status = + container.getNMContainerStatus(); + containerStatuses.add(status); + if (status.getContainerState().equals(ContainerState.COMPLETE)) { + // Adding to finished containers cache. Cache will keep it around at + // least for #durationToTrackStoppedContainers duration. In the + // subsequent call to stop container it will get removed from cache. + updateStoppedContainersInCache(container.getContainerId()); + addCompletedContainer(container); + } + } + LOG.info("Sending out " + containerStatuses.size() + + " NM container statuses: " + containerStatuses); + return containerStatuses; + } + private void addCompletedContainer(Container container) { synchronized (previousCompletedContainers) { previousCompletedContainers.add(container.getContainerId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index e69e61a6444..56b4fddbcd6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; public interface Container extends EventHandler { @@ -39,7 +40,7 @@ public interface Container extends EventHandler { ContainerTokenIdentifier getContainerTokenIdentifier(); String getUser(); - + ContainerState getContainerState(); ContainerLaunchContext getLaunchContext(); @@ -50,6 +51,8 @@ public interface Container extends EventHandler { ContainerStatus cloneAndGetContainerStatus(); + NMContainerStatus getNMContainerStatus(); + String toString(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 50653f5175c..1b683a10de6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; @@ -387,6 +388,17 @@ public class ContainerImpl implements Container { } } + @Override + public NMContainerStatus getNMContainerStatus() { + this.readLock.lock(); + try { + return NMContainerStatus.newInstance(this.containerId, + getCurrentState(), getResource(), diagnostics.toString(), exitCode); + } finally { + this.readLock.unlock(); + } + } + @Override public ContainerId getContainerId() { return this.containerId; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index 5c2e085f1d8..3cbf3d3bab7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.nodemanager; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -27,17 +30,18 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.Assert; - import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException; @@ -46,6 +50,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.ResourceTracker; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; @@ -53,10 +58,12 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -185,6 +192,9 @@ public class TestNodeManagerResync { TestNodeStatusUpdater.createContainerStatus(2, ContainerState.COMPLETE); final Container container = TestNodeStatusUpdater.getMockContainer(testCompleteContainer); + NMContainerStatus report = + createNMContainerStatus(2, ContainerState.COMPLETE); + when(container.getNMContainerStatus()).thenReturn(report); NodeManager nm = new NodeManager() { int registerCount = 0; @@ -203,7 +213,7 @@ public class TestNodeManagerResync { if (registerCount == 0) { // first register, no containers info. try { - Assert.assertEquals(0, request.getContainerStatuses() + Assert.assertEquals(0, request.getNMContainerStatuses() .size()); } catch (AssertionError error) { error.printStackTrace(); @@ -214,8 +224,8 @@ public class TestNodeManagerResync { testCompleteContainer.getContainerId(), container); } else { // second register contains the completed container info. - List statuses = - request.getContainerStatuses(); + List statuses = + request.getNMContainerStatuses(); try { Assert.assertEquals(1, statuses.size()); Assert.assertEquals(testCompleteContainer.getContainerId(), @@ -510,4 +520,16 @@ public class TestNodeManagerResync { } } }} + + public static NMContainerStatus createNMContainerStatus(int id, + ContainerState containerState) { + ApplicationId applicationId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId applicationAttemptId = + ApplicationAttemptId.newInstance(applicationId, 1); + ContainerId containerId = ContainerId.newInstance(applicationAttemptId, id); + NMContainerStatus containerReport = + NMContainerStatus.newInstance(containerId, containerState, + Resource.newInstance(1024, 1), "recover container", 0); + return containerReport; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index a021214ad03..b2ccb6149ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; @@ -134,4 +135,9 @@ public class MockContainer implements Container { public ContainerTokenIdentifier getContainerTokenIdentifier() { return this.containerTokenIdentifier; } + + @Override + public NMContainerStatus getNMContainerStatus() { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 1d4032048e4..a59d1d51bb7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -32,7 +32,6 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -45,6 +44,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.api.ResourceTracker; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; @@ -195,7 +195,7 @@ public class ResourceTrackerService extends AbstractService implements */ @SuppressWarnings("unchecked") @VisibleForTesting - void handleContainerStatus(ContainerStatus containerStatus) { + void handleNMContainerStatus(NMContainerStatus containerStatus) { ApplicationAttemptId appAttemptId = containerStatus.getContainerId().getApplicationAttemptId(); RMApp rmApp = @@ -219,11 +219,14 @@ public class ResourceTrackerService extends AbstractService implements RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId); Container masterContainer = rmAppAttempt.getMasterContainer(); if (masterContainer.getId().equals(containerStatus.getContainerId()) - && containerStatus.getState() == ContainerState.COMPLETE) { + && containerStatus.getContainerState() == ContainerState.COMPLETE) { + ContainerStatus status = + ContainerStatus.newInstance(containerStatus.getContainerId(), + containerStatus.getContainerState(), containerStatus.getDiagnostics(), + containerStatus.getContainerExitStatus()); // sending master container finished event. RMAppAttemptContainerFinishedEvent evt = - new RMAppAttemptContainerFinishedEvent(appAttemptId, - containerStatus); + new RMAppAttemptContainerFinishedEvent(appAttemptId, status); rmContext.getDispatcher().getEventHandler().handle(evt); } } @@ -240,11 +243,11 @@ public class ResourceTrackerService extends AbstractService implements Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); - if (!request.getContainerStatuses().isEmpty()) { + if (!request.getNMContainerStatuses().isEmpty()) { LOG.info("received container statuses on node manager register :" - + request.getContainerStatuses()); - for (ContainerStatus containerStatus : request.getContainerStatuses()) { - handleContainerStatus(containerStatus); + + request.getNMContainerStatuses()); + for (NMContainerStatus report : request.getNMContainerStatuses()) { + handleNMContainerStatus(report); } } RegisterNodeManagerResponse response = recordFactory diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 1dcac063b39..e3a57763216 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -32,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; @@ -105,14 +104,14 @@ public class MockNM { } public RegisterNodeManagerResponse registerNode( - List containerStatus) throws Exception{ + List containerReports) throws Exception{ RegisterNodeManagerRequest req = Records.newRecord( RegisterNodeManagerRequest.class); req.setNodeId(nodeId); req.setHttpPort(httpPort); Resource resource = BuilderUtils.newResource(memory, vCores); req.setResource(resource); - req.setContainerStatuses(containerStatus); + req.setContainerStatuses(containerReports); req.setNMVersion(version); RegisterNodeManagerResponse registrationResponse = resourceTracker.registerNodeManager(req); @@ -185,4 +184,11 @@ public class MockNM { return heartbeatResponse; } + public int getMemory() { + return memory; + } + + public int getvCores() { + return vCores; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 6a0512307a2..3bdb66c4b40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -30,6 +30,7 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -67,13 +68,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; @@ -303,13 +305,11 @@ public class TestRMRestart { nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService()); - List containerStatuses = new ArrayList(); - ContainerStatus containerStatus = - BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1 - .getCurrentAppAttempt().getAppAttemptId(), 1), - ContainerState.COMPLETE, "Killed AM container", 143); - containerStatuses.add(containerStatus); - nm1.registerNode(containerStatuses); + NMContainerStatus status = + TestRMRestart + .createNMContainerStatus(loadedApp1.getCurrentAppAttempt() + .getAppAttemptId(), 1, ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(status)); nm2.registerNode(); rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED); @@ -510,14 +510,11 @@ public class TestRMRestart { Assert.assertEquals(RMAppAttemptState.LAUNCHED, rmApp.getAppAttempts().get(am2.getApplicationAttemptId()) .getAppAttemptState()); - - List containerStatuses = new ArrayList(); - ContainerStatus containerStatus = - BuilderUtils.newContainerStatus( - BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1), - ContainerState.COMPLETE, "Killed AM container", 143); - containerStatuses.add(containerStatus); - nm1.registerNode(containerStatuses); + + NMContainerStatus status = + TestRMRestart.createNMContainerStatus( + am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(status)); rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); launchAM(rmApp, rm2, nm1); Assert.assertEquals(3, rmApp.getAppAttempts().size()); @@ -1678,13 +1675,12 @@ public class TestRMRestart { am1.allocate(new ArrayList(), new ArrayList()); nm1.nodeHeartbeat(true); nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); - List containerStatuses = new ArrayList(); - ContainerStatus containerStatus = - BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1 - .getCurrentAppAttempt().getAppAttemptId(), 1), - ContainerState.COMPLETE, "Killed AM container", 143); - containerStatuses.add(containerStatus); - nm1.registerNode(containerStatuses); + + NMContainerStatus status = + TestRMRestart + .createNMContainerStatus(loadedApp1.getCurrentAppAttempt() + .getAppAttemptId(), 1, ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(status)); while (loadedApp1.getAppAttempts().size() != 2) { Thread.sleep(200); } @@ -1808,12 +1804,10 @@ public class TestRMRestart { // ResourceTrackerService is started. super.serviceStart(); nm1.setResourceTrackerService(getResourceTrackerService()); - List status = new ArrayList(); - ContainerId amContainer = - ContainerId.newInstance(am0.getApplicationAttemptId(), 1); - status.add(ContainerStatus.newInstance(amContainer, - ContainerState.COMPLETE, "AM container exit", 143)); - nm1.registerNode(status); + NMContainerStatus status = + TestRMRestart.createNMContainerStatus( + am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(status)); } }; } @@ -1852,6 +1846,15 @@ public class TestRMRestart { } } + public static NMContainerStatus createNMContainerStatus( + ApplicationAttemptId appAttemptId, int id, ContainerState containerState) { + ContainerId containerId = ContainerId.newInstance(appAttemptId, id); + NMContainerStatus containerReport = + NMContainerStatus.newInstance(containerId, containerState, + Resource.newInstance(1024, 1), "recover container", 0); + return containerReport; + } + public class TestMemoryRMStateStore extends MemoryRMStateStore { int count = 0; public int updateApp = 0; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 2f16b85699d..e4a9a1c361f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; @@ -487,33 +488,37 @@ public class TestResourceTrackerService { RMApp app = rm.submitApp(1024, true); // Case 1.1: AppAttemptId is null - ContainerStatus status = ContainerStatus.newInstance( - ContainerId.newInstance(ApplicationAttemptId.newInstance( - app.getApplicationId(), 2), 1), - ContainerState.COMPLETE, "Dummy Completed", 0); - rm.getResourceTrackerService().handleContainerStatus(status); + NMContainerStatus report = + NMContainerStatus.newInstance( + ContainerId.newInstance( + ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1), + ContainerState.COMPLETE, Resource.newInstance(1024, 1), + "Dummy Completed", 0); + rm.getResourceTrackerService().handleNMContainerStatus(report); verify(handler, never()).handle((Event) any()); // Case 1.2: Master container is null RMAppAttemptImpl currentAttempt = (RMAppAttemptImpl) app.getCurrentAppAttempt(); currentAttempt.setMasterContainer(null); - status = ContainerStatus.newInstance( - ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0), - ContainerState.COMPLETE, "Dummy Completed", 0); - rm.getResourceTrackerService().handleContainerStatus(status); + report = NMContainerStatus.newInstance( + ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0), + ContainerState.COMPLETE, Resource.newInstance(1024, 1), + "Dummy Completed", 0); + rm.getResourceTrackerService().handleNMContainerStatus(report); verify(handler, never()).handle((Event)any()); // Case 2: Managed AM app = rm.submitApp(1024); // Case 2.1: AppAttemptId is null - status = ContainerStatus.newInstance( - ContainerId.newInstance(ApplicationAttemptId.newInstance( - app.getApplicationId(), 2), 1), - ContainerState.COMPLETE, "Dummy Completed", 0); + report = NMContainerStatus.newInstance( + ContainerId.newInstance( + ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1), + ContainerState.COMPLETE, Resource.newInstance(1024, 1), + "Dummy Completed", 0); try { - rm.getResourceTrackerService().handleContainerStatus(status); + rm.getResourceTrackerService().handleNMContainerStatus(report); } catch (Exception e) { // expected - ignore } @@ -523,11 +528,12 @@ public class TestResourceTrackerService { currentAttempt = (RMAppAttemptImpl) app.getCurrentAppAttempt(); currentAttempt.setMasterContainer(null); - status = ContainerStatus.newInstance( - ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0), - ContainerState.COMPLETE, "Dummy Completed", 0); + report = NMContainerStatus.newInstance( + ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0), + ContainerState.COMPLETE, Resource.newInstance(1024, 1), + "Dummy Completed", 0); try { - rm.getResourceTrackerService().handleContainerStatus(status); + rm.getResourceTrackerService().handleNMContainerStatus(report); } catch (Exception e) { // expected - ignore }