diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index c795e556c46..f238f79f172 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -24,6 +24,7 @@ import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; @@ -47,7 +48,7 @@ public abstract class NodeHeartbeatRequest { public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, MasterKey lastKnownContainerTokenMasterKey, MasterKey lastKnownNMTokenMasterKey, Set nodeLabels, - Map registeredCollectors) { + Map registeringCollectors) { NodeHeartbeatRequest nodeHeartbeatRequest = Records.newRecord(NodeHeartbeatRequest.class); nodeHeartbeatRequest.setNodeStatus(nodeStatus); @@ -56,7 +57,7 @@ public abstract class NodeHeartbeatRequest { nodeHeartbeatRequest .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey); nodeHeartbeatRequest.setNodeLabels(nodeLabels); - nodeHeartbeatRequest.setRegisteredCollectors(registeredCollectors); + nodeHeartbeatRequest.setRegisteringCollectors(registeringCollectors); return nodeHeartbeatRequest; } @@ -79,7 +80,9 @@ public abstract class NodeHeartbeatRequest { List 
logAggregationReportsForApps); // This tells RM registered collectors' address info on this node - public abstract Map getRegisteredCollectors(); - public abstract void setRegisteredCollectors(Map appCollectorsMap); + public abstract Map + getRegisteringCollectors(); + + public abstract void setRegisteringCollectors(Map appCollectorsMap); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 7568bbb0584..60028fd1d15 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -47,10 +48,9 @@ public abstract class NodeHeartbeatResponse { public abstract List getApplicationsToCleanup(); // This tells NM the collectors' address info of related apps - public abstract Map getAppCollectorsMap(); - - public abstract void setAppCollectorsMap( - Map appCollectorsMap); + public abstract Map getAppCollectors(); + public abstract void setAppCollectors( + Map appCollectorsMap); public abstract void 
setResponseId(int responseId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java index 3498de90a3c..1503eca6e8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java @@ -22,14 +22,14 @@ import java.util.Arrays; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.util.Records; @Private public abstract class ReportNewCollectorInfoRequest { public static ReportNewCollectorInfoRequest newInstance( - List appCollectorsList) { + List appCollectorsList) { ReportNewCollectorInfoRequest request = Records.newRecord(ReportNewCollectorInfoRequest.class); request.setAppCollectorsList(appCollectorsList); @@ -41,13 +41,13 @@ public abstract class ReportNewCollectorInfoRequest { ReportNewCollectorInfoRequest request = Records.newRecord(ReportNewCollectorInfoRequest.class); request.setAppCollectorsList( - Arrays.asList(AppCollectorsMap.newInstance(id, collectorAddr))); + Arrays.asList(AppCollectorData.newInstance(id, collectorAddr))); return request; } - public abstract List getAppCollectorsList(); + public abstract List getAppCollectorsList(); public abstract void setAppCollectorsList( - List appCollectorsList); + List 
appCollectorsList); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index d0c11985db3..73a8abe0fcc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -35,13 +35,14 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import 
org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -58,7 +59,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { private Set labels = null; private List logAggregationReportsForApps = null; - private Map registeredCollectors = null; + private Map registeringCollectors = null; public NodeHeartbeatRequestPBImpl() { builder = NodeHeartbeatRequestProto.newBuilder(); @@ -114,8 +115,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { if (this.logAggregationReportsForApps != null) { addLogAggregationStatusForAppsToProto(); } - if (this.registeredCollectors != null) { - addRegisteredCollectorsToProto(); + if (this.registeringCollectors != null) { + addRegisteringCollectorsToProto(); } } @@ -158,14 +159,16 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { return ((LogAggregationReportPBImpl) value).getProto(); } - private void addRegisteredCollectorsToProto() { + private void addRegisteringCollectorsToProto() { maybeInitBuilder(); - builder.clearRegisteredCollectors(); - for (Map.Entry entry : - registeredCollectors.entrySet()) { - builder.addRegisteredCollectors(AppCollectorsMapProto.newBuilder() + builder.clearRegisteringCollectors(); + for (Map.Entry entry : + registeringCollectors.entrySet()) { + builder.addRegisteringCollectors(AppCollectorDataProto.newBuilder() .setAppId(convertToProtoFormat(entry.getKey())) - .setAppCollectorAddr(entry.getValue())); + .setAppCollectorAddr(entry.getValue().getCollectorAddr()) + .setRmIdentifier(entry.getValue().getRMIdentifier()) + .setVersion(entry.getValue().getVersion())); } } @@ -251,35 +254,37 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { } @Override - public Map getRegisteredCollectors() { - if (this.registeredCollectors != null) { - return this.registeredCollectors; + public Map getRegisteringCollectors() { + if (this.registeringCollectors != null) { + return this.registeringCollectors; } initRegisteredCollectors(); - return registeredCollectors; +
return registeringCollectors; } private void initRegisteredCollectors() { NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getRegisteredCollectorsList(); + List list = p.getRegisteringCollectorsList(); if (!list.isEmpty()) { - this.registeredCollectors = new HashMap<>(); - for (AppCollectorsMapProto c : list) { + this.registeringCollectors = new HashMap<>(); + for (AppCollectorDataProto c : list) { ApplicationId appId = convertFromProtoFormat(c.getAppId()); - this.registeredCollectors.put(appId, c.getAppCollectorAddr()); + AppCollectorData data = AppCollectorData.newInstance(appId, + c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion()); + this.registeringCollectors.put(appId, data); } } } @Override - public void setRegisteredCollectors( - Map registeredCollectors) { + public void setRegisteringCollectors( + Map registeredCollectors) { if (registeredCollectors == null || registeredCollectors.isEmpty()) { return; } maybeInitBuilder(); - this.registeredCollectors = new HashMap(); - this.registeredCollectors.putAll(registeredCollectors); + this.registeringCollectors = new HashMap<>(); + this.registeringCollectors.putAll(registeredCollectors); } private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 37bf5706b03..409e9961221 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -45,11 +45,12 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueui import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -70,7 +71,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { private List applicationsToCleanup = null; private Map systemCredentials = null; private Resource resource = null; - private Map appCollectorsMap = null; + private Map appCollectorsMap = null; private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; @@ -146,11 +147,15 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { private void addAppCollectorsMapToProto() { maybeInitBuilder(); - builder.clearAppCollectorsMap(); - for (Map.Entry entry : appCollectorsMap.entrySet()) { - 
builder.addAppCollectorsMap(AppCollectorsMapProto.newBuilder() + builder.clearAppCollectors(); + for (Map.Entry entry + : appCollectorsMap.entrySet()) { + AppCollectorData data = entry.getValue(); + builder.addAppCollectors(AppCollectorDataProto.newBuilder() .setAppId(convertToProtoFormat(entry.getKey())) - .setAppCollectorAddr(entry.getValue())); + .setAppCollectorAddr(data.getCollectorAddr()) + .setRmIdentifier(data.getRMIdentifier()) + .setVersion(data.getVersion())); } } @@ -568,7 +573,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { } @Override - public Map getAppCollectorsMap() { + public Map getAppCollectors() { if (this.appCollectorsMap != null) { return this.appCollectorsMap; } @@ -589,12 +594,14 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { private void initAppCollectorsMap() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getAppCollectorsMapList(); + List list = p.getAppCollectorsList(); if (!list.isEmpty()) { this.appCollectorsMap = new HashMap<>(); - for (AppCollectorsMapProto c : list) { + for (AppCollectorDataProto c : list) { ApplicationId appId = convertFromProtoFormat(c.getAppId()); - this.appCollectorsMap.put(appId, c.getAppCollectorAddr()); + AppCollectorData data = AppCollectorData.newInstance(appId, + c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion()); + this.appCollectorsMap.put(appId, data); } } } @@ -611,14 +618,14 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { } @Override - public void setAppCollectorsMap( - Map appCollectorsMap) { - if (appCollectorsMap == null || appCollectorsMap.isEmpty()) { + public void setAppCollectors( + Map appCollectors) { + if (appCollectors == null || appCollectors.isEmpty()) { return; } maybeInitBuilder(); - this.appCollectorsMap = new HashMap(); - this.appCollectorsMap.putAll(appCollectorsMap); + this.appCollectorsMap = new HashMap<>(); + 
this.appCollectorsMap.putAll(appCollectors); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java index c6f66194e5d..3f3dcf52e9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java @@ -20,12 +20,12 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; -import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; -import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorsMapPBImpl; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl; public class ReportNewCollectorInfoRequestPBImpl extends ReportNewCollectorInfoRequest { @@ -36,7 +36,7 @@ public class ReportNewCollectorInfoRequestPBImpl extends private 
ReportNewCollectorInfoRequestProto.Builder builder = null; private boolean viaProto = false; - private List collectorsList = null; + private List collectorsList = null; public ReportNewCollectorInfoRequestPBImpl() { builder = ReportNewCollectorInfoRequestProto.newBuilder(); @@ -96,9 +96,9 @@ public class ReportNewCollectorInfoRequestPBImpl extends private void addLocalCollectorsToProto() { maybeInitBuilder(); builder.clearAppCollectors(); - List protoList = - new ArrayList(); - for (AppCollectorsMap m : this.collectorsList) { + List protoList = + new ArrayList(); + for (AppCollectorData m : this.collectorsList) { protoList.add(convertToProtoFormat(m)); } builder.addAllAppCollectors(protoList); @@ -106,16 +106,16 @@ public class ReportNewCollectorInfoRequestPBImpl extends private void initLocalCollectorsList() { ReportNewCollectorInfoRequestProtoOrBuilder p = viaProto ? proto : builder; - List list = + List list = p.getAppCollectorsList(); - this.collectorsList = new ArrayList(); - for (AppCollectorsMapProto m : list) { + this.collectorsList = new ArrayList(); + for (AppCollectorDataProto m : list) { this.collectorsList.add(convertFromProtoFormat(m)); } } @Override - public List getAppCollectorsList() { + public List getAppCollectorsList() { if (this.collectorsList == null) { initLocalCollectorsList(); } @@ -123,7 +123,7 @@ public class ReportNewCollectorInfoRequestPBImpl extends } @Override - public void setAppCollectorsList(List appCollectorsList) { + public void setAppCollectorsList(List appCollectorsList) { maybeInitBuilder(); if (appCollectorsList == null) { builder.clearAppCollectors(); @@ -131,14 +131,14 @@ public class ReportNewCollectorInfoRequestPBImpl extends this.collectorsList = appCollectorsList; } - private AppCollectorsMapPBImpl convertFromProtoFormat( - AppCollectorsMapProto p) { - return new AppCollectorsMapPBImpl(p); + private AppCollectorDataPBImpl convertFromProtoFormat( + AppCollectorDataProto p) { + return new AppCollectorDataPBImpl(p); } - 
private AppCollectorsMapProto convertToProtoFormat( - AppCollectorsMap m) { - return ((AppCollectorsMapPBImpl) m).getProto(); + private AppCollectorDataProto convertToProtoFormat( + AppCollectorData m) { + return ((AppCollectorDataPBImpl) m).getProto(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java new file mode 100644 index 00000000000..da2e5de8aa3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.util.Records; + + +@Private +@InterfaceStability.Unstable +public abstract class AppCollectorData { + + protected static final long DEFAULT_TIMESTAMP_VALUE = -1; + + public static AppCollectorData newInstance( + ApplicationId id, String collectorAddr, long rmIdentifier, long version) { + AppCollectorData appCollectorData = + Records.newRecord(AppCollectorData.class); + appCollectorData.setApplicationId(id); + appCollectorData.setCollectorAddr(collectorAddr); + appCollectorData.setRMIdentifier(rmIdentifier); + appCollectorData.setVersion(version); + return appCollectorData; + } + + public static AppCollectorData newInstance(ApplicationId id, + String collectorAddr) { + return newInstance(id, collectorAddr, DEFAULT_TIMESTAMP_VALUE, + DEFAULT_TIMESTAMP_VALUE); + } + + /** + * Returns whether a collector data item happens before another one. A null + * data item happens before any non-null item. A non-null data item A + * happens before another non-null item B when A's rmIdentifier is less than + * B's rmIdentifier, or when A's version is less than B's and they have the + * same rmIdentifier. + * + * @param dataA first collector data item. + * @param dataB second collector data item. + * @return true if dataA happens before dataB.
+ */ + public static boolean happensBefore(AppCollectorData dataA, + AppCollectorData dataB) { + if (dataA == null && dataB == null) { + return false; + } else if (dataA == null || dataB == null) { + return dataA == null; + } + + return + (dataA.getRMIdentifier() < dataB.getRMIdentifier()) + || ((dataA.getRMIdentifier() == dataB.getRMIdentifier()) + && (dataA.getVersion() < dataB.getVersion())); + } + + /** + * Returns if the collector data has been stamped by the RM with a RM cluster + * timestamp and a version number. + * + * @return true if RM has already assigned a timestamp for this collector. + * Otherwise, it means the RM has not recognized the existence of this + * collector. + */ + public boolean isStamped() { + return (getRMIdentifier() != DEFAULT_TIMESTAMP_VALUE) + || (getVersion() != DEFAULT_TIMESTAMP_VALUE); + } + + public abstract ApplicationId getApplicationId(); + + public abstract void setApplicationId(ApplicationId id); + + public abstract String getCollectorAddr(); + + public abstract void setCollectorAddr(String addr); + + public abstract long getRMIdentifier(); + + public abstract void setRMIdentifier(long rmId); + + public abstract long getVersion(); + + public abstract void setVersion(long version); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java deleted file mode 100644 index 07e1d92898f..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.api.records; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.util.Records; - - -@Private -public abstract class AppCollectorsMap { - - public static AppCollectorsMap newInstance( - ApplicationId id, String collectorAddr) { - AppCollectorsMap appCollectorsMap = - Records.newRecord(AppCollectorsMap.class); - appCollectorsMap.setApplicationId(id); - appCollectorsMap.setCollectorAddr(collectorAddr); - return appCollectorsMap; - } - - public abstract ApplicationId getApplicationId(); - - public abstract void setApplicationId(ApplicationId id); - - public abstract String getCollectorAddr(); - - public abstract void setCollectorAddr(String addr); - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java similarity index 65% rename from 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java index 3740035d836..7d3a80545b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java @@ -21,37 +21,39 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProtoOrBuilder; import com.google.protobuf.TextFormat; @Private @Unstable -public class AppCollectorsMapPBImpl extends AppCollectorsMap { +public class AppCollectorDataPBImpl extends AppCollectorData { - private AppCollectorsMapProto proto = - AppCollectorsMapProto.getDefaultInstance(); + private AppCollectorDataProto proto = + AppCollectorDataProto.getDefaultInstance(); - private AppCollectorsMapProto.Builder 
builder = null; + private AppCollectorDataProto.Builder builder = null; private boolean viaProto = false; private ApplicationId appId = null; private String collectorAddr = null; + private Long rmIdentifier = null; + private Long version = null; - public AppCollectorsMapPBImpl() { - builder = AppCollectorsMapProto.newBuilder(); + public AppCollectorDataPBImpl() { + builder = AppCollectorDataProto.newBuilder(); } - public AppCollectorsMapPBImpl(AppCollectorsMapProto proto) { + public AppCollectorDataPBImpl(AppCollectorDataProto proto) { this.proto = proto; viaProto = true; } - public AppCollectorsMapProto getProto() { + public AppCollectorDataProto getProto() { mergeLocalToProto(); proto = viaProto ? proto : builder.build(); viaProto = true; @@ -81,7 +83,7 @@ public class AppCollectorsMapPBImpl extends AppCollectorsMap { @Override public ApplicationId getApplicationId() { - AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder; + AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder; if (this.appId == null && p.hasAppId()) { this.appId = convertFromProtoFormat(p.getAppId()); } @@ -90,7 +92,7 @@ public class AppCollectorsMapPBImpl extends AppCollectorsMap { @Override public String getCollectorAddr() { - AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder; + AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder; if (this.collectorAddr == null && p.hasAppCollectorAddr()) { this.collectorAddr = p.getAppCollectorAddr(); @@ -116,6 +118,46 @@ public class AppCollectorsMapPBImpl extends AppCollectorsMap { this.collectorAddr = collectorAddr; } + @Override + public long getRMIdentifier() { + AppCollectorDataProtoOrBuilder p = viaProto ? 
proto : builder; + if (this.rmIdentifier == null && p.hasRmIdentifier()) { + this.rmIdentifier = p.getRmIdentifier(); + } + if (this.rmIdentifier != null) { + return this.rmIdentifier; + } else { + return AppCollectorData.DEFAULT_TIMESTAMP_VALUE; + } + } + + @Override + public void setRMIdentifier(long rmId) { + maybeInitBuilder(); + this.rmIdentifier = rmId; + builder.setRmIdentifier(rmId); + } + + @Override + public long getVersion() { + AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder; + if (this.version == null && p.hasVersion()) { + this.version = p.getVersion(); + } + if (this.version != null) { + return this.version; + } else { + return AppCollectorData.DEFAULT_TIMESTAMP_VALUE; + } + } + + @Override + public void setVersion(long version) { + maybeInitBuilder(); + this.version = version; + builder.setVersion(version); + } + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { return new ApplicationIdPBImpl(p); } @@ -126,7 +168,7 @@ public class AppCollectorsMapPBImpl extends AppCollectorsMap { private void maybeInitBuilder() { if (viaProto || builder == null) { - builder = AppCollectorsMapProto.newBuilder(proto); + builder = AppCollectorDataProto.newBuilder(proto); } viaProto = false; } @@ -147,6 +189,12 @@ public class AppCollectorsMapPBImpl extends AppCollectorsMap { if (this.collectorAddr != null) { builder.setAppCollectorAddr(this.collectorAddr); } + if (this.rmIdentifier != null) { + builder.setRmIdentifier(this.rmIdentifier); + } + if (this.version != null) { + builder.setVersion(this.version); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/package-info.java similarity index 58% rename from
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/package-info.java index 964291123ae..4ce3896fbb9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/package-info.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,26 +15,5 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.hadoop.yarn.server.resourcemanager.rmapp; - -import org.apache.hadoop.yarn.api.records.ApplicationId; - -/** - * Event used for updating collector address in RMApp on node heartbeat. - */ -public class RMAppCollectorUpdateEvent extends RMAppEvent { - - private final String appCollectorAddr; - - public RMAppCollectorUpdateEvent(ApplicationId appId, - String appCollectorAddr) { - super(appId, RMAppEventType.COLLECTOR_UPDATE); - this.appCollectorAddr = appCollectorAddr; - } - - public String getAppCollectorAddr(){ - return this.appCollectorAddr; - } - -} +/** Server records PB implementations. 
*/ +package org.apache.hadoop.yarn.server.api.records.impl.pb; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 4350fc53a59..9568c638023 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -89,7 +89,7 @@ message NodeHeartbeatRequestProto { optional MasterKeyProto last_known_nm_token_master_key = 3; optional NodeLabelsProto nodeLabels = 4; repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5; - repeated AppCollectorsMapProto registered_collectors = 6; + repeated AppCollectorDataProto registering_collectors = 6; } message LogAggregationReportProto { @@ -114,7 +114,7 @@ message NodeHeartbeatResponseProto { repeated SignalContainerRequestProto containers_to_signal = 13; optional ResourceProto resource = 14; optional ContainerQueuingLimitProto container_queuing_limit = 15; - repeated AppCollectorsMapProto app_collectors_map = 16; + repeated AppCollectorDataProto app_collectors = 16; } message ContainerQueuingLimitProto { @@ -130,16 +130,18 @@ message SystemCredentialsForAppsProto { //////////////////////////////////////////////////////////////////////// ////// From collector_nodemanager_protocol //////////////////////////// //////////////////////////////////////////////////////////////////////// -message AppCollectorsMapProto { - optional ApplicationIdProto appId = 1; - optional string appCollectorAddr = 2; +message AppCollectorDataProto { + optional ApplicationIdProto app_id = 1; + optional string app_collector_addr = 2; + optional int64 rm_identifier = 3 
[default = -1]; + optional int64 version = 4 [default = -1]; } ////////////////////////////////////////////////////// /////// collector_nodemanager_protocol ////////////// ////////////////////////////////////////////////////// message ReportNewCollectorInfoRequestProto { - repeated AppCollectorsMapProto app_collectors = 1; + repeated AppCollectorDataProto app_collectors = 1; } message ReportNewCollectorInfoResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index e5d159b2ad6..7eb89445016 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -75,7 +75,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorCon import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; -import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -420,10 +420,10 @@ public class TestRPC { public ReportNewCollectorInfoResponse reportNewCollectorInfo( ReportNewCollectorInfoRequest request) throws YarnException, IOException { - List appCollectors = request.getAppCollectorsList(); + List appCollectors = request.getAppCollectorsList(); if (appCollectors.size() == 1) { // check default appID and collectorAddr - AppCollectorsMap appCollector = 
appCollectors.get(0); + AppCollectorData appCollector = appCollectors.get(0); Assert.assertEquals(appCollector.getApplicationId(), DEFAULT_APP_ID); Assert.assertEquals(appCollector.getCollectorAddr(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index be350e2651b..944fc0e86bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRe import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UnRegisterNodeManagerRequestPBImpl; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; @@ -109,14 +110,14 @@ public class TestYarnServerApiClasses { original.setLastKnownNMTokenMasterKey(getMasterKey()); original.setNodeStatus(getNodeStatus()); original.setNodeLabels(getValidNodeLabels()); - Map collectors = getCollectors(); - original.setRegisteredCollectors(collectors); + Map collectors = getCollectors(); + original.setRegisteringCollectors(collectors); NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl( original.getProto()); assertEquals(1, 
copy.getLastKnownContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId()); assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost()); - assertEquals(collectors, copy.getRegisteredCollectors()); + assertEquals(collectors, copy.getRegisteringCollectors()); // check labels are coming with valid values Assert.assertTrue(original.getNodeLabels() .containsAll(copy.getNodeLabels())); @@ -153,8 +154,8 @@ public class TestYarnServerApiClasses { original.setNextHeartBeatInterval(1000); original.setNodeAction(NodeAction.NORMAL); original.setResponseId(100); - Map collectors = getCollectors(); - original.setAppCollectorsMap(collectors); + Map collectors = getCollectors(); + original.setAppCollectors(collectors); NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl( original.getProto()); @@ -164,7 +165,7 @@ public class TestYarnServerApiClasses { assertEquals(1, copy.getContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getNMTokenMasterKey().getKeyId()); assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage()); - assertEquals(collectors, copy.getAppCollectorsMap()); + assertEquals(collectors, copy.getAppCollectors()); assertEquals(false, copy.getAreNodeLabelsAcceptedByRM()); } @@ -344,12 +345,13 @@ public class TestYarnServerApiClasses { return nodeLabels; } - private Map getCollectors() { + private Map getCollectors() { ApplicationId appID = ApplicationId.newInstance(1L, 1); String collectorAddr = "localhost:0"; - Map collectorMap = - new HashMap(); - collectorMap.put(appID, collectorAddr); + AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr); + Map collectorMap = + new HashMap<>(); + collectorMap.put(appID, data); return collectorMap; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index e888393e4c8..b92526b4ecd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -74,11 +75,18 @@ public interface Context { Map getSystemCredentialsForApps(); /** - * Get the registered collectors that located on this NM. - * @return registered collectors, or null if the timeline service v.2 is not + * Get the list of collectors that are registering with the RM from this node. + * @return registering collectors, or null if the timeline service v.2 is not * enabled */ - Map getRegisteredCollectors(); + Map getRegisteringCollectors(); + + /** + * Get the list of collectors registered with the RM and known by this node. + * @return known collectors, or null if the timeline service v.2 is not + * enabled. 
+ */ + Map getKnownCollectors(); ConcurrentMap getContainers(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 0f0a0812dd9..cd2ec5da6ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService; @@ -486,7 +487,9 @@ public class NodeManager extends CompositeService protected final ConcurrentMap containers = new ConcurrentSkipListMap(); - private Map registeredCollectors; + private Map registeringCollectors; + + private Map knownCollectors; protected final ConcurrentMap increasedContainers = @@ -521,7 +524,8 @@ public class NodeManager extends CompositeService NMStateStoreService stateStore, boolean isDistSchedulingEnabled, Configuration conf) { if (YarnConfiguration.timelineServiceV2Enabled(conf)) { - this.registeredCollectors = new ConcurrentHashMap<>(); + this.registeringCollectors = new ConcurrentHashMap<>(); + 
this.knownCollectors = new ConcurrentHashMap<>(); } this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; @@ -682,18 +686,13 @@ public class NodeManager extends CompositeService } @Override - public Map getRegisteredCollectors() { - return this.registeredCollectors; + public Map getRegisteringCollectors() { + return this.registeringCollectors; } - public void addRegisteredCollectors( - Map newRegisteredCollectors) { - if (registeredCollectors != null) { - this.registeredCollectors.putAll(newRegisteredCollectors); - } else { - LOG.warn("collectors are added when the registered collectors are " + - "initialized"); - } + @Override + public Map getKnownCollectors() { + return this.knownCollectors; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index ec7e1d90d30..fa118df290b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import 
org.apache.hadoop.yarn.server.api.records.MasterKey; @@ -822,7 +823,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements .getNMTokenSecretManager().getCurrentKey(), nodeLabelsForHeartbeat, NodeStatusUpdaterImpl.this.context - .getRegisteredCollectors()); + .getRegisteringCollectors()); if (logAggregationEnabled) { // pull log aggregation status for application running in this NM @@ -915,7 +916,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } } if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) { - updateTimelineClientsAddress(response); + updateTimelineCollectorData(response); } } catch (ConnectException e) { @@ -945,40 +946,48 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } } - private void updateTimelineClientsAddress( + private void updateTimelineCollectorData( NodeHeartbeatResponse response) { - Map knownCollectorsMap = - response.getAppCollectorsMap(); - if (knownCollectorsMap == null) { + Map incomingCollectorsMap = + response.getAppCollectors(); + if (incomingCollectorsMap == null) { if (LOG.isDebugEnabled()) { LOG.debug("No collectors to update RM"); } - } else { - Set> rmKnownCollectors = - knownCollectorsMap.entrySet(); - for (Map.Entry entry : rmKnownCollectors) { - ApplicationId appId = entry.getKey(); - String collectorAddr = entry.getValue(); - - // Only handle applications running on local node. 
- // Not include apps with timeline collectors running in local - Application application = context.getApplications().get(appId); - // TODO this logic could be problematic if the collector address - // gets updated due to NM restart or collector service failure - if (application != null && - !context.getRegisteredCollectors().containsKey(appId)) { + return; + } + Map knownCollectors + = context.getKnownCollectors(); + for (Map.Entry entry + : incomingCollectorsMap.entrySet()) { + ApplicationId appId = entry.getKey(); + AppCollectorData collectorData = entry.getValue(); + // Only handle applications running on local node. + Application application = context.getApplications().get(appId); + if (application != null) { + // Update collector data if the newly received data happens after + // the known data (updates the known data). + AppCollectorData existingData = knownCollectors.get(appId); + if (AppCollectorData.happensBefore(existingData, collectorData)) { if (LOG.isDebugEnabled()) { - LOG.debug("Sync a new collector address: " + collectorAddr + - " for application: " + appId + " from RM."); + LOG.debug("Sync a new collector address: " + + collectorData.getCollectorAddr() + + " for application: " + appId + " from RM."); } - NMTimelinePublisher nmTimelinePublisher = - context.getNMTimelinePublisher(); + + // Update information for clients. + NMTimelinePublisher nmTimelinePublisher + = context.getNMTimelinePublisher(); if (nmTimelinePublisher != null) { nmTimelinePublisher.setTimelineServiceAddress( - application.getAppId(), collectorAddr); + application.getAppId(), collectorData.getCollectorAddr()); } + // Update information for the node manager itself. 
+ knownCollectors.put(appId, collectorData); } } + // Remove the registering collector data + context.getRegisteringCollectors().remove(entry.getKey()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java index d667c0ee246..7fdca783093 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java @@ -37,9 +37,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorCon import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; -import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.nodemanager.Context; -import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; @@ -107,23 +106,31 @@ public class NMCollectorService extends CompositeService implements @Override public ReportNewCollectorInfoResponse reportNewCollectorInfo( ReportNewCollectorInfoRequest request) throws YarnException, IOException { - List 
newCollectorsList = request.getAppCollectorsList(); + List newCollectorsList = request.getAppCollectorsList(); if (newCollectorsList != null && !newCollectorsList.isEmpty()) { - Map newCollectorsMap = - new HashMap(); - for (AppCollectorsMap collector : newCollectorsList) { + Map newCollectorsMap = + new HashMap<>(); + for (AppCollectorData collector : newCollectorsList) { ApplicationId appId = collector.getApplicationId(); - String collectorAddr = collector.getCollectorAddr(); - newCollectorsMap.put(appId, collectorAddr); + newCollectorsMap.put(appId, collector); // set registered collector address to TimelineClient. + // TODO: Do we need to do this after we received confirmation from + // the RM? NMTimelinePublisher nmTimelinePublisher = context.getNMTimelinePublisher(); if (nmTimelinePublisher != null) { - nmTimelinePublisher.setTimelineServiceAddress(appId, collectorAddr); + nmTimelinePublisher.setTimelineServiceAddress(appId, + collector.getCollectorAddr()); } } - ((NodeManager.NMContext)context).addRegisteredCollectors( - newCollectorsMap); + Map registeringCollectors + = context.getRegisteringCollectors(); + if (registeringCollectors != null) { + registeringCollectors.putAll(newCollectorsMap); + } else { + LOG.warn("collectors are added when the registered collectors are " + + "not initialized"); + } } return ReportNewCollectorInfoResponse.newInstance(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index b9197c22943..10e1e1f6160 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; @@ -544,6 +545,29 @@ public class ApplicationImpl implements Application { @SuppressWarnings("unchecked") static class AppCompletelyDoneTransition implements SingleArcTransition { + + private void updateCollectorStatus(ApplicationImpl app) { + // Remove collectors info for finished apps. + // TODO check we remove related collectors info in failure cases + // (YARN-3038) + Map registeringCollectors + = app.context.getRegisteringCollectors(); + if (registeringCollectors != null) { + registeringCollectors.remove(app.getAppId()); + } + Map knownCollectors = + app.context.getKnownCollectors(); + if (knownCollectors != null) { + knownCollectors.remove(app.getAppId()); + } + // stop timelineClient when application get finished. 
+ NMTimelinePublisher nmTimelinePublisher = + app.context.getNMTimelinePublisher(); + if (nmTimelinePublisher != null) { + nmTimelinePublisher.stopTimelineClient(app.getAppId()); + } + } + @Override public void transition(ApplicationImpl app, ApplicationEvent event) { @@ -552,20 +576,7 @@ public class ApplicationImpl implements Application { new LogHandlerAppFinishedEvent(app.appId)); app.context.getNMTokenSecretManager().appFinished(app.getAppId()); - // Remove collectors info for finished apps. - // TODO check we remove related collectors info in failure cases - // (YARN-3038) - Map registeredCollectors = - app.context.getRegisteredCollectors(); - if (registeredCollectors != null) { - registeredCollectors.remove(app.getAppId()); - } - // stop timelineClient when application get finished. - NMTimelinePublisher nmTimelinePublisher = - app.context.getNMTimelinePublisher(); - if (nmTimelinePublisher != null) { - nmTimelinePublisher.stopTimelineClient(app.getAppId()); - } + updateCollectorStatus(app); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 031300f92c4..7c4e1cc0fa9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import 
org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -618,8 +619,11 @@ public abstract class BaseAMRMProxyTest { return null; } - @Override - public Map getRegisteredCollectors() { + @Override public Map getRegisteringCollectors() { + return null; + } + + @Override public Map getKnownCollectors() { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 4d73ba23eeb..ce1a66648ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -82,8 +82,10 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; @@ -319,7 +321,7 @@ public class ApplicationMasterService extends AbstractService implements // Remove collector address when app get finished. if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { - rmApp.removeCollectorAddr(); + ((RMAppImpl) rmApp).removeCollectorData(); } // checking whether the app exits in RMStateStore at first not to throw // ApplicationDoesNotExistInCacheException before and after @@ -605,8 +607,10 @@ public class ApplicationMasterService extends AbstractService implements // add collector address for this application if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { - allocateResponse.setCollectorAddr( - this.rmContext.getRMApps().get(applicationId).getCollectorAddr()); + AppCollectorData data = app.getCollectorData(); + if (data != null) { + allocateResponse.setCollectorAddr(data.getCollectorAddr()); + } } // add preemption to the allocateResponse message (if any) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 51fc0bdb978..d3886b27895 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import 
java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -63,13 +64,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppCollectorUpdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -119,6 +121,8 @@ public class ResourceTrackerService extends AbstractService implements private boolean isDelegatedCentralizedNodeLabelsConf; private DynamicResourceConfiguration drConf; + private final AtomicLong timelineCollectorVersion = new AtomicLong(0); + public ResourceTrackerService(RMContext rmContext, NodesListManager nodesListManager, NMLivelinessMonitor nmLivelinessMonitor, @@ -525,9 +529,6 @@ public class ResourceTrackerService extends AbstractService implements 
YarnConfiguration.timelineServiceV2Enabled(getConfig()); if (timelineV2Enabled) { // Check & update collectors info from request. - // TODO make sure it won't have race condition issue for AM failed over - // case that the older registration could possible override the newer - // one. updateAppCollectorsMap(request); } @@ -613,14 +614,14 @@ public class ResourceTrackerService extends AbstractService implements private void setAppCollectorsMapToResponse( List runningApps, NodeHeartbeatResponse response) { - Map liveAppCollectorsMap = new - HashMap(); + Map liveAppCollectorsMap = new + HashMap<>(); Map rmApps = rmContext.getRMApps(); // Set collectors for all running apps on this node. for (ApplicationId appId : runningApps) { - String appCollectorAddr = rmApps.get(appId).getCollectorAddr(); - if (appCollectorAddr != null) { - liveAppCollectorsMap.put(appId, appCollectorAddr); + AppCollectorData appCollectorData = rmApps.get(appId).getCollectorData(); + if (appCollectorData != null) { + liveAppCollectorsMap.put(appId, appCollectorData); } else { if (LOG.isDebugEnabled()) { LOG.debug("Collector for applicaton: " + appId + @@ -628,32 +629,43 @@ public class ResourceTrackerService extends AbstractService implements } } } - response.setAppCollectorsMap(liveAppCollectorsMap); + response.setAppCollectors(liveAppCollectorsMap); } private void updateAppCollectorsMap(NodeHeartbeatRequest request) { - Map registeredCollectorsMap = - request.getRegisteredCollectors(); - if (registeredCollectorsMap != null - && !registeredCollectorsMap.isEmpty()) { + Map registeringCollectorsMap = + request.getRegisteringCollectors(); + if (registeringCollectorsMap != null + && !registeringCollectorsMap.isEmpty()) { Map rmApps = rmContext.getRMApps(); - for (Map.Entry entry: - registeredCollectorsMap.entrySet()) { + for (Map.Entry entry: + registeringCollectorsMap.entrySet()) { ApplicationId appId = entry.getKey(); - String collectorAddr = entry.getValue(); - if (collectorAddr != null && 
!collectorAddr.isEmpty()) { + AppCollectorData collectorData = entry.getValue(); + if (collectorData != null) { + if (!collectorData.isStamped()) { + // Stamp the collector if we have not done so + collectorData.setRMIdentifier( + ResourceManager.getClusterTimeStamp()); + collectorData.setVersion( + timelineCollectorVersion.getAndIncrement()); + } RMApp rmApp = rmApps.get(appId); if (rmApp == null) { LOG.warn("Cannot update collector info because application ID: " + appId + " is not found in RMContext!"); } else { - String previousCollectorAddr = rmApp.getCollectorAddr(); - if (previousCollectorAddr == null - || !previousCollectorAddr.equals(collectorAddr)) { - // sending collector update event. - RMAppCollectorUpdateEvent event = - new RMAppCollectorUpdateEvent(appId, collectorAddr); - rmContext.getDispatcher().getEventHandler().handle(event); + AppCollectorData previousCollectorData = rmApp.getCollectorData(); + if (AppCollectorData.happensBefore(previousCollectorData, + collectorData)) { + // Store the newer collector data on the app directly. + // Note: RM has to store the newly received collector data + // synchronously. Otherwise, the RM may send out stale collector + // data before this update is done, and if the RM then crashes, the + // newly updated collector data will be lost. 
+ LOG.info("Update collector information for application " + appId + + " with new address: " + collectorData.getCollectorAddr()); + ((RMAppImpl) rmApp).setCollectorData(collectorData); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 98cbd92180f..c405a8da785 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -22,6 +22,8 @@ import java.util.Collection; import java.util.Map; import java.util.Set; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -177,27 +180,16 @@ public interface RMApp extends EventHandler { String getTrackingUrl(); /** - * The collector address for the application. 
It should be used only if the - * timeline service v.2 is enabled. + * The timeline collector information for the application. It should be used + * only if the timeline service v.2 is enabled. * - * @return the address for the application's collector, or null if the - * timeline service v.2 is not enabled. + * @return the data for the application's collector, including the collector + * address and collector ID, or null if the timeline service v.2 is not + * enabled. */ - String getCollectorAddr(); - - /** - * Set collector address for the application. It should be used only if the - * timeline service v.2 is enabled. - * - * @param collectorAddr the address of collector - */ - void setCollectorAddr(String collectorAddr); - - /** - * Remove collector address when application is finished or killed. It should - * be used only if the timeline service v.2 is enabled. - */ - void removeCollectorAddr(); + @InterfaceAudience.Private + @InterfaceStability.Unstable + AppCollectorData getCollectorData(); /** * The original tracking url for the application master. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index 2b42638a9f0..668c5e1f734 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -31,9 +31,6 @@ public enum RMAppEventType { // Source: Scheduler APP_ACCEPTED, - // TODO add source later - COLLECTOR_UPDATE, - // Source: RMAppAttempt ATTEMPT_REGISTERED, ATTEMPT_UNREGISTERED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 0fdc3113fa3..a6883b342d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import 
org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; @@ -157,7 +158,7 @@ public class RMAppImpl implements RMApp, Recoverable { private long storedFinishTime = 0; private int firstAttemptIdInStateStore = 1; private int nextAttemptId = 1; - private String collectorAddr; + private AppCollectorData collectorData; // This field isn't protected by readlock now. private volatile RMAppAttempt currentAttempt; private String queue; @@ -205,8 +206,6 @@ public class RMAppImpl implements RMApp, Recoverable { // Transitions from NEW state .addTransition(RMAppState.NEW, RMAppState.NEW, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) - .addTransition(RMAppState.NEW, RMAppState.NEW, - RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppEventType.START, new RMAppNewlySavingTransition()) .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED, @@ -223,8 +222,6 @@ public class RMAppImpl implements RMApp, Recoverable { // Transitions from NEW_SAVING state .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) - .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, - RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED, RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition()) .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING, @@ -243,8 +240,6 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, RMAppEventType.MOVE, new RMAppMoveTransition()) - 
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, - RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, RMAppEventType.APP_REJECTED, new FinalSavingTransition( @@ -261,8 +256,6 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.MOVE, new RMAppMoveTransition()) - .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, - RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING, RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition( YarnApplicationState.RUNNING)) @@ -290,8 +283,6 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, RMAppEventType.MOVE, new RMAppMoveTransition()) - .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, - RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_UNREGISTERED, new FinalSavingTransition( @@ -321,8 +312,6 @@ public class RMAppImpl implements RMApp, Recoverable { .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) - .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, - RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) // ignorable transitions .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, @@ -334,8 +323,6 @@ public class RMAppImpl implements RMApp, Recoverable { .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) - 
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING, - RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) // ignorable transitions .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, EnumSet.of(RMAppEventType.NODE_UPDATE, @@ -347,8 +334,6 @@ public class RMAppImpl implements RMApp, Recoverable { .addTransition(RMAppState.KILLING, RMAppState.KILLING, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) - .addTransition(RMAppState.KILLING, RMAppState.KILLING, - RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( @@ -620,18 +605,16 @@ public class RMAppImpl implements RMApp, Recoverable { } @Override - public String getCollectorAddr() { - return this.collectorAddr; + public AppCollectorData getCollectorData() { + return this.collectorData; } - @Override - public void setCollectorAddr(String collectorAddress) { - this.collectorAddr = collectorAddress; + public void setCollectorData(AppCollectorData incomingData) { + this.collectorData = incomingData; } - @Override - public void removeCollectorAddr() { - this.collectorAddr = null; + public void removeCollectorData() { + this.collectorData = null; } @Override @@ -979,24 +962,6 @@ public class RMAppImpl implements RMApp, Recoverable { }; } - private static final class RMAppCollectorUpdateTransition - extends RMAppTransition { - - public void transition(RMAppImpl app, RMAppEvent event) { - if (YarnConfiguration.timelineServiceV2Enabled(app.conf)) { - LOG.info("Updating collector info for app: " + app.getApplicationId()); - - RMAppCollectorUpdateEvent appCollectorUpdateEvent = - (RMAppCollectorUpdateEvent) event; - // Update collector address - app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr()); - - // TODO persistent to RMStateStore for recover - // Save to RMStateStore - } - }; - } - private static final class 
RMAppNodeUpdateTransition extends RMAppTransition { public void transition(RMAppImpl app, RMAppEvent event) { RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 42fd49ef8d4..4df4996e393 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -70,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -1007,13 +1009,23 @@ public class TestResourceTrackerService extends NodeLabelTestBase { RMNodeImpl node2 = (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm2.getNodeId()); - RMApp app1 = rm.submitApp(1024); + RMAppImpl app1 = (RMAppImpl) rm.submitApp(1024); String collectorAddr1 = "1.2.3.4:5"; - app1.setCollectorAddr(collectorAddr1); + app1.setCollectorData(AppCollectorData.newInstance( + app1.getApplicationId(), collectorAddr1)); String collectorAddr2 = "5.4.3.2:1"; - RMApp app2 = rm.submitApp(1024); - app2.setCollectorAddr(collectorAddr2); + RMAppImpl app2 = (RMAppImpl) rm.submitApp(1024); + app2.setCollectorData(AppCollectorData.newInstance( + app2.getApplicationId(), collectorAddr2)); + + String collectorAddr3 = "5.4.3.2:2"; + app2.setCollectorData(AppCollectorData.newInstance( + app2.getApplicationId(), collectorAddr3, 0, 1)); + + String collectorAddr4 = "5.4.3.2:3"; + app2.setCollectorData(AppCollectorData.newInstance( + app2.getApplicationId(), collectorAddr4, 1, 0)); // Create a running container for app1 running on nm1 ContainerId runningContainerId1 = BuilderUtils.newContainerId( @@ -1051,14 +1063,18 @@ public class TestResourceTrackerService extends NodeLabelTestBase { Assert.assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0)); nodeHeartbeat1 = nm1.nodeHeartbeat(true); - Map map1 = nodeHeartbeat1.getAppCollectorsMap(); + Map map1 + = nodeHeartbeat1.getAppCollectors(); Assert.assertEquals(1, map1.size()); - Assert.assertEquals(collectorAddr1, map1.get(app1.getApplicationId())); + Assert.assertEquals(collectorAddr1, + map1.get(app1.getApplicationId()).getCollectorAddr()); nodeHeartbeat2 = nm2.nodeHeartbeat(true); - Map map2 = nodeHeartbeat2.getAppCollectorsMap(); + Map map2 + = nodeHeartbeat2.getAppCollectors(); 
Assert.assertEquals(1, map2.size()); - Assert.assertEquals(collectorAddr2, map2.get(app2.getApplicationId())); + Assert.assertEquals(collectorAddr4, + map2.get(app2.getApplicationId()).getCollectorAddr()); } private void checkRebootedNMCount(MockRM rm2, int count) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 19ee0b17c46..6274573f36b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -95,15 +96,7 @@ public abstract class MockAsm extends MockApps { throw new UnsupportedOperationException("Not supported yet."); } @Override - public String getCollectorAddr() { - throw new UnsupportedOperationException("Not supported yet."); - } - @Override - public void setCollectorAddr(String collectorAddr) { - throw new UnsupportedOperationException("Not supported yet."); - } - @Override - 
public void removeCollectorAddr() { + public AppCollectorData getCollectorData() { throw new UnsupportedOperationException("Not supported yet."); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 62a5c5282ae..8c8d09ee905 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -302,17 +303,9 @@ public class MockRMApp implements RMApp { throw new UnsupportedOperationException("Not supported yet."); } - public String getCollectorAddr() { + @Override + public AppCollectorData getCollectorData() { throw new UnsupportedOperationException("Not supported yet."); } - @Override - public void removeCollectorAddr() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public void setCollectorAddr(String collectorAddr) { - throw new UnsupportedOperationException("Not supported yet."); - } }