YARN-9317. Avoid repeated YarnConfiguration#timelineServiceV2Enabled check. Contributed by Prabhu Joseph
This commit is contained in:
parent
95fbbfed75
commit
ed13cf8406
|
@ -160,6 +160,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
private NodeLabelsProvider nodeLabelsProvider;
|
private NodeLabelsProvider nodeLabelsProvider;
|
||||||
private NodeAttributesProvider nodeAttributesProvider;
|
private NodeAttributesProvider nodeAttributesProvider;
|
||||||
private long tokenSequenceNo;
|
private long tokenSequenceNo;
|
||||||
|
private boolean timelineServiceV2Enabled;
|
||||||
|
|
||||||
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
|
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
|
||||||
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
||||||
|
@ -254,6 +255,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
this.logAggregationEnabled =
|
this.logAggregationEnabled =
|
||||||
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
||||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
|
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
|
||||||
|
this.timelineServiceV2Enabled = YarnConfiguration.
|
||||||
|
timelineServiceV2Enabled(conf);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1411,7 +1415,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
newResource.toString());
|
newResource.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) {
|
if (timelineServiceV2Enabled) {
|
||||||
updateTimelineCollectorData(response);
|
updateTimelineCollectorData(response);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -230,6 +230,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
|
|
||||||
// NM metrics publisher is set only if the timeline service v.2 is enabled
|
// NM metrics publisher is set only if the timeline service v.2 is enabled
|
||||||
private NMTimelinePublisher nmMetricsPublisher;
|
private NMTimelinePublisher nmMetricsPublisher;
|
||||||
|
private boolean timelineServiceV2Enabled;
|
||||||
|
|
||||||
public ContainerManagerImpl(Context context, ContainerExecutor exec,
|
public ContainerManagerImpl(Context context, ContainerExecutor exec,
|
||||||
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
|
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
|
||||||
|
@ -267,11 +268,13 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
// initialize the metrics publisher if the timeline service v.2 is enabled
|
// initialize the metrics publisher if the timeline service v.2 is enabled
|
||||||
// and the system publisher is enabled
|
// and the system publisher is enabled
|
||||||
Configuration conf = context.getConf();
|
Configuration conf = context.getConf();
|
||||||
if (YarnConfiguration.timelineServiceV2Enabled(conf) &&
|
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
|
||||||
YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
|
if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
|
||||||
LOG.info("YARN system metrics publishing service is enabled");
|
LOG.info("YARN system metrics publishing service is enabled");
|
||||||
nmMetricsPublisher = createNMTimelinePublisher(context);
|
nmMetricsPublisher = createNMTimelinePublisher(context);
|
||||||
context.setNMTimelinePublisher(nmMetricsPublisher);
|
context.setNMTimelinePublisher(nmMetricsPublisher);
|
||||||
|
}
|
||||||
|
this.timelineServiceV2Enabled = true;
|
||||||
}
|
}
|
||||||
this.containersMonitor = createContainersMonitor(exec);
|
this.containersMonitor = createContainersMonitor(exec);
|
||||||
addService(this.containersMonitor);
|
addService(this.containersMonitor);
|
||||||
|
@ -1191,7 +1194,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
private FlowContext getFlowContext(ContainerLaunchContext launchContext,
|
private FlowContext getFlowContext(ContainerLaunchContext launchContext,
|
||||||
ApplicationId applicationID) {
|
ApplicationId applicationID) {
|
||||||
FlowContext flowContext = null;
|
FlowContext flowContext = null;
|
||||||
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
|
if (timelineServiceV2Enabled) {
|
||||||
String flowName = launchContext.getEnvironment()
|
String flowName = launchContext.getEnvironment()
|
||||||
.get(TimelineUtils.FLOW_NAME_TAG_PREFIX);
|
.get(TimelineUtils.FLOW_NAME_TAG_PREFIX);
|
||||||
String flowVersion = launchContext.getEnvironment()
|
String flowVersion = launchContext.getEnvironment()
|
||||||
|
|
|
@ -95,6 +95,7 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
|
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
|
||||||
protected final RMContext rmContext;
|
protected final RMContext rmContext;
|
||||||
private final AMSProcessingChain amsProcessingChain;
|
private final AMSProcessingChain amsProcessingChain;
|
||||||
|
private boolean timelineServiceV2Enabled;
|
||||||
|
|
||||||
public ApplicationMasterService(RMContext rmContext,
|
public ApplicationMasterService(RMContext rmContext,
|
||||||
YarnScheduler scheduler) {
|
YarnScheduler scheduler) {
|
||||||
|
@ -212,6 +213,9 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||||
server.getListenerAddress());
|
server.getListenerAddress());
|
||||||
|
this.timelineServiceV2Enabled = YarnConfiguration.
|
||||||
|
timelineServiceV2Enabled(conf);
|
||||||
|
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -302,7 +306,7 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
|
rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
|
||||||
|
|
||||||
// Remove collector address when app get finished.
|
// Remove collector address when app get finished.
|
||||||
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
|
if (timelineServiceV2Enabled) {
|
||||||
((RMAppImpl) rmApp).removeCollectorData();
|
((RMAppImpl) rmApp).removeCollectorData();
|
||||||
}
|
}
|
||||||
// checking whether the app exits in RMStateStore at first not to throw
|
// checking whether the app exits in RMStateStore at first not to throw
|
||||||
|
|
|
@ -236,6 +236,7 @@ public class ClientRMService extends AbstractService implements
|
||||||
RMAppState.ACCEPTED, RMAppState.RUNNING);
|
RMAppState.ACCEPTED, RMAppState.RUNNING);
|
||||||
|
|
||||||
private ResourceProfilesManager resourceProfilesManager;
|
private ResourceProfilesManager resourceProfilesManager;
|
||||||
|
private boolean timelineServiceV2Enabled;
|
||||||
|
|
||||||
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
|
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
|
||||||
RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
|
RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
|
||||||
|
@ -306,6 +307,9 @@ public class ClientRMService extends AbstractService implements
|
||||||
YarnConfiguration.RM_ADDRESS,
|
YarnConfiguration.RM_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_ADDRESS,
|
YarnConfiguration.DEFAULT_RM_ADDRESS,
|
||||||
server.getListenerAddress());
|
server.getListenerAddress());
|
||||||
|
this.timelineServiceV2Enabled = YarnConfiguration.
|
||||||
|
timelineServiceV2Enabled(conf);
|
||||||
|
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -585,7 +589,7 @@ public class ClientRMService extends AbstractService implements
|
||||||
throw RPCUtil.getRemoteException(ie);
|
throw RPCUtil.getRemoteException(ie);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
|
if (timelineServiceV2Enabled) {
|
||||||
// Sanity check for flow run
|
// Sanity check for flow run
|
||||||
String value = null;
|
String value = null;
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -115,12 +115,15 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
|
||||||
|
|
||||||
private RMContext rmContext;
|
private RMContext rmContext;
|
||||||
private ResourceProfilesManager resourceProfilesManager;
|
private ResourceProfilesManager resourceProfilesManager;
|
||||||
|
private boolean timelineServiceV2Enabled;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(ApplicationMasterServiceContext amsContext,
|
public void init(ApplicationMasterServiceContext amsContext,
|
||||||
ApplicationMasterServiceProcessor nextProcessor) {
|
ApplicationMasterServiceProcessor nextProcessor) {
|
||||||
this.rmContext = (RMContext)amsContext;
|
this.rmContext = (RMContext)amsContext;
|
||||||
this.resourceProfilesManager = rmContext.getResourceProfilesManager();
|
this.resourceProfilesManager = rmContext.getResourceProfilesManager();
|
||||||
|
this.timelineServiceV2Enabled = YarnConfiguration.
|
||||||
|
timelineServiceV2Enabled(rmContext.getYarnConfiguration());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -326,8 +329,7 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
|
||||||
response.setNumClusterNodes(getScheduler().getNumClusterNodes());
|
response.setNumClusterNodes(getScheduler().getNumClusterNodes());
|
||||||
|
|
||||||
// add collector address for this application
|
// add collector address for this application
|
||||||
if (YarnConfiguration.timelineServiceV2Enabled(
|
if (timelineServiceV2Enabled) {
|
||||||
getRmContext().getYarnConfiguration())) {
|
|
||||||
CollectorInfo collectorInfo = app.getCollectorInfo();
|
CollectorInfo collectorInfo = app.getCollectorInfo();
|
||||||
if (collectorInfo != null) {
|
if (collectorInfo != null) {
|
||||||
response.setCollectorInfo(collectorInfo);
|
response.setCollectorInfo(collectorInfo);
|
||||||
|
|
|
@ -95,6 +95,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
private final ApplicationACLsManager applicationACLsManager;
|
private final ApplicationACLsManager applicationACLsManager;
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private YarnAuthorizationProvider authorizer;
|
private YarnAuthorizationProvider authorizer;
|
||||||
|
private boolean timelineServiceV2Enabled;
|
||||||
|
|
||||||
public RMAppManager(RMContext context,
|
public RMAppManager(RMContext context,
|
||||||
YarnScheduler scheduler, ApplicationMasterService masterService,
|
YarnScheduler scheduler, ApplicationMasterService masterService,
|
||||||
|
@ -115,6 +116,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory;
|
this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory;
|
||||||
}
|
}
|
||||||
this.authorizer = YarnAuthorizationProvider.getInstance(conf);
|
this.authorizer = YarnAuthorizationProvider.getInstance(conf);
|
||||||
|
this.timelineServiceV2Enabled = YarnConfiguration.
|
||||||
|
timelineServiceV2Enabled(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -493,7 +496,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
throw new YarnException(message);
|
throw new YarnException(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
|
if (timelineServiceV2Enabled) {
|
||||||
// Start timeline collector for the submitted app
|
// Start timeline collector for the submitted app
|
||||||
application.startTimelineCollector();
|
application.startTimelineCollector();
|
||||||
}
|
}
|
||||||
|
|
|
@ -128,6 +128,7 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
|
|
||||||
private final AtomicLong timelineCollectorVersion = new AtomicLong(0);
|
private final AtomicLong timelineCollectorVersion = new AtomicLong(0);
|
||||||
private boolean checkIpHostnameInRegistration;
|
private boolean checkIpHostnameInRegistration;
|
||||||
|
private boolean timelineServiceV2Enabled;
|
||||||
|
|
||||||
public ResourceTrackerService(RMContext rmContext,
|
public ResourceTrackerService(RMContext rmContext,
|
||||||
NodesListManager nodesListManager,
|
NodesListManager nodesListManager,
|
||||||
|
@ -177,6 +178,8 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
minimumNodeManagerVersion = conf.get(
|
minimumNodeManagerVersion = conf.get(
|
||||||
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
|
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
|
||||||
YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
|
YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
|
||||||
|
timelineServiceV2Enabled = YarnConfiguration.
|
||||||
|
timelineServiceV2Enabled(conf);
|
||||||
|
|
||||||
if (YarnConfiguration.areNodeLabelsEnabled(conf)) {
|
if (YarnConfiguration.areNodeLabelsEnabled(conf)) {
|
||||||
isDistributedNodeLabelsConf =
|
isDistributedNodeLabelsConf =
|
||||||
|
@ -621,9 +624,7 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
NodeAction.SHUTDOWN, message);
|
NodeAction.SHUTDOWN, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean timelineV2Enabled =
|
if (timelineServiceV2Enabled) {
|
||||||
YarnConfiguration.timelineServiceV2Enabled(getConfig());
|
|
||||||
if (timelineV2Enabled) {
|
|
||||||
// Check & update collectors info from request.
|
// Check & update collectors info from request.
|
||||||
updateAppCollectorsMap(request);
|
updateAppCollectorsMap(request);
|
||||||
}
|
}
|
||||||
|
@ -639,7 +640,7 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
|
|
||||||
populateTokenSequenceNo(request, nodeHeartBeatResponse);
|
populateTokenSequenceNo(request, nodeHeartBeatResponse);
|
||||||
|
|
||||||
if (timelineV2Enabled) {
|
if (timelineServiceV2Enabled) {
|
||||||
// Return collectors' map that NM needs to know
|
// Return collectors' map that NM needs to know
|
||||||
setAppCollectorsMapToResponse(rmNode.getRunningApps(),
|
setAppCollectorsMapToResponse(rmNode.getRunningApps(),
|
||||||
nodeHeartBeatResponse);
|
nodeHeartBeatResponse);
|
||||||
|
|
|
@ -84,6 +84,7 @@ public class AMLauncher implements Runnable {
|
||||||
private final AMLauncherEventType eventType;
|
private final AMLauncherEventType eventType;
|
||||||
private final RMContext rmContext;
|
private final RMContext rmContext;
|
||||||
private final Container masterContainer;
|
private final Container masterContainer;
|
||||||
|
private boolean timelineServiceV2Enabled;
|
||||||
|
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
private final EventHandler handler;
|
private final EventHandler handler;
|
||||||
|
@ -96,6 +97,8 @@ public class AMLauncher implements Runnable {
|
||||||
this.rmContext = rmContext;
|
this.rmContext = rmContext;
|
||||||
this.handler = rmContext.getDispatcher().getEventHandler();
|
this.handler = rmContext.getDispatcher().getEventHandler();
|
||||||
this.masterContainer = application.getMasterContainer();
|
this.masterContainer = application.getMasterContainer();
|
||||||
|
this.timelineServiceV2Enabled = YarnConfiguration.
|
||||||
|
timelineServiceV2Enabled(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void connect() throws IOException {
|
private void connect() throws IOException {
|
||||||
|
@ -268,7 +271,7 @@ public class AMLauncher implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setFlowContext(ContainerLaunchContext container) {
|
private void setFlowContext(ContainerLaunchContext container) {
|
||||||
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
|
if (timelineServiceV2Enabled) {
|
||||||
Map<String, String> environment = container.getEnvironment();
|
Map<String, String> environment = container.getEnvironment();
|
||||||
ApplicationId applicationId =
|
ApplicationId applicationId =
|
||||||
application.getAppAttemptId().getApplicationId();
|
application.getAppAttemptId().getApplicationId();
|
||||||
|
|
Loading…
Reference in New Issue