From 78b7e070d8009c78665a2baa64fe888788f53e69 Mon Sep 17 00:00:00 2001 From: Sangjin Lee Date: Fri, 14 Oct 2016 14:40:05 -0700 Subject: [PATCH] YARN-5638. Introduce a collector timestamp to uniquely identify collectors creation order in collector discovery. Contributed by Li Lu. --- .../protocolrecords/NodeHeartbeatRequest.java | 13 ++- .../NodeHeartbeatResponse.java | 8 +- .../ReportNewCollectorInfoRequest.java | 10 +- .../impl/pb/NodeHeartbeatRequestPBImpl.java | 47 ++++---- .../impl/pb/NodeHeartbeatResponsePBImpl.java | 37 ++++--- .../ReportNewCollectorInfoRequestPBImpl.java | 36 +++--- .../server/api/records/AppCollectorData.java | 104 ++++++++++++++++++ .../server/api/records/AppCollectorsMap.java | 46 -------- ...BImpl.java => AppCollectorDataPBImpl.java} | 76 ++++++++++--- .../api/records/impl/pb/package-info.java | 19 ++++ .../yarn_server_common_service_protos.proto | 14 ++- .../java/org/apache/hadoop/yarn/TestRPC.java | 6 +- .../hadoop/yarn/TestYarnServerApiClasses.java | 22 ++-- .../yarn/server/nodemanager/Context.java | 14 ++- .../yarn/server/nodemanager/NodeManager.java | 23 ++-- .../nodemanager/NodeStatusUpdaterImpl.java | 54 +++++---- .../collectormanager/NMCollectorService.java | 29 +++-- .../application/ApplicationImpl.java | 39 ++++--- .../amrmproxy/BaseAMRMProxyTest.java | 8 +- .../ApplicationMasterService.java | 3 +- .../resourcemanager/DefaultAMSProcessor.java | 8 +- .../ResourceTrackerService.java | 58 ++++++---- .../server/resourcemanager/rmapp/RMApp.java | 30 ++--- .../resourcemanager/rmapp/RMAppEventType.java | 3 - .../resourcemanager/rmapp/RMAppImpl.java | 17 ++- .../TestResourceTrackerService.java | 32 ++++-- .../applicationsmanager/MockAsm.java | 11 +- .../resourcemanager/rmapp/MockRMApp.java | 12 +- 28 files changed, 483 insertions(+), 296 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java delete mode 
100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/{AppCollectorsMapPBImpl.java => AppCollectorDataPBImpl.java} (65%) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/package-info.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index c795e556c46..f238f79f172 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; @@ -47,7 +48,7 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, MasterKey lastKnownContainerTokenMasterKey, MasterKey lastKnownNMTokenMasterKey, Set nodeLabels, - Map registeredCollectors) { + Map registeringCollectors) { NodeHeartbeatRequest 
nodeHeartbeatRequest = Records.newRecord(NodeHeartbeatRequest.class); nodeHeartbeatRequest.setNodeStatus(nodeStatus); @@ -56,7 +57,7 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, nodeHeartbeatRequest .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey); nodeHeartbeatRequest.setNodeLabels(nodeLabels); - nodeHeartbeatRequest.setRegisteredCollectors(registeredCollectors); + nodeHeartbeatRequest.setRegisteringCollectors(registeringCollectors); return nodeHeartbeatRequest; } @@ -79,7 +80,9 @@ public abstract void setLogAggregationReportsForApps( List logAggregationReportsForApps); // This tells RM registered collectors' address info on this node - public abstract Map getRegisteredCollectors(); - public abstract void setRegisteredCollectors(Map appCollectorsMap); + public abstract Map + getRegisteringCollectors(); + + public abstract void setRegisteringCollectors(Map appCollectorsMap); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 3b0ec10595b..2ebca570853 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import 
org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -47,10 +48,9 @@ public abstract class NodeHeartbeatResponse { public abstract List getApplicationsToCleanup(); // This tells NM the collectors' address info of related apps - public abstract Map getAppCollectorsMap(); - - public abstract void setAppCollectorsMap( - Map appCollectorsMap); + public abstract Map getAppCollectors(); + public abstract void setAppCollectors( + Map appCollectorsMap); public abstract void setResponseId(int responseId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java index 3498de90a3c..1503eca6e8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java @@ -22,14 +22,14 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.util.Records; @Private public abstract class ReportNewCollectorInfoRequest { public static ReportNewCollectorInfoRequest newInstance( - List appCollectorsList) { + List appCollectorsList) { ReportNewCollectorInfoRequest request = Records.newRecord(ReportNewCollectorInfoRequest.class); request.setAppCollectorsList(appCollectorsList); @@ -41,13 +41,13 @@ 
public static ReportNewCollectorInfoRequest newInstance( ReportNewCollectorInfoRequest request = Records.newRecord(ReportNewCollectorInfoRequest.class); request.setAppCollectorsList( - Arrays.asList(AppCollectorsMap.newInstance(id, collectorAddr))); + Arrays.asList(AppCollectorData.newInstance(id, collectorAddr))); return request; } - public abstract List getAppCollectorsList(); + public abstract List getAppCollectorsList(); public abstract void setAppCollectorsList( - List appCollectorsList); + List appCollectorsList); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index d0c11985db3..73a8abe0fcc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -35,13 +35,14 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -58,7 +59,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { private Set labels = null; private List logAggregationReportsForApps = null; - private Map registeredCollectors = null; + private Map registeringCollectors = null; public NodeHeartbeatRequestPBImpl() { builder = NodeHeartbeatRequestProto.newBuilder(); @@ -114,8 +115,8 @@ private void mergeLocalToBuilder() { if (this.logAggregationReportsForApps != null) { addLogAggregationStatusForAppsToProto(); } - if (this.registeredCollectors != null) { - addRegisteredCollectorsToProto(); + if (this.registeringCollectors != null) { + addRegisteringCollectorsToProto(); } } @@ -158,14 +159,14 @@ private LogAggregationReportProto convertToProtoFormat( return ((LogAggregationReportPBImpl) value).getProto(); } - private void addRegisteredCollectorsToProto() { + private void addRegisteringCollectorsToProto() { maybeInitBuilder(); - builder.clearRegisteredCollectors(); - for (Map.Entry entry : - registeredCollectors.entrySet()) { - builder.addRegisteredCollectors(AppCollectorsMapProto.newBuilder() + builder.clearRegisteringCollectors(); + for (Map.Entry entry : + registeringCollectors.entrySet()) { + builder.addRegisteringCollectors(AppCollectorDataProto.newBuilder() .setAppId(convertToProtoFormat(entry.getKey())) - .setAppCollectorAddr(entry.getValue())); + .setAppCollectorAddr(entry.getValue().getCollectorAddr())); } } 
@@ -251,35 +252,37 @@ public void setLastKnownNMTokenMasterKey(MasterKey masterKey) { } @Override - public Map getRegisteredCollectors() { - if (this.registeredCollectors != null) { - return this.registeredCollectors; + public Map getRegisteringCollectors() { + if (this.registeringCollectors != null) { + return this.registeringCollectors; } initRegisteredCollectors(); - return registeredCollectors; + return registeringCollectors; } private void initRegisteredCollectors() { NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getRegisteredCollectorsList(); + List list = p.getRegisteringCollectorsList(); if (!list.isEmpty()) { - this.registeredCollectors = new HashMap<>(); - for (AppCollectorsMapProto c : list) { + this.registeringCollectors = new HashMap<>(); + for (AppCollectorDataProto c : list) { ApplicationId appId = convertFromProtoFormat(c.getAppId()); - this.registeredCollectors.put(appId, c.getAppCollectorAddr()); + AppCollectorData data = AppCollectorData.newInstance(appId, + c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion()); + this.registeringCollectors.put(appId, data); } } } @Override - public void setRegisteredCollectors( - Map registeredCollectors) { + public void setRegisteringCollectors( + Map registeredCollectors) { if (registeredCollectors == null || registeredCollectors.isEmpty()) { return; } maybeInitBuilder(); - this.registeredCollectors = new HashMap(); - this.registeredCollectors.putAll(registeredCollectors); + this.registeringCollectors = new HashMap<>(); + this.registeringCollectors.putAll(registeredCollectors); } private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 46c2b0b8789..ad81a1f9814 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -45,11 +45,12 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -70,7 +71,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { private List applicationsToCleanup = null; private Map systemCredentials = null; private Resource resource = null; - private Map appCollectorsMap = null; + private Map appCollectorsMap = null; private MasterKey containerTokenMasterKey = 
null; private MasterKey nmTokenMasterKey = null; @@ -146,11 +147,15 @@ private void addSystemCredentialsToProto() { private void addAppCollectorsMapToProto() { maybeInitBuilder(); - builder.clearAppCollectorsMap(); - for (Map.Entry entry : appCollectorsMap.entrySet()) { - builder.addAppCollectorsMap(AppCollectorsMapProto.newBuilder() + builder.clearAppCollectors(); + for (Map.Entry entry + : appCollectorsMap.entrySet()) { + AppCollectorData data = entry.getValue(); + builder.addAppCollectors(AppCollectorDataProto.newBuilder() .setAppId(convertToProtoFormat(entry.getKey())) - .setAppCollectorAddr(entry.getValue())); + .setAppCollectorAddr(data.getCollectorAddr()) + .setRmIdentifier(data.getRMIdentifier()) + .setVersion(data.getVersion())); } } @@ -568,7 +573,7 @@ public Map getSystemCredentialsForApps() { } @Override - public Map getAppCollectorsMap() { + public Map getAppCollectors() { if (this.appCollectorsMap != null) { return this.appCollectorsMap; } @@ -589,12 +594,14 @@ private void initSystemCredentials() { private void initAppCollectorsMap() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? 
proto : builder; - List list = p.getAppCollectorsMapList(); + List list = p.getAppCollectorsList(); if (!list.isEmpty()) { this.appCollectorsMap = new HashMap<>(); - for (AppCollectorsMapProto c : list) { + for (AppCollectorDataProto c : list) { ApplicationId appId = convertFromProtoFormat(c.getAppId()); - this.appCollectorsMap.put(appId, c.getAppCollectorAddr()); + AppCollectorData data = AppCollectorData.newInstance(appId, + c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion()); + this.appCollectorsMap.put(appId, data); } } } @@ -611,14 +618,14 @@ public void setSystemCredentialsForApps( } @Override - public void setAppCollectorsMap( - Map appCollectorsMap) { - if (appCollectorsMap == null || appCollectorsMap.isEmpty()) { + public void setAppCollectors( + Map appCollectors) { + if (appCollectors == null || appCollectors.isEmpty()) { return; } maybeInitBuilder(); - this.appCollectorsMap = new HashMap(); - this.appCollectorsMap.putAll(appCollectorsMap); + this.appCollectorsMap = new HashMap<>(); + this.appCollectorsMap.putAll(appCollectors); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java index c6f66194e5d..3f3dcf52e9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java @@ -20,12 +20,12 @@ import java.util.ArrayList; import java.util.List; -import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; -import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; -import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorsMapPBImpl; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl; public class ReportNewCollectorInfoRequestPBImpl extends ReportNewCollectorInfoRequest { @@ -36,7 +36,7 @@ public class ReportNewCollectorInfoRequestPBImpl extends private ReportNewCollectorInfoRequestProto.Builder builder = null; private boolean viaProto = false; - private List collectorsList = null; + private List collectorsList = null; public ReportNewCollectorInfoRequestPBImpl() { builder = ReportNewCollectorInfoRequestProto.newBuilder(); @@ -96,9 +96,9 @@ private void maybeInitBuilder() { private void addLocalCollectorsToProto() { maybeInitBuilder(); builder.clearAppCollectors(); - List protoList = - new ArrayList(); - for (AppCollectorsMap m : this.collectorsList) { + List protoList = + new ArrayList(); + for (AppCollectorData m : this.collectorsList) { protoList.add(convertToProtoFormat(m)); } builder.addAllAppCollectors(protoList); @@ -106,16 +106,16 @@ private void addLocalCollectorsToProto() { private void initLocalCollectorsList() { ReportNewCollectorInfoRequestProtoOrBuilder p = viaProto ? 
proto : builder; - List list = + List list = p.getAppCollectorsList(); - this.collectorsList = new ArrayList(); - for (AppCollectorsMapProto m : list) { + this.collectorsList = new ArrayList(); + for (AppCollectorDataProto m : list) { this.collectorsList.add(convertFromProtoFormat(m)); } } @Override - public List getAppCollectorsList() { + public List getAppCollectorsList() { if (this.collectorsList == null) { initLocalCollectorsList(); } @@ -123,7 +123,7 @@ public List getAppCollectorsList() { } @Override - public void setAppCollectorsList(List appCollectorsList) { + public void setAppCollectorsList(List appCollectorsList) { maybeInitBuilder(); if (appCollectorsList == null) { builder.clearAppCollectors(); @@ -131,14 +131,14 @@ public void setAppCollectorsList(List appCollectorsList) { this.collectorsList = appCollectorsList; } - private AppCollectorsMapPBImpl convertFromProtoFormat( - AppCollectorsMapProto p) { - return new AppCollectorsMapPBImpl(p); + private AppCollectorDataPBImpl convertFromProtoFormat( + AppCollectorDataProto p) { + return new AppCollectorDataPBImpl(p); } - private AppCollectorsMapProto convertToProtoFormat( - AppCollectorsMap m) { - return ((AppCollectorsMapPBImpl) m).getProto(); + private AppCollectorDataProto convertToProtoFormat( + AppCollectorData m) { + return ((AppCollectorDataPBImpl) m).getProto(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java new file mode 100644 index 00000000000..da2e5de8aa3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.util.Records; + + +@Private +@InterfaceStability.Unstable +public abstract class AppCollectorData { + + protected static final long DEFAULT_TIMESTAMP_VALUE = -1; + + public static AppCollectorData newInstance( + ApplicationId id, String collectorAddr, long rmIdentifier, long version) { + AppCollectorData appCollectorData = + Records.newRecord(AppCollectorData.class); + appCollectorData.setApplicationId(id); + appCollectorData.setCollectorAddr(collectorAddr); + appCollectorData.setRMIdentifier(rmIdentifier); + appCollectorData.setVersion(version); + return appCollectorData; + } + + public static AppCollectorData newInstance(ApplicationId id, + String collectorAddr) { + return newInstance(id, collectorAddr, DEFAULT_TIMESTAMP_VALUE, + DEFAULT_TIMESTAMP_VALUE); + } + + /** + * Returns whether a collector data item happens before another one. Null data + * items happen before any other non-null items. 
A non-null data item A + * happens before another non-null item B when A's rmIdentifier is less than + * B's rmIdentifier, or when A's version is less than B's and they have the same + * rmIdentifier. + * + * @param dataA first collector data item. + * @param dataB second collector data item. + * @return true if dataA happens before dataB. + */ + public static boolean happensBefore(AppCollectorData dataA, + AppCollectorData dataB) { + if (dataA == null && dataB == null) { + return false; + } else if (dataA == null || dataB == null) { + return dataA == null; + } + + return + (dataA.getRMIdentifier() < dataB.getRMIdentifier()) + || ((dataA.getRMIdentifier() == dataB.getRMIdentifier()) + && (dataA.getVersion() < dataB.getVersion())); + } + + /** + * Returns whether the collector data has been stamped by the RM with an RM cluster + * timestamp and a version number. + * + * @return true if the RM has already assigned a timestamp for this collector. + * Otherwise, it means the RM has not recognized the existence of this + * collector. 
+ */ + public boolean isStamped() { + return (getRMIdentifier() != DEFAULT_TIMESTAMP_VALUE) + || (getVersion() != DEFAULT_TIMESTAMP_VALUE); + } + + public abstract ApplicationId getApplicationId(); + + public abstract void setApplicationId(ApplicationId id); + + public abstract String getCollectorAddr(); + + public abstract void setCollectorAddr(String addr); + + public abstract long getRMIdentifier(); + + public abstract void setRMIdentifier(long rmId); + + public abstract long getVersion(); + + public abstract void setVersion(long version); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java deleted file mode 100644 index 07e1d92898f..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.api.records; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.util.Records; - - -@Private -public abstract class AppCollectorsMap { - - public static AppCollectorsMap newInstance( - ApplicationId id, String collectorAddr) { - AppCollectorsMap appCollectorsMap = - Records.newRecord(AppCollectorsMap.class); - appCollectorsMap.setApplicationId(id); - appCollectorsMap.setCollectorAddr(collectorAddr); - return appCollectorsMap; - } - - public abstract ApplicationId getApplicationId(); - - public abstract void setApplicationId(ApplicationId id); - - public abstract String getCollectorAddr(); - - public abstract void setCollectorAddr(String addr); - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java similarity index 65% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java index 3740035d836..7d3a80545b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java @@ -21,37 +21,39 @@ import 
org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProtoOrBuilder; import com.google.protobuf.TextFormat; @Private @Unstable -public class AppCollectorsMapPBImpl extends AppCollectorsMap { +public class AppCollectorDataPBImpl extends AppCollectorData { - private AppCollectorsMapProto proto = - AppCollectorsMapProto.getDefaultInstance(); + private AppCollectorDataProto proto = + AppCollectorDataProto.getDefaultInstance(); - private AppCollectorsMapProto.Builder builder = null; + private AppCollectorDataProto.Builder builder = null; private boolean viaProto = false; private ApplicationId appId = null; private String collectorAddr = null; + private Long rmIdentifier = null; + private Long version = null; - public AppCollectorsMapPBImpl() { - builder = AppCollectorsMapProto.newBuilder(); + public AppCollectorDataPBImpl() { + builder = AppCollectorDataProto.newBuilder(); } - public AppCollectorsMapPBImpl(AppCollectorsMapProto proto) { + public AppCollectorDataPBImpl(AppCollectorDataProto proto) { this.proto = proto; viaProto = true; } - public AppCollectorsMapProto getProto() { + public AppCollectorDataProto getProto() { mergeLocalToProto(); proto = viaProto ? 
proto : builder.build(); viaProto = true; @@ -81,7 +83,7 @@ public String toString() { @Override public ApplicationId getApplicationId() { - AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder; + AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder; if (this.appId == null && p.hasAppId()) { this.appId = convertFromProtoFormat(p.getAppId()); } @@ -90,7 +92,7 @@ public ApplicationId getApplicationId() { @Override public String getCollectorAddr() { - AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder; + AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder; if (this.collectorAddr == null && p.hasAppCollectorAddr()) { this.collectorAddr = p.getAppCollectorAddr(); @@ -116,6 +118,46 @@ public void setCollectorAddr(String collectorAddr) { this.collectorAddr = collectorAddr; } + @Override + public long getRMIdentifier() { + AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder; + if (this.rmIdentifier == null && p.hasRmIdentifier()) { + this.rmIdentifier = p.getRmIdentifier(); + } + if (this.rmIdentifier != null) { + return this.rmIdentifier; + } else { + return AppCollectorData.DEFAULT_TIMESTAMP_VALUE; + } + } + + @Override + public void setRMIdentifier(long rmId) { + maybeInitBuilder(); + this.rmIdentifier = rmId; + builder.setRmIdentifier(rmId); + } + + @Override + public long getVersion() { + AppCollectorDataProtoOrBuilder p = viaProto ? 
proto : builder; + if (this.version == null && p.hasVersion()) { + this.version = p.getVersion(); + } + if (this.version != null) { + return this.version; + } else { + return AppCollectorData.DEFAULT_TIMESTAMP_VALUE; + } + } + + @Override + public void setVersion(long version) { + maybeInitBuilder(); + this.version = version; + builder.setVersion(version); + } + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { return new ApplicationIdPBImpl(p); } @@ -126,7 +168,7 @@ private ApplicationIdProto convertToProtoFormat(ApplicationId t) { private void maybeInitBuilder() { if (viaProto || builder == null) { - builder = AppCollectorsMapProto.newBuilder(proto); + builder = AppCollectorDataProto.newBuilder(proto); } viaProto = false; } @@ -147,6 +189,12 @@ private void mergeLocalToBuilder() { if (this.collectorAddr != null) { builder.setAppCollectorAddr(this.collectorAddr); } + if (this.rmIdentifier != null) { + builder.setRmIdentifier(this.rmIdentifier); + } + if (this.version != null) { + builder.setVersion(this.version); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/package-info.java new file mode 100644 index 00000000000..4ce3896fbb9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** Server records PB implementations. */ +package org.apache.hadoop.yarn.server.api.records.impl.pb; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 4e05fbad787..c33b913b4d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -90,7 +90,7 @@ message NodeHeartbeatRequestProto { optional MasterKeyProto last_known_nm_token_master_key = 3; optional NodeLabelsProto nodeLabels = 4; repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5; - repeated AppCollectorsMapProto registered_collectors = 6; + repeated AppCollectorDataProto registering_collectors = 6; } message LogAggregationReportProto { @@ -116,7 +116,7 @@ message NodeHeartbeatResponseProto { repeated SignalContainerRequestProto containers_to_signal = 13; optional ResourceProto resource = 14; optional ContainerQueuingLimitProto container_queuing_limit = 15; - repeated AppCollectorsMapProto app_collectors_map = 16; + repeated 
AppCollectorDataProto app_collectors = 16; // to be used in place of containers_to_decrease repeated ContainerProto containers_to_update = 17; } @@ -134,16 +134,18 @@ message SystemCredentialsForAppsProto { //////////////////////////////////////////////////////////////////////// ////// From collector_nodemanager_protocol //////////////////////////// //////////////////////////////////////////////////////////////////////// -message AppCollectorsMapProto { - optional ApplicationIdProto appId = 1; - optional string appCollectorAddr = 2; +message AppCollectorDataProto { + optional ApplicationIdProto app_id = 1; + optional string app_collector_addr = 2; + optional int64 rm_identifier = 3 [default = -1]; + optional int64 version = 4 [default = -1]; } ////////////////////////////////////////////////////// /////// collector_nodemanager_protocol ////////////// ////////////////////////////////////////////////////// message ReportNewCollectorInfoRequestProto { - repeated AppCollectorsMapProto app_collectors = 1; + repeated AppCollectorDataProto app_collectors = 1; } message ReportNewCollectorInfoResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index 9775f5c15a4..82911974034 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -77,7 +77,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; -import 
org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -429,10 +429,10 @@ public class DummyNMCollectorService public ReportNewCollectorInfoResponse reportNewCollectorInfo( ReportNewCollectorInfoRequest request) throws YarnException, IOException { - List appCollectors = request.getAppCollectorsList(); + List appCollectors = request.getAppCollectorsList(); if (appCollectors.size() == 1) { // check default appID and collectorAddr - AppCollectorsMap appCollector = appCollectors.get(0); + AppCollectorData appCollector = appCollectors.get(0); Assert.assertEquals(appCollector.getApplicationId(), DEFAULT_APP_ID); Assert.assertEquals(appCollector.getCollectorAddr(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index 8c0c73afd80..15071f37499 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UnRegisterNodeManagerRequestPBImpl; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import 
org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; @@ -109,14 +110,14 @@ public void testNodeHeartbeatRequestPBImpl() { original.setLastKnownNMTokenMasterKey(getMasterKey()); original.setNodeStatus(getNodeStatus()); original.setNodeLabels(getValidNodeLabels()); - Map collectors = getCollectors(); - original.setRegisteredCollectors(collectors); + Map collectors = getCollectors(); + original.setRegisteringCollectors(collectors); NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl( original.getProto()); assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId()); assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost()); - assertEquals(collectors, copy.getRegisteredCollectors()); + assertEquals(collectors, copy.getRegisteringCollectors()); // check labels are coming with valid values Assert.assertTrue(original.getNodeLabels() .containsAll(copy.getNodeLabels())); @@ -153,8 +154,8 @@ public void testNodeHeartbeatResponsePBImpl() { original.setNextHeartBeatInterval(1000); original.setNodeAction(NodeAction.NORMAL); original.setResponseId(100); - Map collectors = getCollectors(); - original.setAppCollectorsMap(collectors); + Map collectors = getCollectors(); + original.setAppCollectors(collectors); NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl( original.getProto()); @@ -164,7 +165,7 @@ public void testNodeHeartbeatResponsePBImpl() { assertEquals(1, copy.getContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getNMTokenMasterKey().getKeyId()); assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage()); - assertEquals(collectors, copy.getAppCollectorsMap()); + assertEquals(collectors, copy.getAppCollectors()); assertEquals(false, copy.getAreNodeLabelsAcceptedByRM()); } @@ -347,12 +348,13 @@ private HashSet getValidNodeLabels() { return nodeLabels; } - private Map getCollectors() { + private Map getCollectors() { ApplicationId 
appID = ApplicationId.newInstance(1L, 1); String collectorAddr = "localhost:0"; - Map collectorMap = - new HashMap(); - collectorMap.put(appID, collectorAddr); + AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr); + Map collectorMap = + new HashMap<>(); + collectorMap.put(appID, data); return collectorMap; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 16a84973b69..7d6d8cb7fcd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -65,11 +66,18 @@ public interface Context { Map getSystemCredentialsForApps(); /** - * Get the registered collectors that located on this NM. - * @return registered collectors, or null if the timeline service v.2 is not + * Get the list of collectors that are registering with the RM from this node. 
+ * @return registering collectors, or null if the timeline service v.2 is not * enabled */ - Map getRegisteredCollectors(); + Map getRegisteringCollectors(); + + /** + * Get the list of collectors registered with the RM and known by this node. + * @return known collectors, or null if the timeline service v.2 is not + * enabled. + */ + Map getKnownCollectors(); ConcurrentMap getContainers(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index bf4b43cb9a7..21fe0cbf1d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService; @@ -492,7 +493,9 @@ public static class NMContext implements Context { protected final ConcurrentMap containers = new ConcurrentSkipListMap(); - private Map registeredCollectors; + private Map registeringCollectors; + + private Map knownCollectors; protected final ConcurrentMap increasedContainers = @@ -526,7 +529,8 @@ public NMContext(NMContainerTokenSecretManager 
containerTokenSecretManager, NMStateStoreService stateStore, boolean isDistSchedulingEnabled, Configuration conf) { if (YarnConfiguration.timelineServiceV2Enabled(conf)) { - this.registeredCollectors = new ConcurrentHashMap<>(); + this.registeringCollectors = new ConcurrentHashMap<>(); + this.knownCollectors = new ConcurrentHashMap<>(); } this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; @@ -681,18 +685,13 @@ public OpportunisticContainerAllocator getContainerAllocator() { } @Override - public Map getRegisteredCollectors() { - return this.registeredCollectors; + public Map getRegisteringCollectors() { + return this.registeringCollectors; } - public void addRegisteredCollectors( - Map newRegisteredCollectors) { - if (registeredCollectors != null) { - this.registeredCollectors.putAll(newRegisteredCollectors); - } else { - LOG.warn("collectors are added when the registered collectors are " + - "initialized"); - } + @Override + public Map getKnownCollectors() { + return this.knownCollectors; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 3b465451319..43cd135079a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; 
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.api.records.MasterKey; @@ -760,7 +761,6 @@ private static Map parseCredentials( } protected void startStatusUpdater() { - statusUpdaterRunnable = new StatusUpdaterRunnable(); statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); @@ -1043,7 +1043,7 @@ public void run() { .getNMTokenSecretManager().getCurrentKey(), nodeLabelsForHeartbeat, NodeStatusUpdaterImpl.this.context - .getRegisteredCollectors()); + .getRegisteringCollectors()); if (logAggregationEnabled) { // pull log aggregation status for application running in this NM @@ -1134,7 +1134,7 @@ public void run() { } } if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) { - updateTimelineClientsAddress(response); + updateTimelineCollectorData(response); } } catch (ConnectException e) { @@ -1164,40 +1164,48 @@ public void run() { } } - private void updateTimelineClientsAddress( + private void updateTimelineCollectorData( NodeHeartbeatResponse response) { - Map knownCollectorsMap = - response.getAppCollectorsMap(); - if (knownCollectorsMap == null) { + Map incomingCollectorsMap = + response.getAppCollectors(); + if (incomingCollectorsMap == null) { if (LOG.isDebugEnabled()) { LOG.debug("No collectors to update RM"); } - } else { - Set> rmKnownCollectors = - knownCollectorsMap.entrySet(); - for (Map.Entry entry : rmKnownCollectors) { - ApplicationId appId = entry.getKey(); - String collectorAddr = entry.getValue(); + return; + } + Map knownCollectors = + context.getKnownCollectors(); + for (Map.Entry entry + : incomingCollectorsMap.entrySet()) { + ApplicationId appId = entry.getKey(); + AppCollectorData collectorData = entry.getValue(); - // Only handle applications running on local node. 
- // Not include apps with timeline collectors running in local - Application application = context.getApplications().get(appId); - // TODO this logic could be problematic if the collector address - // gets updated due to NM restart or collector service failure - if (application != null && - !context.getRegisteredCollectors().containsKey(appId)) { + // Only handle applications running on local node. + Application application = context.getApplications().get(appId); + if (application != null) { + // Update collector data if the newly received data happens after + // the known data (updates the known data). + AppCollectorData existingData = knownCollectors.get(appId); + if (AppCollectorData.happensBefore(existingData, collectorData)) { if (LOG.isDebugEnabled()) { - LOG.debug("Sync a new collector address: " + collectorAddr + - " for application: " + appId + " from RM."); + LOG.debug("Sync a new collector address: " + + collectorData.getCollectorAddr() + + " for application: " + appId + " from RM."); } + // Update information for clients. NMTimelinePublisher nmTimelinePublisher = context.getNMTimelinePublisher(); if (nmTimelinePublisher != null) { nmTimelinePublisher.setTimelineServiceAddress( - application.getAppId(), collectorAddr); + application.getAppId(), collectorData.getCollectorAddr()); } + // Update information for the node manager itself. 
+ knownCollectors.put(appId, collectorData); } } + // Remove the registering collector data + context.getRegisteringCollectors().remove(entry.getKey()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java index e52e1ec1a16..a5ffc744cc8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java @@ -37,9 +37,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; -import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.nodemanager.Context; -import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; @@ -108,23 +107,31 @@ public void serviceStop() throws Exception { @Override public ReportNewCollectorInfoResponse reportNewCollectorInfo( ReportNewCollectorInfoRequest request) throws YarnException, IOException { - List newCollectorsList = request.getAppCollectorsList(); + List newCollectorsList = request.getAppCollectorsList(); if 
(newCollectorsList != null && !newCollectorsList.isEmpty()) { - Map newCollectorsMap = - new HashMap(); - for (AppCollectorsMap collector : newCollectorsList) { + Map newCollectorsMap = + new HashMap<>(); + for (AppCollectorData collector : newCollectorsList) { ApplicationId appId = collector.getApplicationId(); - String collectorAddr = collector.getCollectorAddr(); - newCollectorsMap.put(appId, collectorAddr); + newCollectorsMap.put(appId, collector); // set registered collector address to TimelineClient. + // TODO: Do we need to do this after we received confirmation from + // the RM? NMTimelinePublisher nmTimelinePublisher = context.getNMTimelinePublisher(); if (nmTimelinePublisher != null) { - nmTimelinePublisher.setTimelineServiceAddress(appId, collectorAddr); + nmTimelinePublisher.setTimelineServiceAddress(appId, + collector.getCollectorAddr()); } } - ((NodeManager.NMContext)context).addRegisteredCollectors( - newCollectorsMap); + Map registeringCollectors + = context.getRegisteringCollectors(); + if (registeringCollectors != null) { + registeringCollectors.putAll(newCollectorsMap); + } else { + LOG.warn("collectors are added when the registered collectors are " + + "not initialized"); + } } return ReportNewCollectorInfoResponse.newInstance(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index dd200716f87..39be7a790c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; @@ -558,6 +559,29 @@ public ApplicationState transition(ApplicationImpl app, @SuppressWarnings("unchecked") static class AppCompletelyDoneTransition implements SingleArcTransition { + + private void updateCollectorStatus(ApplicationImpl app) { + // Remove collectors info for finished apps. + // TODO check we remove related collectors info in failure cases + // (YARN-3038) + Map registeringCollectors + = app.context.getRegisteringCollectors(); + if (registeringCollectors != null) { + registeringCollectors.remove(app.getAppId()); + } + Map knownCollectors = + app.context.getKnownCollectors(); + if (knownCollectors != null) { + knownCollectors.remove(app.getAppId()); + } + // stop timelineClient when application get finished. 
+ NMTimelinePublisher nmTimelinePublisher = + app.context.getNMTimelinePublisher(); + if (nmTimelinePublisher != null) { + nmTimelinePublisher.stopTimelineClient(app.getAppId()); + } + } + @Override public void transition(ApplicationImpl app, ApplicationEvent event) { @@ -566,20 +590,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { new LogHandlerAppFinishedEvent(app.appId)); app.context.getNMTokenSecretManager().appFinished(app.getAppId()); - // Remove collectors info for finished apps. - // TODO check we remove related collectors info in failure cases - // (YARN-3038) - Map registeredCollectors = - app.context.getRegisteredCollectors(); - if (registeredCollectors != null) { - registeredCollectors.remove(app.getAppId()); - } - // stop timelineClient when application get finished. - NMTimelinePublisher nmTimelinePublisher = - app.context.getNMTimelinePublisher(); - if (nmTimelinePublisher != null) { - nmTimelinePublisher.stopTimelineClient(app.getAppId()); - } + updateCollectorStatus(app); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 26e40859a57..73eba3de0d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; 
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -658,8 +659,11 @@ public Map getSystemCredentialsForApps() { return null; } - @Override - public Map getRegisteredCollectors() { + public Map getRegisteringCollectors() { + return null; + } + + @Override public Map getKnownCollectors() { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 0b13887cfc1..a245a97b9d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; @@ -254,7 +255,7 @@ public FinishApplicationMasterResponse finishApplicationMaster( // Remove collector address when app get finished. 
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { - rmApp.removeCollectorAddr(); + ((RMAppImpl) rmApp).removeCollectorData(); } // checking whether the app exits in RMStateStore at first not to throw // ApplicationDoesNotExistInCacheException before and after diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index 052ec22c2f0..2d677a7ac4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt @@ -293,9 +294,10 @@ public void allocate(ApplicationAttemptId appAttemptId, // add collector address for this application if (YarnConfiguration.timelineServiceV2Enabled( getRmContext().getYarnConfiguration())) { - response.setCollectorAddr( - getRmContext().getRMApps().get(appAttemptId.getApplicationId()) - .getCollectorAddr()); + AppCollectorData data = app.getCollectorData(); + if (data != null) { + response.setCollectorAddr(data.getCollectorAddr()); + } } // add preemption to the 
allocateResponse message (if any) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index e6f2bb2436d..99f67527fad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -63,12 +64,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -118,6 +121,8 @@ public class ResourceTrackerService extends AbstractService implements private boolean isDelegatedCentralizedNodeLabelsConf; private DynamicResourceConfiguration drConf; + private final AtomicLong timelineCollectorVersion = new AtomicLong(0); + public ResourceTrackerService(RMContext rmContext, NodesListManager nodesListManager, NMLivelinessMonitor nmLivelinessMonitor, @@ -525,9 +530,6 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) YarnConfiguration.timelineServiceV2Enabled(getConfig()); if (timelineV2Enabled) { // Check & update collectors info from request. - // TODO make sure it won't have race condition issue for AM failed over - // case that the older registration could possible override the newer - // one. updateAppCollectorsMap(request); } @@ -613,14 +615,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) private void setAppCollectorsMapToResponse( List runningApps, NodeHeartbeatResponse response) { - Map liveAppCollectorsMap = new - HashMap(); + Map liveAppCollectorsMap = new + HashMap<>(); Map rmApps = rmContext.getRMApps(); // Set collectors for all running apps on this node. 
for (ApplicationId appId : runningApps) { - String appCollectorAddr = rmApps.get(appId).getCollectorAddr(); - if (appCollectorAddr != null) { - liveAppCollectorsMap.put(appId, appCollectorAddr); + AppCollectorData appCollectorData = rmApps.get(appId).getCollectorData(); + if (appCollectorData != null) { + liveAppCollectorsMap.put(appId, appCollectorData); } else { if (LOG.isDebugEnabled()) { LOG.debug("Collector for applicaton: " + appId + @@ -628,29 +630,43 @@ private void setAppCollectorsMapToResponse( } } } - response.setAppCollectorsMap(liveAppCollectorsMap); + response.setAppCollectors(liveAppCollectorsMap); } private void updateAppCollectorsMap(NodeHeartbeatRequest request) { - Map registeredCollectorsMap = - request.getRegisteredCollectors(); - if (registeredCollectorsMap != null - && !registeredCollectorsMap.isEmpty()) { + Map registeringCollectorsMap = + request.getRegisteringCollectors(); + if (registeringCollectorsMap != null + && !registeringCollectorsMap.isEmpty()) { Map rmApps = rmContext.getRMApps(); - for (Map.Entry entry: - registeredCollectorsMap.entrySet()) { + for (Map.Entry entry: + registeringCollectorsMap.entrySet()) { ApplicationId appId = entry.getKey(); - String collectorAddr = entry.getValue(); - if (collectorAddr != null && !collectorAddr.isEmpty()) { + AppCollectorData collectorData = entry.getValue(); + if (collectorData != null) { + if (!collectorData.isStamped()) { + // Stamp the collector if we have not done so + collectorData.setRMIdentifier( + ResourceManager.getClusterTimeStamp()); + collectorData.setVersion( + timelineCollectorVersion.getAndIncrement()); + } RMApp rmApp = rmApps.get(appId); if (rmApp == null) { LOG.warn("Cannot update collector info because application ID: " + appId + " is not found in RMContext!"); } else { - String previousCollectorAddr = rmApp.getCollectorAddr(); - if (previousCollectorAddr == null - || !previousCollectorAddr.equals(collectorAddr)) { - rmApp.setCollectorAddr(collectorAddr); + AppCollectorData 
previousCollectorData = rmApp.getCollectorData(); + if (AppCollectorData.happensBefore(previousCollectorData, + collectorData)) { + // Sending collector update event. + // Note: RM has to store the newly received collector data + // synchronously. Otherwise, the RM may send out stale collector + // data before this update is done, and the RM then crashes, the + // newly updated collector data will get lost. + LOG.info("Update collector information for application " + appId + + " with new address: " + collectorData.getCollectorAddr()); + ((RMAppImpl) rmApp).setCollectorData(collectorData); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 43fd1fbfbdf..1a0b9208122 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.Set; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -39,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -180,27 +183,16 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, String getTrackingUrl(); /** - * The collector address for the application. It should be used only if the - * timeline service v.2 is enabled. + * The timeline collector information for the application. It should be used + * only if the timeline service v.2 is enabled. * - * @return the address for the application's collector, or null if the - * timeline service v.2 is not enabled. + * @return the data for the application's collector, including collector + * address, collector ID. Return null if the timeline service v.2 is not + * enabled. */ - String getCollectorAddr(); - - /** - * Set collector address for the application. It should be used only if the - * timeline service v.2 is enabled. - * - * @param collectorAddr the address of collector - */ - void setCollectorAddr(String collectorAddr); - - /** - * Remove collector address when application is finished or killed. It should - * be used only if the timeline service v.2 is enabled. - */ - void removeCollectorAddr(); + @InterfaceAudience.Private + @InterfaceStability.Unstable + AppCollectorData getCollectorData(); /** * The original tracking url for the application master. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index 04d2db522f6..514efd44dd6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -30,9 +30,6 @@ public enum RMAppEventType { // Source: Scheduler APP_ACCEPTED, - // TODO add source later - COLLECTOR_UPDATE, - // Source: RMAppAttempt ATTEMPT_REGISTERED, ATTEMPT_UNREGISTERED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index af7cec4377f..dbddc5f471d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import 
org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; @@ -165,7 +166,7 @@ public class RMAppImpl implements RMApp, Recoverable { private long storedFinishTime = 0; private int firstAttemptIdInStateStore = 1; private int nextAttemptId = 1; - private volatile String collectorAddr; + private AppCollectorData collectorData; // This field isn't protected by readlock now. private volatile RMAppAttempt currentAttempt; private String queue; @@ -611,18 +612,16 @@ public void setQueue(String queue) { } @Override - public String getCollectorAddr() { - return this.collectorAddr; + public AppCollectorData getCollectorData() { + return this.collectorData; } - @Override - public void setCollectorAddr(String collectorAddress) { - this.collectorAddr = collectorAddress; + public void setCollectorData(AppCollectorData incomingData) { + this.collectorData = incomingData; } - @Override - public void removeCollectorAddr() { - this.collectorAddr = null; + public void removeCollectorData() { + this.collectorData = null; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 1e63fdf577f..5ed327868c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -63,6 +63,7 @@ import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -70,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -1011,13 +1013,23 @@ public void testNodeHeartbeatForAppCollectorsMap() throws Exception { RMNodeImpl node2 = (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm2.getNodeId()); - RMApp app1 = rm.submitApp(1024); + RMAppImpl app1 = (RMAppImpl) rm.submitApp(1024); String collectorAddr1 = "1.2.3.4:5"; - app1.setCollectorAddr(collectorAddr1); + app1.setCollectorData(AppCollectorData.newInstance( + app1.getApplicationId(), collectorAddr1)); String collectorAddr2 = "5.4.3.2:1"; - RMApp app2 = rm.submitApp(1024); - app2.setCollectorAddr(collectorAddr2); + RMAppImpl app2 = (RMAppImpl) rm.submitApp(1024); + app2.setCollectorData(AppCollectorData.newInstance( + app2.getApplicationId(), collectorAddr2)); + + String collectorAddr3 = "5.4.3.2:2"; + app2.setCollectorData(AppCollectorData.newInstance( + app2.getApplicationId(), collectorAddr3, 0, 1)); + + String collectorAddr4 = "5.4.3.2:3"; + 
app2.setCollectorData(AppCollectorData.newInstance( + app2.getApplicationId(), collectorAddr4, 1, 0)); // Create a running container for app1 running on nm1 ContainerId runningContainerId1 = BuilderUtils.newContainerId( @@ -1055,14 +1067,18 @@ public void testNodeHeartbeatForAppCollectorsMap() throws Exception { Assert.assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0)); nodeHeartbeat1 = nm1.nodeHeartbeat(true); - Map map1 = nodeHeartbeat1.getAppCollectorsMap(); + Map map1 + = nodeHeartbeat1.getAppCollectors(); Assert.assertEquals(1, map1.size()); - Assert.assertEquals(collectorAddr1, map1.get(app1.getApplicationId())); + Assert.assertEquals(collectorAddr1, + map1.get(app1.getApplicationId()).getCollectorAddr()); nodeHeartbeat2 = nm2.nodeHeartbeat(true); - Map map2 = nodeHeartbeat2.getAppCollectorsMap(); + Map map2 + = nodeHeartbeat2.getAppCollectors(); Assert.assertEquals(1, map2.size()); - Assert.assertEquals(collectorAddr2, map2.get(app2.getApplicationId())); + Assert.assertEquals(collectorAddr4, + map2.get(app2.getApplicationId()).getCollectorAddr()); } private void checkRebootedNMCount(MockRM rm2, int count) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 5246eb79a15..3234d6fe488 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -43,6 +43,7 @@ import 
org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -97,15 +98,7 @@ public StringBuilder getDiagnostics() { throw new UnsupportedOperationException("Not supported yet."); } @Override - public String getCollectorAddr() { - throw new UnsupportedOperationException("Not supported yet."); - } - @Override - public void setCollectorAddr(String collectorAddr) { - throw new UnsupportedOperationException("Not supported yet."); - } - @Override - public void removeCollectorAddr() { + public AppCollectorData getCollectorData() { throw new UnsupportedOperationException("Not supported yet."); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 9290ff8faa0..b0d47a19e52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import 
org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -305,17 +306,8 @@ public CallerContext getCallerContext() { throw new UnsupportedOperationException("Not supported yet."); } - public String getCollectorAddr() { - throw new UnsupportedOperationException("Not supported yet."); - } - @Override - public void removeCollectorAddr() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public void setCollectorAddr(String collectorAddr) { + public AppCollectorData getCollectorData() { throw new UnsupportedOperationException("Not supported yet."); }