YARN-5638. Introduce a collector timestamp to uniquely identify collectors creation order in collector discovery. Contributed by Li Lu.

This commit is contained in:
Sangjin Lee 2016-10-14 14:40:05 -07:00 committed by Varun Saxena
parent 3ba8596095
commit b92689ff30
27 changed files with 471 additions and 359 deletions

View File

@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -47,7 +48,7 @@ public abstract class NodeHeartbeatRequest {
public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
MasterKey lastKnownContainerTokenMasterKey, MasterKey lastKnownContainerTokenMasterKey,
MasterKey lastKnownNMTokenMasterKey, Set<NodeLabel> nodeLabels, MasterKey lastKnownNMTokenMasterKey, Set<NodeLabel> nodeLabels,
Map<ApplicationId, String> registeredCollectors) { Map<ApplicationId, AppCollectorData> registeringCollectors) {
NodeHeartbeatRequest nodeHeartbeatRequest = NodeHeartbeatRequest nodeHeartbeatRequest =
Records.newRecord(NodeHeartbeatRequest.class); Records.newRecord(NodeHeartbeatRequest.class);
nodeHeartbeatRequest.setNodeStatus(nodeStatus); nodeHeartbeatRequest.setNodeStatus(nodeStatus);
@ -56,7 +57,7 @@ public abstract class NodeHeartbeatRequest {
nodeHeartbeatRequest nodeHeartbeatRequest
.setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey); .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
nodeHeartbeatRequest.setNodeLabels(nodeLabels); nodeHeartbeatRequest.setNodeLabels(nodeLabels);
nodeHeartbeatRequest.setRegisteredCollectors(registeredCollectors); nodeHeartbeatRequest.setRegisteringCollectors(registeringCollectors);
return nodeHeartbeatRequest; return nodeHeartbeatRequest;
} }
@ -79,7 +80,9 @@ public abstract class NodeHeartbeatRequest {
List<LogAggregationReport> logAggregationReportsForApps); List<LogAggregationReport> logAggregationReportsForApps);
// This tells RM registered collectors' address info on this node // This tells RM registered collectors' address info on this node
public abstract Map<ApplicationId, String> getRegisteredCollectors(); public abstract Map<ApplicationId, AppCollectorData>
public abstract void setRegisteredCollectors(Map<ApplicationId, getRegisteringCollectors();
String> appCollectorsMap);
public abstract void setRegisteringCollectors(Map<ApplicationId,
AppCollectorData> appCollectorsMap);
} }

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
@ -47,10 +48,9 @@ public abstract class NodeHeartbeatResponse {
public abstract List<ApplicationId> getApplicationsToCleanup(); public abstract List<ApplicationId> getApplicationsToCleanup();
// This tells NM the collectors' address info of related apps // This tells NM the collectors' address info of related apps
public abstract Map<ApplicationId, String> getAppCollectorsMap(); public abstract Map<ApplicationId, AppCollectorData> getAppCollectors();
public abstract void setAppCollectors(
public abstract void setAppCollectorsMap( Map<ApplicationId, AppCollectorData> appCollectorsMap);
Map<ApplicationId, String> appCollectorsMap);
public abstract void setResponseId(int responseId); public abstract void setResponseId(int responseId);

View File

@ -22,14 +22,14 @@ import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@Private @Private
public abstract class ReportNewCollectorInfoRequest { public abstract class ReportNewCollectorInfoRequest {
public static ReportNewCollectorInfoRequest newInstance( public static ReportNewCollectorInfoRequest newInstance(
List<AppCollectorsMap> appCollectorsList) { List<AppCollectorData> appCollectorsList) {
ReportNewCollectorInfoRequest request = ReportNewCollectorInfoRequest request =
Records.newRecord(ReportNewCollectorInfoRequest.class); Records.newRecord(ReportNewCollectorInfoRequest.class);
request.setAppCollectorsList(appCollectorsList); request.setAppCollectorsList(appCollectorsList);
@ -41,13 +41,13 @@ public abstract class ReportNewCollectorInfoRequest {
ReportNewCollectorInfoRequest request = ReportNewCollectorInfoRequest request =
Records.newRecord(ReportNewCollectorInfoRequest.class); Records.newRecord(ReportNewCollectorInfoRequest.class);
request.setAppCollectorsList( request.setAppCollectorsList(
Arrays.asList(AppCollectorsMap.newInstance(id, collectorAddr))); Arrays.asList(AppCollectorData.newInstance(id, collectorAddr)));
return request; return request;
} }
public abstract List<AppCollectorsMap> getAppCollectorsList(); public abstract List<AppCollectorData> getAppCollectorsList();
public abstract void setAppCollectorsList( public abstract void setAppCollectorsList(
List<AppCollectorsMap> appCollectorsList); List<AppCollectorData> appCollectorsList);
} }

View File

@ -35,13 +35,14 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@ -58,7 +59,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private Set<NodeLabel> labels = null; private Set<NodeLabel> labels = null;
private List<LogAggregationReport> logAggregationReportsForApps = null; private List<LogAggregationReport> logAggregationReportsForApps = null;
private Map<ApplicationId, String> registeredCollectors = null; private Map<ApplicationId, AppCollectorData> registeringCollectors = null;
public NodeHeartbeatRequestPBImpl() { public NodeHeartbeatRequestPBImpl() {
builder = NodeHeartbeatRequestProto.newBuilder(); builder = NodeHeartbeatRequestProto.newBuilder();
@ -114,8 +115,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
if (this.logAggregationReportsForApps != null) { if (this.logAggregationReportsForApps != null) {
addLogAggregationStatusForAppsToProto(); addLogAggregationStatusForAppsToProto();
} }
if (this.registeredCollectors != null) { if (this.registeringCollectors != null) {
addRegisteredCollectorsToProto(); addRegisteringCollectorsToProto();
} }
} }
@ -158,14 +159,14 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
return ((LogAggregationReportPBImpl) value).getProto(); return ((LogAggregationReportPBImpl) value).getProto();
} }
private void addRegisteredCollectorsToProto() { private void addRegisteringCollectorsToProto() {
maybeInitBuilder(); maybeInitBuilder();
builder.clearRegisteredCollectors(); builder.clearRegisteringCollectors();
for (Map.Entry<ApplicationId, String> entry : for (Map.Entry<ApplicationId, AppCollectorData> entry :
registeredCollectors.entrySet()) { registeringCollectors.entrySet()) {
builder.addRegisteredCollectors(AppCollectorsMapProto.newBuilder() builder.addRegisteringCollectors(AppCollectorDataProto.newBuilder()
.setAppId(convertToProtoFormat(entry.getKey())) .setAppId(convertToProtoFormat(entry.getKey()))
.setAppCollectorAddr(entry.getValue())); .setAppCollectorAddr(entry.getValue().getCollectorAddr()));
} }
} }
@ -251,35 +252,37 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
} }
@Override @Override
public Map<ApplicationId, String> getRegisteredCollectors() { public Map<ApplicationId, AppCollectorData> getRegisteringCollectors() {
if (this.registeredCollectors != null) { if (this.registeringCollectors != null) {
return this.registeredCollectors; return this.registeringCollectors;
} }
initRegisteredCollectors(); initRegisteredCollectors();
return registeredCollectors; return registeringCollectors;
} }
private void initRegisteredCollectors() { private void initRegisteredCollectors() {
NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
List<AppCollectorsMapProto> list = p.getRegisteredCollectorsList(); List<AppCollectorDataProto> list = p.getRegisteringCollectorsList();
if (!list.isEmpty()) { if (!list.isEmpty()) {
this.registeredCollectors = new HashMap<>(); this.registeringCollectors = new HashMap<>();
for (AppCollectorsMapProto c : list) { for (AppCollectorDataProto c : list) {
ApplicationId appId = convertFromProtoFormat(c.getAppId()); ApplicationId appId = convertFromProtoFormat(c.getAppId());
this.registeredCollectors.put(appId, c.getAppCollectorAddr()); AppCollectorData data = AppCollectorData.newInstance(appId,
c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion());
this.registeringCollectors.put(appId, data);
} }
} }
} }
@Override @Override
public void setRegisteredCollectors( public void setRegisteringCollectors(
Map<ApplicationId, String> registeredCollectors) { Map<ApplicationId, AppCollectorData> registeredCollectors) {
if (registeredCollectors == null || registeredCollectors.isEmpty()) { if (registeredCollectors == null || registeredCollectors.isEmpty()) {
return; return;
} }
maybeInitBuilder(); maybeInitBuilder();
this.registeredCollectors = new HashMap<ApplicationId, String>(); this.registeringCollectors = new HashMap<>();
this.registeredCollectors.putAll(registeredCollectors); this.registeringCollectors.putAll(registeredCollectors);
} }
private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) { private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) {

View File

@ -45,11 +45,12 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueui
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
@ -70,7 +71,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
private List<ApplicationId> applicationsToCleanup = null; private List<ApplicationId> applicationsToCleanup = null;
private Map<ApplicationId, ByteBuffer> systemCredentials = null; private Map<ApplicationId, ByteBuffer> systemCredentials = null;
private Resource resource = null; private Resource resource = null;
private Map<ApplicationId, String> appCollectorsMap = null; private Map<ApplicationId, AppCollectorData> appCollectorsMap = null;
private MasterKey containerTokenMasterKey = null; private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null; private MasterKey nmTokenMasterKey = null;
@ -146,11 +147,15 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
private void addAppCollectorsMapToProto() { private void addAppCollectorsMapToProto() {
maybeInitBuilder(); maybeInitBuilder();
builder.clearAppCollectorsMap(); builder.clearAppCollectors();
for (Map.Entry<ApplicationId, String> entry : appCollectorsMap.entrySet()) { for (Map.Entry<ApplicationId, AppCollectorData> entry
builder.addAppCollectorsMap(AppCollectorsMapProto.newBuilder() : appCollectorsMap.entrySet()) {
AppCollectorData data = entry.getValue();
builder.addAppCollectors(AppCollectorDataProto.newBuilder()
.setAppId(convertToProtoFormat(entry.getKey())) .setAppId(convertToProtoFormat(entry.getKey()))
.setAppCollectorAddr(entry.getValue())); .setAppCollectorAddr(data.getCollectorAddr())
.setRmIdentifier(data.getRMIdentifier())
.setVersion(data.getVersion()));
} }
} }
@ -568,7 +573,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
} }
@Override @Override
public Map<ApplicationId, String> getAppCollectorsMap() { public Map<ApplicationId, AppCollectorData> getAppCollectors() {
if (this.appCollectorsMap != null) { if (this.appCollectorsMap != null) {
return this.appCollectorsMap; return this.appCollectorsMap;
} }
@ -589,12 +594,14 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
private void initAppCollectorsMap() { private void initAppCollectorsMap() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
List<AppCollectorsMapProto> list = p.getAppCollectorsMapList(); List<AppCollectorDataProto> list = p.getAppCollectorsList();
if (!list.isEmpty()) { if (!list.isEmpty()) {
this.appCollectorsMap = new HashMap<>(); this.appCollectorsMap = new HashMap<>();
for (AppCollectorsMapProto c : list) { for (AppCollectorDataProto c : list) {
ApplicationId appId = convertFromProtoFormat(c.getAppId()); ApplicationId appId = convertFromProtoFormat(c.getAppId());
this.appCollectorsMap.put(appId, c.getAppCollectorAddr()); AppCollectorData data = AppCollectorData.newInstance(appId,
c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion());
this.appCollectorsMap.put(appId, data);
} }
} }
} }
@ -611,14 +618,14 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
} }
@Override @Override
public void setAppCollectorsMap( public void setAppCollectors(
Map<ApplicationId, String> appCollectorsMap) { Map<ApplicationId, AppCollectorData> appCollectors) {
if (appCollectorsMap == null || appCollectorsMap.isEmpty()) { if (appCollectors == null || appCollectors.isEmpty()) {
return; return;
} }
maybeInitBuilder(); maybeInitBuilder();
this.appCollectorsMap = new HashMap<ApplicationId, String>(); this.appCollectorsMap = new HashMap<>();
this.appCollectorsMap.putAll(appCollectorsMap); this.appCollectorsMap.putAll(appCollectors);
} }
@Override @Override

View File

@ -20,12 +20,12 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorsMapPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl;
public class ReportNewCollectorInfoRequestPBImpl extends public class ReportNewCollectorInfoRequestPBImpl extends
ReportNewCollectorInfoRequest { ReportNewCollectorInfoRequest {
@ -36,7 +36,7 @@ public class ReportNewCollectorInfoRequestPBImpl extends
private ReportNewCollectorInfoRequestProto.Builder builder = null; private ReportNewCollectorInfoRequestProto.Builder builder = null;
private boolean viaProto = false; private boolean viaProto = false;
private List<AppCollectorsMap> collectorsList = null; private List<AppCollectorData> collectorsList = null;
public ReportNewCollectorInfoRequestPBImpl() { public ReportNewCollectorInfoRequestPBImpl() {
builder = ReportNewCollectorInfoRequestProto.newBuilder(); builder = ReportNewCollectorInfoRequestProto.newBuilder();
@ -96,9 +96,9 @@ public class ReportNewCollectorInfoRequestPBImpl extends
private void addLocalCollectorsToProto() { private void addLocalCollectorsToProto() {
maybeInitBuilder(); maybeInitBuilder();
builder.clearAppCollectors(); builder.clearAppCollectors();
List<AppCollectorsMapProto> protoList = List<AppCollectorDataProto> protoList =
new ArrayList<AppCollectorsMapProto>(); new ArrayList<AppCollectorDataProto>();
for (AppCollectorsMap m : this.collectorsList) { for (AppCollectorData m : this.collectorsList) {
protoList.add(convertToProtoFormat(m)); protoList.add(convertToProtoFormat(m));
} }
builder.addAllAppCollectors(protoList); builder.addAllAppCollectors(protoList);
@ -106,16 +106,16 @@ public class ReportNewCollectorInfoRequestPBImpl extends
private void initLocalCollectorsList() { private void initLocalCollectorsList() {
ReportNewCollectorInfoRequestProtoOrBuilder p = viaProto ? proto : builder; ReportNewCollectorInfoRequestProtoOrBuilder p = viaProto ? proto : builder;
List<AppCollectorsMapProto> list = List<AppCollectorDataProto> list =
p.getAppCollectorsList(); p.getAppCollectorsList();
this.collectorsList = new ArrayList<AppCollectorsMap>(); this.collectorsList = new ArrayList<AppCollectorData>();
for (AppCollectorsMapProto m : list) { for (AppCollectorDataProto m : list) {
this.collectorsList.add(convertFromProtoFormat(m)); this.collectorsList.add(convertFromProtoFormat(m));
} }
} }
@Override @Override
public List<AppCollectorsMap> getAppCollectorsList() { public List<AppCollectorData> getAppCollectorsList() {
if (this.collectorsList == null) { if (this.collectorsList == null) {
initLocalCollectorsList(); initLocalCollectorsList();
} }
@ -123,7 +123,7 @@ public class ReportNewCollectorInfoRequestPBImpl extends
} }
@Override @Override
public void setAppCollectorsList(List<AppCollectorsMap> appCollectorsList) { public void setAppCollectorsList(List<AppCollectorData> appCollectorsList) {
maybeInitBuilder(); maybeInitBuilder();
if (appCollectorsList == null) { if (appCollectorsList == null) {
builder.clearAppCollectors(); builder.clearAppCollectors();
@ -131,14 +131,14 @@ public class ReportNewCollectorInfoRequestPBImpl extends
this.collectorsList = appCollectorsList; this.collectorsList = appCollectorsList;
} }
private AppCollectorsMapPBImpl convertFromProtoFormat( private AppCollectorDataPBImpl convertFromProtoFormat(
AppCollectorsMapProto p) { AppCollectorDataProto p) {
return new AppCollectorsMapPBImpl(p); return new AppCollectorDataPBImpl(p);
} }
private AppCollectorsMapProto convertToProtoFormat( private AppCollectorDataProto convertToProtoFormat(
AppCollectorsMap m) { AppCollectorData m) {
return ((AppCollectorsMapPBImpl) m).getProto(); return ((AppCollectorDataPBImpl) m).getProto();
} }
} }

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.Records;
@Private
@InterfaceStability.Unstable
public abstract class AppCollectorData {
protected static final long DEFAULT_TIMESTAMP_VALUE = -1;
public static AppCollectorData newInstance(
ApplicationId id, String collectorAddr, long rmIdentifier, long version) {
AppCollectorData appCollectorData =
Records.newRecord(AppCollectorData.class);
appCollectorData.setApplicationId(id);
appCollectorData.setCollectorAddr(collectorAddr);
appCollectorData.setRMIdentifier(rmIdentifier);
appCollectorData.setVersion(version);
return appCollectorData;
}
public static AppCollectorData newInstance(ApplicationId id,
String collectorAddr) {
return newInstance(id, collectorAddr, DEFAULT_TIMESTAMP_VALUE,
DEFAULT_TIMESTAMP_VALUE);
}
/**
* Returns if a collector data item happens before another one. Null data
* items happens before any other non-null items. Non-null data items A
* happens before another non-null item B when A's rmIdentifier is less than
* B's rmIdentifier. Or A's version is less than B's if they have the same
* rmIdentifier.
*
* @param dataA first collector data item.
* @param dataB second collector data item.
* @return true if dataA happens before dataB.
*/
public static boolean happensBefore(AppCollectorData dataA,
AppCollectorData dataB) {
if (dataA == null && dataB == null) {
return false;
} else if (dataA == null || dataB == null) {
return dataA == null;
}
return
(dataA.getRMIdentifier() < dataB.getRMIdentifier())
|| ((dataA.getRMIdentifier() == dataB.getRMIdentifier())
&& (dataA.getVersion() < dataB.getVersion()));
}
/**
* Returns if the collector data has been stamped by the RM with a RM cluster
* timestamp and a version number.
*
* @return true if RM has already assigned a timestamp for this collector.
* Otherwise, it means the RM has not recognized the existence of this
* collector.
*/
public boolean isStamped() {
return (getRMIdentifier() != DEFAULT_TIMESTAMP_VALUE)
|| (getVersion() != DEFAULT_TIMESTAMP_VALUE);
}
public abstract ApplicationId getApplicationId();
public abstract void setApplicationId(ApplicationId id);
public abstract String getCollectorAddr();
public abstract void setCollectorAddr(String addr);
public abstract long getRMIdentifier();
public abstract void setRMIdentifier(long rmId);
public abstract long getVersion();
public abstract void setVersion(long version);
}

View File

@ -1,46 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.Records;
@Private
public abstract class AppCollectorsMap {
public static AppCollectorsMap newInstance(
ApplicationId id, String collectorAddr) {
AppCollectorsMap appCollectorsMap =
Records.newRecord(AppCollectorsMap.class);
appCollectorsMap.setApplicationId(id);
appCollectorsMap.setCollectorAddr(collectorAddr);
return appCollectorsMap;
}
public abstract ApplicationId getApplicationId();
public abstract void setApplicationId(ApplicationId id);
public abstract String getCollectorAddr();
public abstract void setCollectorAddr(String addr);
}

View File

@ -21,37 +21,39 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProtoOrBuilder;
import com.google.protobuf.TextFormat; import com.google.protobuf.TextFormat;
@Private @Private
@Unstable @Unstable
public class AppCollectorsMapPBImpl extends AppCollectorsMap { public class AppCollectorDataPBImpl extends AppCollectorData {
private AppCollectorsMapProto proto = private AppCollectorDataProto proto =
AppCollectorsMapProto.getDefaultInstance(); AppCollectorDataProto.getDefaultInstance();
private AppCollectorsMapProto.Builder builder = null; private AppCollectorDataProto.Builder builder = null;
private boolean viaProto = false; private boolean viaProto = false;
private ApplicationId appId = null; private ApplicationId appId = null;
private String collectorAddr = null; private String collectorAddr = null;
private Long rmIdentifier = null;
private Long version = null;
public AppCollectorsMapPBImpl() { public AppCollectorDataPBImpl() {
builder = AppCollectorsMapProto.newBuilder(); builder = AppCollectorDataProto.newBuilder();
} }
public AppCollectorsMapPBImpl(AppCollectorsMapProto proto) { public AppCollectorDataPBImpl(AppCollectorDataProto proto) {
this.proto = proto; this.proto = proto;
viaProto = true; viaProto = true;
} }
public AppCollectorsMapProto getProto() { public AppCollectorDataProto getProto() {
mergeLocalToProto(); mergeLocalToProto();
proto = viaProto ? proto : builder.build(); proto = viaProto ? proto : builder.build();
viaProto = true; viaProto = true;
@ -81,7 +83,7 @@ public class AppCollectorsMapPBImpl extends AppCollectorsMap {
@Override @Override
public ApplicationId getApplicationId() { public ApplicationId getApplicationId() {
AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder; AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder;
if (this.appId == null && p.hasAppId()) { if (this.appId == null && p.hasAppId()) {
this.appId = convertFromProtoFormat(p.getAppId()); this.appId = convertFromProtoFormat(p.getAppId());
} }
@ -90,7 +92,7 @@ public class AppCollectorsMapPBImpl extends AppCollectorsMap {
@Override @Override
public String getCollectorAddr() { public String getCollectorAddr() {
AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder; AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder;
if (this.collectorAddr == null if (this.collectorAddr == null
&& p.hasAppCollectorAddr()) { && p.hasAppCollectorAddr()) {
this.collectorAddr = p.getAppCollectorAddr(); this.collectorAddr = p.getAppCollectorAddr();
@ -116,6 +118,46 @@ public class AppCollectorsMapPBImpl extends AppCollectorsMap {
this.collectorAddr = collectorAddr; this.collectorAddr = collectorAddr;
} }
@Override
public long getRMIdentifier() {
AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder;
if (this.rmIdentifier == null && p.hasRmIdentifier()) {
this.rmIdentifier = p.getRmIdentifier();
}
if (this.rmIdentifier != null) {
return this.rmIdentifier;
} else {
return AppCollectorData.DEFAULT_TIMESTAMP_VALUE;
}
}
@Override
public void setRMIdentifier(long rmId) {
maybeInitBuilder();
this.rmIdentifier = rmId;
builder.setRmIdentifier(rmId);
}
@Override
public long getVersion() {
AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder;
if (this.version == null && p.hasRmIdentifier()) {
this.version = p.getRmIdentifier();
}
if (this.version != null) {
return this.version;
} else {
return AppCollectorData.DEFAULT_TIMESTAMP_VALUE;
}
}
@Override
public void setVersion(long version) {
maybeInitBuilder();
this.version = version;
builder.setVersion(version);
}
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p); return new ApplicationIdPBImpl(p);
} }
@ -126,7 +168,7 @@ public class AppCollectorsMapPBImpl extends AppCollectorsMap {
private void maybeInitBuilder() { private void maybeInitBuilder() {
if (viaProto || builder == null) { if (viaProto || builder == null) {
builder = AppCollectorsMapProto.newBuilder(proto); builder = AppCollectorDataProto.newBuilder(proto);
} }
viaProto = false; viaProto = false;
} }
@ -147,6 +189,12 @@ public class AppCollectorsMapPBImpl extends AppCollectorsMap {
if (this.collectorAddr != null) { if (this.collectorAddr != null) {
builder.setAppCollectorAddr(this.collectorAddr); builder.setAppCollectorAddr(this.collectorAddr);
} }
if (this.rmIdentifier != null) {
builder.setRmIdentifier(this.rmIdentifier);
}
if (this.version != null) {
builder.setVersion(this.version);
}
} }
} }

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -15,26 +15,5 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
/** Server records PB implementations. */
package org.apache.hadoop.yarn.server.resourcemanager.rmapp; package org.apache.hadoop.yarn.server.api.records.impl.pb;
import org.apache.hadoop.yarn.api.records.ApplicationId;
/**
* Event used for updating collector address in RMApp on node heartbeat.
*/
public class RMAppCollectorUpdateEvent extends RMAppEvent {
private final String appCollectorAddr;
public RMAppCollectorUpdateEvent(ApplicationId appId,
String appCollectorAddr) {
super(appId, RMAppEventType.COLLECTOR_UPDATE);
this.appCollectorAddr = appCollectorAddr;
}
public String getAppCollectorAddr(){
return this.appCollectorAddr;
}
}

View File

@ -89,7 +89,7 @@ message NodeHeartbeatRequestProto {
optional MasterKeyProto last_known_nm_token_master_key = 3; optional MasterKeyProto last_known_nm_token_master_key = 3;
optional NodeLabelsProto nodeLabels = 4; optional NodeLabelsProto nodeLabels = 4;
repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5; repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5;
repeated AppCollectorsMapProto registered_collectors = 6; repeated AppCollectorDataProto registering_collectors = 6;
} }
message LogAggregationReportProto { message LogAggregationReportProto {
@ -114,7 +114,7 @@ message NodeHeartbeatResponseProto {
repeated SignalContainerRequestProto containers_to_signal = 13; repeated SignalContainerRequestProto containers_to_signal = 13;
optional ResourceProto resource = 14; optional ResourceProto resource = 14;
optional ContainerQueuingLimitProto container_queuing_limit = 15; optional ContainerQueuingLimitProto container_queuing_limit = 15;
repeated AppCollectorsMapProto app_collectors_map = 16; repeated AppCollectorDataProto app_collectors = 16;
} }
message ContainerQueuingLimitProto { message ContainerQueuingLimitProto {
@ -130,16 +130,18 @@ message SystemCredentialsForAppsProto {
//////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////
////// From collector_nodemanager_protocol //////////////////////////// ////// From collector_nodemanager_protocol ////////////////////////////
//////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////
message AppCollectorsMapProto { message AppCollectorDataProto {
optional ApplicationIdProto appId = 1; optional ApplicationIdProto app_id = 1;
optional string appCollectorAddr = 2; optional string app_collector_addr = 2;
optional int64 rm_identifier = 3 [default = -1];
optional int64 version = 4 [default = -1];
} }
////////////////////////////////////////////////////// //////////////////////////////////////////////////////
/////// collector_nodemanager_protocol ////////////// /////// collector_nodemanager_protocol //////////////
////////////////////////////////////////////////////// //////////////////////////////////////////////////////
message ReportNewCollectorInfoRequestProto { message ReportNewCollectorInfoRequestProto {
repeated AppCollectorsMapProto app_collectors = 1; repeated AppCollectorDataProto app_collectors = 1;
} }
message ReportNewCollectorInfoResponseProto { message ReportNewCollectorInfoResponseProto {

View File

@ -75,7 +75,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorCon
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -420,10 +420,10 @@ public class TestRPC {
public ReportNewCollectorInfoResponse reportNewCollectorInfo( public ReportNewCollectorInfoResponse reportNewCollectorInfo(
ReportNewCollectorInfoRequest request) ReportNewCollectorInfoRequest request)
throws YarnException, IOException { throws YarnException, IOException {
List<AppCollectorsMap> appCollectors = request.getAppCollectorsList(); List<AppCollectorData> appCollectors = request.getAppCollectorsList();
if (appCollectors.size() == 1) { if (appCollectors.size() == 1) {
// check default appID and collectorAddr // check default appID and collectorAddr
AppCollectorsMap appCollector = appCollectors.get(0); AppCollectorData appCollector = appCollectors.get(0);
Assert.assertEquals(appCollector.getApplicationId(), Assert.assertEquals(appCollector.getApplicationId(),
DEFAULT_APP_ID); DEFAULT_APP_ID);
Assert.assertEquals(appCollector.getCollectorAddr(), Assert.assertEquals(appCollector.getCollectorAddr(),

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRe
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UnRegisterNodeManagerRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UnRegisterNodeManagerRequestPBImpl;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@ -109,14 +110,14 @@ public class TestYarnServerApiClasses {
original.setLastKnownNMTokenMasterKey(getMasterKey()); original.setLastKnownNMTokenMasterKey(getMasterKey());
original.setNodeStatus(getNodeStatus()); original.setNodeStatus(getNodeStatus());
original.setNodeLabels(getValidNodeLabels()); original.setNodeLabels(getValidNodeLabels());
Map<ApplicationId, String> collectors = getCollectors(); Map<ApplicationId, AppCollectorData> collectors = getCollectors();
original.setRegisteredCollectors(collectors); original.setRegisteringCollectors(collectors);
NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl( NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl(
original.getProto()); original.getProto());
assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());
assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId()); assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());
assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost()); assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());
assertEquals(collectors, copy.getRegisteredCollectors()); assertEquals(collectors, copy.getRegisteringCollectors());
// check labels are coming with valid values // check labels are coming with valid values
Assert.assertTrue(original.getNodeLabels() Assert.assertTrue(original.getNodeLabels()
.containsAll(copy.getNodeLabels())); .containsAll(copy.getNodeLabels()));
@ -153,8 +154,8 @@ public class TestYarnServerApiClasses {
original.setNextHeartBeatInterval(1000); original.setNextHeartBeatInterval(1000);
original.setNodeAction(NodeAction.NORMAL); original.setNodeAction(NodeAction.NORMAL);
original.setResponseId(100); original.setResponseId(100);
Map<ApplicationId, String> collectors = getCollectors(); Map<ApplicationId, AppCollectorData> collectors = getCollectors();
original.setAppCollectorsMap(collectors); original.setAppCollectors(collectors);
NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl( NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl(
original.getProto()); original.getProto());
@ -164,7 +165,7 @@ public class TestYarnServerApiClasses {
assertEquals(1, copy.getContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
assertEquals(1, copy.getNMTokenMasterKey().getKeyId()); assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage()); assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
assertEquals(collectors, copy.getAppCollectorsMap()); assertEquals(collectors, copy.getAppCollectors());
assertEquals(false, copy.getAreNodeLabelsAcceptedByRM()); assertEquals(false, copy.getAreNodeLabelsAcceptedByRM());
} }
@ -344,12 +345,13 @@ public class TestYarnServerApiClasses {
return nodeLabels; return nodeLabels;
} }
private Map<ApplicationId, String> getCollectors() { private Map<ApplicationId, AppCollectorData> getCollectors() {
ApplicationId appID = ApplicationId.newInstance(1L, 1); ApplicationId appID = ApplicationId.newInstance(1L, 1);
String collectorAddr = "localhost:0"; String collectorAddr = "localhost:0";
Map<ApplicationId, String> collectorMap = AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr);
new HashMap<ApplicationId, String>(); Map<ApplicationId, AppCollectorData> collectorMap =
collectorMap.put(appID, collectorAddr); new HashMap<>();
collectorMap.put(appID, data);
return collectorMap; return collectorMap;
} }

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@ -74,11 +75,18 @@ public interface Context {
Map<ApplicationId, Credentials> getSystemCredentialsForApps(); Map<ApplicationId, Credentials> getSystemCredentialsForApps();
/** /**
* Get the registered collectors that located on this NM. * Get the list of collectors that are registering with the RM from this node.
* @return registered collectors, or null if the timeline service v.2 is not * @return registering collectors, or null if the timeline service v.2 is not
* enabled * enabled
*/ */
Map<ApplicationId, String> getRegisteredCollectors(); Map<ApplicationId, AppCollectorData> getRegisteringCollectors();
/**
* Get the list of collectors registered with the RM and known by this node.
* @return known collectors, or null if the timeline service v.2 is not
* enabled.
*/
Map<ApplicationId, AppCollectorData> getKnownCollectors();
ConcurrentMap<ContainerId, Container> getContainers(); ConcurrentMap<ContainerId, Container> getContainers();

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService; import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService;
@ -486,7 +487,9 @@ public class NodeManager extends CompositeService
protected final ConcurrentMap<ContainerId, Container> containers = protected final ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>(); new ConcurrentSkipListMap<ContainerId, Container>();
private Map<ApplicationId, String> registeredCollectors; private Map<ApplicationId, AppCollectorData> registeringCollectors;
private Map<ApplicationId, AppCollectorData> knownCollectors;
protected final ConcurrentMap<ContainerId, protected final ConcurrentMap<ContainerId,
org.apache.hadoop.yarn.api.records.Container> increasedContainers = org.apache.hadoop.yarn.api.records.Container> increasedContainers =
@ -521,7 +524,8 @@ public class NodeManager extends CompositeService
NMStateStoreService stateStore, boolean isDistSchedulingEnabled, NMStateStoreService stateStore, boolean isDistSchedulingEnabled,
Configuration conf) { Configuration conf) {
if (YarnConfiguration.timelineServiceV2Enabled(conf)) { if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
this.registeredCollectors = new ConcurrentHashMap<>(); this.registeringCollectors = new ConcurrentHashMap<>();
this.knownCollectors = new ConcurrentHashMap<>();
} }
this.containerTokenSecretManager = containerTokenSecretManager; this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager;
@ -682,18 +686,13 @@ public class NodeManager extends CompositeService
} }
@Override @Override
public Map<ApplicationId, String> getRegisteredCollectors() { public Map<ApplicationId, AppCollectorData> getRegisteringCollectors() {
return this.registeredCollectors; return this.registeringCollectors;
} }
public void addRegisteredCollectors( @Override
Map<ApplicationId, String> newRegisteredCollectors) { public Map<ApplicationId, AppCollectorData> getKnownCollectors() {
if (registeredCollectors != null) { return this.knownCollectors;
this.registeredCollectors.putAll(newRegisteredCollectors);
} else {
LOG.warn("collectors are added when the registered collectors are " +
"initialized");
}
} }
@Override @Override

View File

@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
@ -822,7 +823,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
.getNMTokenSecretManager().getCurrentKey(), .getNMTokenSecretManager().getCurrentKey(),
nodeLabelsForHeartbeat, nodeLabelsForHeartbeat,
NodeStatusUpdaterImpl.this.context NodeStatusUpdaterImpl.this.context
.getRegisteredCollectors()); .getRegisteringCollectors());
if (logAggregationEnabled) { if (logAggregationEnabled) {
// pull log aggregation status for application running in this NM // pull log aggregation status for application running in this NM
@ -915,7 +916,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
} }
} }
if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) { if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) {
updateTimelineClientsAddress(response); updateTimelineCollectorData(response);
} }
} catch (ConnectException e) { } catch (ConnectException e) {
@ -945,40 +946,48 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
} }
} }
private void updateTimelineClientsAddress( private void updateTimelineCollectorData(
NodeHeartbeatResponse response) { NodeHeartbeatResponse response) {
Map<ApplicationId, String> knownCollectorsMap = Map<ApplicationId, AppCollectorData> incomingCollectorsMap =
response.getAppCollectorsMap(); response.getAppCollectors();
if (knownCollectorsMap == null) { if (incomingCollectorsMap == null) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("No collectors to update RM"); LOG.debug("No collectors to update RM");
} }
} else { return;
Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
knownCollectorsMap.entrySet();
for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
ApplicationId appId = entry.getKey();
String collectorAddr = entry.getValue();
// Only handle applications running on local node.
// Not include apps with timeline collectors running in local
Application application = context.getApplications().get(appId);
// TODO this logic could be problematic if the collector address
// gets updated due to NM restart or collector service failure
if (application != null &&
!context.getRegisteredCollectors().containsKey(appId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sync a new collector address: " + collectorAddr +
" for application: " + appId + " from RM.");
} }
NMTimelinePublisher nmTimelinePublisher = Map<ApplicationId, AppCollectorData> knownCollectors
context.getNMTimelinePublisher(); = context.getKnownCollectors();
for (Map.Entry<ApplicationId, AppCollectorData> entry
: incomingCollectorsMap.entrySet()) {
ApplicationId appId = entry.getKey();
AppCollectorData collectorData = entry.getValue();
// Only handle applications running on local node.
Application application = context.getApplications().get(appId);
if (application != null) {
// Update collector data if the newly received data happens after
// the known data (updates the known data).
AppCollectorData existingData = knownCollectors.get(appId);
if (AppCollectorData.happensBefore(existingData, collectorData)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sync a new collector address: "
+ collectorData.getCollectorAddr()
+ " for application: " + appId + " from RM.");
}
// Update information for clients.
NMTimelinePublisher nmTimelinePublisher
= context.getNMTimelinePublisher();
if (nmTimelinePublisher != null) { if (nmTimelinePublisher != null) {
nmTimelinePublisher.setTimelineServiceAddress( nmTimelinePublisher.setTimelineServiceAddress(
application.getAppId(), collectorAddr); application.getAppId(), collectorData.getCollectorAddr());
} }
// Update information for the node manager itself.
knownCollectors.put(appId, collectorData);
} }
} }
// Remove the registering collector data
context.getRegisteringCollectors().remove(entry.getKey());
} }
} }

View File

@ -37,9 +37,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorCon
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
@ -107,23 +106,31 @@ public class NMCollectorService extends CompositeService implements
@Override @Override
public ReportNewCollectorInfoResponse reportNewCollectorInfo( public ReportNewCollectorInfoResponse reportNewCollectorInfo(
ReportNewCollectorInfoRequest request) throws YarnException, IOException { ReportNewCollectorInfoRequest request) throws YarnException, IOException {
List<AppCollectorsMap> newCollectorsList = request.getAppCollectorsList(); List<AppCollectorData> newCollectorsList = request.getAppCollectorsList();
if (newCollectorsList != null && !newCollectorsList.isEmpty()) { if (newCollectorsList != null && !newCollectorsList.isEmpty()) {
Map<ApplicationId, String> newCollectorsMap = Map<ApplicationId, AppCollectorData> newCollectorsMap =
new HashMap<ApplicationId, String>(); new HashMap<>();
for (AppCollectorsMap collector : newCollectorsList) { for (AppCollectorData collector : newCollectorsList) {
ApplicationId appId = collector.getApplicationId(); ApplicationId appId = collector.getApplicationId();
String collectorAddr = collector.getCollectorAddr(); newCollectorsMap.put(appId, collector);
newCollectorsMap.put(appId, collectorAddr);
// set registered collector address to TimelineClient. // set registered collector address to TimelineClient.
// TODO: Do we need to do this after we received confirmation from
// the RM?
NMTimelinePublisher nmTimelinePublisher = NMTimelinePublisher nmTimelinePublisher =
context.getNMTimelinePublisher(); context.getNMTimelinePublisher();
if (nmTimelinePublisher != null) { if (nmTimelinePublisher != null) {
nmTimelinePublisher.setTimelineServiceAddress(appId, collectorAddr); nmTimelinePublisher.setTimelineServiceAddress(appId,
collector.getCollectorAddr());
} }
} }
((NodeManager.NMContext)context).addRegisteredCollectors( Map<ApplicationId, AppCollectorData> registeringCollectors
newCollectorsMap); = context.getRegisteringCollectors();
if (registeringCollectors != null) {
registeringCollectors.putAll(newCollectorsMap);
} else {
LOG.warn("collectors are added when the registered collectors are " +
"initialized");
}
} }
return ReportNewCollectorInfoResponse.newInstance(); return ReportNewCollectorInfoResponse.newInstance();

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
@ -544,6 +545,29 @@ public class ApplicationImpl implements Application {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
static class AppCompletelyDoneTransition implements static class AppCompletelyDoneTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> { SingleArcTransition<ApplicationImpl, ApplicationEvent> {
private void updateCollectorStatus(ApplicationImpl app) {
// Remove collectors info for finished apps.
// TODO check we remove related collectors info in failure cases
// (YARN-3038)
Map<ApplicationId, AppCollectorData> registeringCollectors
= app.context.getRegisteringCollectors();
if (registeringCollectors != null) {
registeringCollectors.remove(app.getAppId());
}
Map<ApplicationId, AppCollectorData> knownCollectors =
app.context.getKnownCollectors();
if (knownCollectors != null) {
knownCollectors.remove(app.getAppId());
}
// stop timelineClient when application get finished.
NMTimelinePublisher nmTimelinePublisher =
app.context.getNMTimelinePublisher();
if (nmTimelinePublisher != null) {
nmTimelinePublisher.stopTimelineClient(app.getAppId());
}
}
@Override @Override
public void transition(ApplicationImpl app, ApplicationEvent event) { public void transition(ApplicationImpl app, ApplicationEvent event) {
@ -552,20 +576,7 @@ public class ApplicationImpl implements Application {
new LogHandlerAppFinishedEvent(app.appId)); new LogHandlerAppFinishedEvent(app.appId));
app.context.getNMTokenSecretManager().appFinished(app.getAppId()); app.context.getNMTokenSecretManager().appFinished(app.getAppId());
// Remove collectors info for finished apps. updateCollectorStatus(app);
// TODO check we remove related collectors info in failure cases
// (YARN-3038)
Map<ApplicationId, String> registeredCollectors =
app.context.getRegisteredCollectors();
if (registeredCollectors != null) {
registeredCollectors.remove(app.getAppId());
}
// stop timelineClient when application get finished.
NMTimelinePublisher nmTimelinePublisher =
app.context.getNMTimelinePublisher();
if (nmTimelinePublisher != null) {
nmTimelinePublisher.stopTimelineClient(app.getAppId());
}
} }
} }

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -618,8 +619,11 @@ public abstract class BaseAMRMProxyTest {
return null; return null;
} }
@Override public Map<ApplicationId, AppCollectorData> getRegisteringCollectors() {
public Map<ApplicationId, String> getRegisteredCollectors() { return null;
}
@Override public Map<ApplicationId, AppCollectorData> getKnownCollectors() {
return null; return null;
} }

View File

@ -82,8 +82,10 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
@ -319,7 +321,7 @@ public class ApplicationMasterService extends AbstractService implements
// Remove collector address when app get finished. // Remove collector address when app get finished.
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
rmApp.removeCollectorAddr(); ((RMAppImpl) rmApp).removeCollectorData();
} }
// checking whether the app exits in RMStateStore at first not to throw // checking whether the app exits in RMStateStore at first not to throw
// ApplicationDoesNotExistInCacheException before and after // ApplicationDoesNotExistInCacheException before and after
@ -605,8 +607,10 @@ public class ApplicationMasterService extends AbstractService implements
// add collector address for this application // add collector address for this application
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
allocateResponse.setCollectorAddr( AppCollectorData data = app.getCollectorData();
this.rmContext.getRMApps().get(applicationId).getCollectorAddr()); if (data != null) {
allocateResponse.setCollectorAddr(data.getCollectorAddr());
}
} }
// add preemption to the allocateResponse message (if any) // add preemption to the allocateResponse message (if any)

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@ -63,13 +64,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppCollectorUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -119,6 +121,8 @@ public class ResourceTrackerService extends AbstractService implements
private boolean isDelegatedCentralizedNodeLabelsConf; private boolean isDelegatedCentralizedNodeLabelsConf;
private DynamicResourceConfiguration drConf; private DynamicResourceConfiguration drConf;
private final AtomicLong timelineCollectorVersion = new AtomicLong(0);
public ResourceTrackerService(RMContext rmContext, public ResourceTrackerService(RMContext rmContext,
NodesListManager nodesListManager, NodesListManager nodesListManager,
NMLivelinessMonitor nmLivelinessMonitor, NMLivelinessMonitor nmLivelinessMonitor,
@ -525,9 +529,6 @@ public class ResourceTrackerService extends AbstractService implements
YarnConfiguration.timelineServiceV2Enabled(getConfig()); YarnConfiguration.timelineServiceV2Enabled(getConfig());
if (timelineV2Enabled) { if (timelineV2Enabled) {
// Check & update collectors info from request. // Check & update collectors info from request.
// TODO make sure it won't have race condition issue for AM failed over
// case that the older registration could possible override the newer
// one.
updateAppCollectorsMap(request); updateAppCollectorsMap(request);
} }
@ -613,14 +614,14 @@ public class ResourceTrackerService extends AbstractService implements
private void setAppCollectorsMapToResponse( private void setAppCollectorsMapToResponse(
List<ApplicationId> runningApps, NodeHeartbeatResponse response) { List<ApplicationId> runningApps, NodeHeartbeatResponse response) {
Map<ApplicationId, String> liveAppCollectorsMap = new Map<ApplicationId, AppCollectorData> liveAppCollectorsMap = new
HashMap<ApplicationId, String>(); HashMap<>();
Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps(); Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
// Set collectors for all running apps on this node. // Set collectors for all running apps on this node.
for (ApplicationId appId : runningApps) { for (ApplicationId appId : runningApps) {
String appCollectorAddr = rmApps.get(appId).getCollectorAddr(); AppCollectorData appCollectorData = rmApps.get(appId).getCollectorData();
if (appCollectorAddr != null) { if (appCollectorData != null) {
liveAppCollectorsMap.put(appId, appCollectorAddr); liveAppCollectorsMap.put(appId, appCollectorData);
} else { } else {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Collector for applicaton: " + appId + LOG.debug("Collector for applicaton: " + appId +
@ -628,32 +629,43 @@ public class ResourceTrackerService extends AbstractService implements
} }
} }
} }
response.setAppCollectorsMap(liveAppCollectorsMap); response.setAppCollectors(liveAppCollectorsMap);
} }
private void updateAppCollectorsMap(NodeHeartbeatRequest request) { private void updateAppCollectorsMap(NodeHeartbeatRequest request) {
Map<ApplicationId, String> registeredCollectorsMap = Map<ApplicationId, AppCollectorData> registeringCollectorsMap =
request.getRegisteredCollectors(); request.getRegisteringCollectors();
if (registeredCollectorsMap != null if (registeringCollectorsMap != null
&& !registeredCollectorsMap.isEmpty()) { && !registeringCollectorsMap.isEmpty()) {
Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps(); Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
for (Map.Entry<ApplicationId, String> entry: for (Map.Entry<ApplicationId, AppCollectorData> entry:
registeredCollectorsMap.entrySet()) { registeringCollectorsMap.entrySet()) {
ApplicationId appId = entry.getKey(); ApplicationId appId = entry.getKey();
String collectorAddr = entry.getValue(); AppCollectorData collectorData = entry.getValue();
if (collectorAddr != null && !collectorAddr.isEmpty()) { if (collectorData != null) {
if (!collectorData.isStamped()) {
// Stamp the collector if we have not done so
collectorData.setRMIdentifier(
ResourceManager.getClusterTimeStamp());
collectorData.setVersion(
timelineCollectorVersion.getAndIncrement());
}
RMApp rmApp = rmApps.get(appId); RMApp rmApp = rmApps.get(appId);
if (rmApp == null) { if (rmApp == null) {
LOG.warn("Cannot update collector info because application ID: " + LOG.warn("Cannot update collector info because application ID: " +
appId + " is not found in RMContext!"); appId + " is not found in RMContext!");
} else { } else {
String previousCollectorAddr = rmApp.getCollectorAddr(); AppCollectorData previousCollectorData = rmApp.getCollectorData();
if (previousCollectorAddr == null if (AppCollectorData.happensBefore(previousCollectorData,
|| !previousCollectorAddr.equals(collectorAddr)) { collectorData)) {
// sending collector update event. // Sending collector update event.
RMAppCollectorUpdateEvent event = // Note: RM has to store the newly received collector data
new RMAppCollectorUpdateEvent(appId, collectorAddr); // synchronously. Otherwise, the RM may send out stale collector
rmContext.getDispatcher().getEventHandler().handle(event); // data before this update is done, and the RM then crashes, the
// newly updated collector data will get lost.
LOG.info("Update collector information for application " + appId
+ " with new address: " + collectorData.getCollectorAddr());
((RMAppImpl) rmApp).setCollectorData(collectorData);
} }
} }
} }

View File

@ -22,6 +22,8 @@ import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -177,27 +180,16 @@ public interface RMApp extends EventHandler<RMAppEvent> {
String getTrackingUrl(); String getTrackingUrl();
/** /**
* The collector address for the application. It should be used only if the * The timeline collector information for the application. It should be used
* timeline service v.2 is enabled. * only if the timeline service v.2 is enabled.
* *
* @return the address for the application's collector, or null if the * @return the data for the application's collector, including collector
* timeline service v.2 is not enabled. * address, collector ID. Return null if the timeline service v.2 is not
* enabled.
*/ */
String getCollectorAddr(); @InterfaceAudience.Private
@InterfaceStability.Unstable
/** AppCollectorData getCollectorData();
* Set collector address for the application. It should be used only if the
* timeline service v.2 is enabled.
*
* @param collectorAddr the address of collector
*/
void setCollectorAddr(String collectorAddr);
/**
* Remove collector address when application is finished or killed. It should
* be used only if the timeline service v.2 is enabled.
*/
void removeCollectorAddr();
/** /**
* The original tracking url for the application master. * The original tracking url for the application master.

View File

@ -31,9 +31,6 @@ public enum RMAppEventType {
// Source: Scheduler // Source: Scheduler
APP_ACCEPTED, APP_ACCEPTED,
// TODO add source later
COLLECTOR_UPDATE,
// Source: RMAppAttempt // Source: RMAppAttempt
ATTEMPT_REGISTERED, ATTEMPT_REGISTERED,
ATTEMPT_UNREGISTERED, ATTEMPT_UNREGISTERED,

View File

@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
@ -157,7 +158,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private long storedFinishTime = 0; private long storedFinishTime = 0;
private int firstAttemptIdInStateStore = 1; private int firstAttemptIdInStateStore = 1;
private int nextAttemptId = 1; private int nextAttemptId = 1;
private String collectorAddr; private AppCollectorData collectorData;
// This field isn't protected by readlock now. // This field isn't protected by readlock now.
private volatile RMAppAttempt currentAttempt; private volatile RMAppAttempt currentAttempt;
private String queue; private String queue;
@ -205,8 +206,6 @@ public class RMAppImpl implements RMApp, Recoverable {
// Transitions from NEW state // Transitions from NEW state
.addTransition(RMAppState.NEW, RMAppState.NEW, .addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition()) RMAppEventType.START, new RMAppNewlySavingTransition())
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED, .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
@ -223,8 +222,6 @@ public class RMAppImpl implements RMApp, Recoverable {
// Transitions from NEW_SAVING state // Transitions from NEW_SAVING state
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED, .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition()) RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING, .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
@ -243,8 +240,6 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.MOVE, new RMAppMoveTransition()) RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
RMAppEventType.APP_REJECTED, RMAppEventType.APP_REJECTED,
new FinalSavingTransition( new FinalSavingTransition(
@ -261,8 +256,6 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.MOVE, new RMAppMoveTransition()) RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING, .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition( RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition(
YarnApplicationState.RUNNING)) YarnApplicationState.RUNNING))
@ -290,8 +283,6 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING, .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.MOVE, new RMAppMoveTransition()) RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING, .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_UNREGISTERED, RMAppEventType.ATTEMPT_UNREGISTERED,
new FinalSavingTransition( new FinalSavingTransition(
@ -321,8 +312,6 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
RMAppEventType.APP_RUNNING_ON_NODE, RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition()) new AppRunningOnNodeTransition())
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
// ignorable transitions // ignorable transitions
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
@ -334,8 +323,6 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING, .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
RMAppEventType.APP_RUNNING_ON_NODE, RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition()) new AppRunningOnNodeTransition())
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
// ignorable transitions // ignorable transitions
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING, .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
EnumSet.of(RMAppEventType.NODE_UPDATE, EnumSet.of(RMAppEventType.NODE_UPDATE,
@ -347,8 +334,6 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.KILLING, RMAppState.KILLING, .addTransition(RMAppState.KILLING, RMAppState.KILLING,
RMAppEventType.APP_RUNNING_ON_NODE, RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition()) new AppRunningOnNodeTransition())
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING, .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_KILLED, RMAppEventType.ATTEMPT_KILLED,
new FinalSavingTransition( new FinalSavingTransition(
@ -620,18 +605,16 @@ public class RMAppImpl implements RMApp, Recoverable {
} }
@Override @Override
public String getCollectorAddr() { public AppCollectorData getCollectorData() {
return this.collectorAddr; return this.collectorData;
} }
@Override public void setCollectorData(AppCollectorData incomingData) {
public void setCollectorAddr(String collectorAddress) { this.collectorData = incomingData;
this.collectorAddr = collectorAddress;
} }
@Override public void removeCollectorData() {
public void removeCollectorAddr() { this.collectorData = null;
this.collectorAddr = null;
} }
@Override @Override
@ -979,24 +962,6 @@ public class RMAppImpl implements RMApp, Recoverable {
}; };
} }
private static final class RMAppCollectorUpdateTransition
extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
if (YarnConfiguration.timelineServiceV2Enabled(app.conf)) {
LOG.info("Updating collector info for app: " + app.getApplicationId());
RMAppCollectorUpdateEvent appCollectorUpdateEvent =
(RMAppCollectorUpdateEvent) event;
// Update collector address
app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr());
// TODO persistent to RMStateStore for recover
// Save to RMStateStore
}
};
}
private static final class RMAppNodeUpdateTransition extends RMAppTransition { private static final class RMAppNodeUpdateTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event; RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@ -70,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -1007,13 +1009,23 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
RMNodeImpl node2 = RMNodeImpl node2 =
(RMNodeImpl) rm.getRMContext().getRMNodes().get(nm2.getNodeId()); (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm2.getNodeId());
RMApp app1 = rm.submitApp(1024); RMAppImpl app1 = (RMAppImpl) rm.submitApp(1024);
String collectorAddr1 = "1.2.3.4:5"; String collectorAddr1 = "1.2.3.4:5";
app1.setCollectorAddr(collectorAddr1); app1.setCollectorData(AppCollectorData.newInstance(
app1.getApplicationId(), collectorAddr1));
String collectorAddr2 = "5.4.3.2:1"; String collectorAddr2 = "5.4.3.2:1";
RMApp app2 = rm.submitApp(1024); RMAppImpl app2 = (RMAppImpl) rm.submitApp(1024);
app2.setCollectorAddr(collectorAddr2); app2.setCollectorData(AppCollectorData.newInstance(
app2.getApplicationId(), collectorAddr2));
String collectorAddr3 = "5.4.3.2:2";
app2.setCollectorData(AppCollectorData.newInstance(
app2.getApplicationId(), collectorAddr3, 0, 1));
String collectorAddr4 = "5.4.3.2:3";
app2.setCollectorData(AppCollectorData.newInstance(
app2.getApplicationId(), collectorAddr4, 1, 0));
// Create a running container for app1 running on nm1 // Create a running container for app1 running on nm1
ContainerId runningContainerId1 = BuilderUtils.newContainerId( ContainerId runningContainerId1 = BuilderUtils.newContainerId(
@ -1051,14 +1063,18 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
Assert.assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0)); Assert.assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0));
nodeHeartbeat1 = nm1.nodeHeartbeat(true); nodeHeartbeat1 = nm1.nodeHeartbeat(true);
Map<ApplicationId, String> map1 = nodeHeartbeat1.getAppCollectorsMap(); Map<ApplicationId, AppCollectorData> map1
= nodeHeartbeat1.getAppCollectors();
Assert.assertEquals(1, map1.size()); Assert.assertEquals(1, map1.size());
Assert.assertEquals(collectorAddr1, map1.get(app1.getApplicationId())); Assert.assertEquals(collectorAddr1,
map1.get(app1.getApplicationId()).getCollectorAddr());
nodeHeartbeat2 = nm2.nodeHeartbeat(true); nodeHeartbeat2 = nm2.nodeHeartbeat(true);
Map<ApplicationId, String> map2 = nodeHeartbeat2.getAppCollectorsMap(); Map<ApplicationId, AppCollectorData> map2
= nodeHeartbeat2.getAppCollectors();
Assert.assertEquals(1, map2.size()); Assert.assertEquals(1, map2.size());
Assert.assertEquals(collectorAddr2, map2.get(app2.getApplicationId())); Assert.assertEquals(collectorAddr4,
map2.get(app2.getApplicationId()).getCollectorAddr());
} }
private void checkRebootedNMCount(MockRM rm2, int count) private void checkRebootedNMCount(MockRM rm2, int count)

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@ -95,15 +96,7 @@ public abstract class MockAsm extends MockApps {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");
} }
@Override @Override
public String getCollectorAddr() { public AppCollectorData getCollectorData() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void setCollectorAddr(String collectorAddr) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void removeCollectorAddr() {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");
} }
@Override @Override

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -302,17 +303,9 @@ public class MockRMApp implements RMApp {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");
} }
public String getCollectorAddr() { @Override
public AppCollectorData getCollectorData() {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");
} }
@Override
public void removeCollectorAddr() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void setCollectorAddr(String collectorAddr) {
throw new UnsupportedOperationException("Not supported yet.");
}
} }