YARN-9317. Avoid repeated YarnConfiguration#timelineServiceV2Enabled check. Contributed by Prabhu Joseph

This commit is contained in:
bibinchundatt 2019-02-23 08:03:28 +05:30
parent 7db50ffceb
commit 6d1cf7b395
8 changed files with 38 additions and 17 deletions

View File

@ -153,6 +153,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private NMNodeLabelsHandler nodeLabelsHandler; private NMNodeLabelsHandler nodeLabelsHandler;
private final NodeLabelsProvider nodeLabelsProvider; private final NodeLabelsProvider nodeLabelsProvider;
private boolean timelineServiceV2Enabled;
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@ -240,6 +241,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
this.logAggregationEnabled = this.logAggregationEnabled =
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
this.timelineServiceV2Enabled = YarnConfiguration.
timelineServiceV2Enabled(conf);
} }
@Override @Override
@ -1177,7 +1180,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
newResource.toString()); newResource.toString());
} }
} }
if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) { if (timelineServiceV2Enabled) {
updateTimelineCollectorData(response); updateTimelineCollectorData(response);
} }

View File

@ -227,6 +227,7 @@ public class ContainerManagerImpl extends CompositeService implements
// NM metrics publisher is set only if the timeline service v.2 is enabled // NM metrics publisher is set only if the timeline service v.2 is enabled
private NMTimelinePublisher nmMetricsPublisher; private NMTimelinePublisher nmMetricsPublisher;
private boolean timelineServiceV2Enabled;
public ContainerManagerImpl(Context context, ContainerExecutor exec, public ContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
@ -263,11 +264,13 @@ public class ContainerManagerImpl extends CompositeService implements
// initialize the metrics publisher if the timeline service v.2 is enabled // initialize the metrics publisher if the timeline service v.2 is enabled
// and the system publisher is enabled // and the system publisher is enabled
Configuration conf = context.getConf(); Configuration conf = context.getConf();
if (YarnConfiguration.timelineServiceV2Enabled(conf) && if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
YarnConfiguration.systemMetricsPublisherEnabled(conf)) { if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
LOG.info("YARN system metrics publishing service is enabled"); LOG.info("YARN system metrics publishing service is enabled");
nmMetricsPublisher = createNMTimelinePublisher(context); nmMetricsPublisher = createNMTimelinePublisher(context);
context.setNMTimelinePublisher(nmMetricsPublisher); context.setNMTimelinePublisher(nmMetricsPublisher);
}
this.timelineServiceV2Enabled = true;
} }
this.containersMonitor = createContainersMonitor(exec); this.containersMonitor = createContainersMonitor(exec);
addService(this.containersMonitor); addService(this.containersMonitor);
@ -1184,7 +1187,7 @@ public class ContainerManagerImpl extends CompositeService implements
private FlowContext getFlowContext(ContainerLaunchContext launchContext, private FlowContext getFlowContext(ContainerLaunchContext launchContext,
ApplicationId applicationID) { ApplicationId applicationID) {
FlowContext flowContext = null; FlowContext flowContext = null;
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { if (timelineServiceV2Enabled) {
String flowName = launchContext.getEnvironment() String flowName = launchContext.getEnvironment()
.get(TimelineUtils.FLOW_NAME_TAG_PREFIX); .get(TimelineUtils.FLOW_NAME_TAG_PREFIX);
String flowVersion = launchContext.getEnvironment() String flowVersion = launchContext.getEnvironment()

View File

@ -96,6 +96,7 @@ public class ApplicationMasterService extends AbstractService implements
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>(); new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
protected final RMContext rmContext; protected final RMContext rmContext;
private final AMSProcessingChain amsProcessingChain; private final AMSProcessingChain amsProcessingChain;
private boolean timelineServiceV2Enabled;
public ApplicationMasterService(RMContext rmContext, public ApplicationMasterService(RMContext rmContext,
YarnScheduler scheduler) { YarnScheduler scheduler) {
@ -213,6 +214,8 @@ public class ApplicationMasterService extends AbstractService implements
YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
server.getListenerAddress()); server.getListenerAddress());
this.timelineServiceV2Enabled = YarnConfiguration.
timelineServiceV2Enabled(conf);
super.serviceStart(); super.serviceStart();
} }
@ -303,7 +306,7 @@ public class ApplicationMasterService extends AbstractService implements
rmContext.getRMApps().get(applicationAttemptId.getApplicationId()); rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
// Remove collector address when app get finished. // Remove collector address when app get finished.
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { if (timelineServiceV2Enabled) {
((RMAppImpl) rmApp).removeCollectorData(); ((RMAppImpl) rmApp).removeCollectorData();
} }
// checking whether the app exits in RMStateStore at first not to throw // checking whether the app exits in RMStateStore at first not to throw

View File

@ -222,6 +222,7 @@ public class ClientRMService extends AbstractService implements
RMAppState.ACCEPTED, RMAppState.RUNNING); RMAppState.ACCEPTED, RMAppState.RUNNING);
private ResourceProfilesManager resourceProfilesManager; private ResourceProfilesManager resourceProfilesManager;
private boolean timelineServiceV2Enabled;
public ClientRMService(RMContext rmContext, YarnScheduler scheduler, public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager, RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
@ -292,6 +293,8 @@ public class ClientRMService extends AbstractService implements
YarnConfiguration.RM_ADDRESS, YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS,
server.getListenerAddress()); server.getListenerAddress());
this.timelineServiceV2Enabled = YarnConfiguration.
timelineServiceV2Enabled(conf);
super.serviceStart(); super.serviceStart();
} }
@ -571,7 +574,7 @@ public class ClientRMService extends AbstractService implements
throw RPCUtil.getRemoteException(ie); throw RPCUtil.getRemoteException(ie);
} }
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { if (timelineServiceV2Enabled) {
// Sanity check for flow run // Sanity check for flow run
String value = null; String value = null;
try { try {

View File

@ -107,12 +107,15 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
private RMContext rmContext; private RMContext rmContext;
private ResourceProfilesManager resourceProfilesManager; private ResourceProfilesManager resourceProfilesManager;
private boolean timelineServiceV2Enabled;
@Override @Override
public void init(ApplicationMasterServiceContext amsContext, public void init(ApplicationMasterServiceContext amsContext,
ApplicationMasterServiceProcessor nextProcessor) { ApplicationMasterServiceProcessor nextProcessor) {
this.rmContext = (RMContext)amsContext; this.rmContext = (RMContext)amsContext;
this.resourceProfilesManager = rmContext.getResourceProfilesManager(); this.resourceProfilesManager = rmContext.getResourceProfilesManager();
this.timelineServiceV2Enabled = YarnConfiguration.
timelineServiceV2Enabled(rmContext.getYarnConfiguration());
} }
@Override @Override
@ -318,8 +321,7 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
response.setNumClusterNodes(getScheduler().getNumClusterNodes()); response.setNumClusterNodes(getScheduler().getNumClusterNodes());
// add collector address for this application // add collector address for this application
if (YarnConfiguration.timelineServiceV2Enabled( if (timelineServiceV2Enabled) {
getRmContext().getYarnConfiguration())) {
CollectorInfo collectorInfo = app.getCollectorInfo(); CollectorInfo collectorInfo = app.getCollectorInfo();
if (collectorInfo != null) { if (collectorInfo != null) {
response.setCollectorInfo(collectorInfo); response.setCollectorInfo(collectorInfo);

View File

@ -94,6 +94,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
private final ApplicationACLsManager applicationACLsManager; private final ApplicationACLsManager applicationACLsManager;
private Configuration conf; private Configuration conf;
private YarnAuthorizationProvider authorizer; private YarnAuthorizationProvider authorizer;
private boolean timelineServiceV2Enabled;
public RMAppManager(RMContext context, public RMAppManager(RMContext context,
YarnScheduler scheduler, ApplicationMasterService masterService, YarnScheduler scheduler, ApplicationMasterService masterService,
@ -114,6 +115,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory; this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory;
} }
this.authorizer = YarnAuthorizationProvider.getInstance(conf); this.authorizer = YarnAuthorizationProvider.getInstance(conf);
this.timelineServiceV2Enabled = YarnConfiguration.
timelineServiceV2Enabled(conf);
} }
/** /**
@ -451,7 +454,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
throw new YarnException(message); throw new YarnException(message);
} }
if (YarnConfiguration.timelineServiceV2Enabled(conf)) { if (timelineServiceV2Enabled) {
// Start timeline collector for the submitted app // Start timeline collector for the submitted app
application.startTimelineCollector(); application.startTimelineCollector();
} }

View File

@ -124,6 +124,7 @@ public class ResourceTrackerService extends AbstractService implements
private DynamicResourceConfiguration drConf; private DynamicResourceConfiguration drConf;
private final AtomicLong timelineCollectorVersion = new AtomicLong(0); private final AtomicLong timelineCollectorVersion = new AtomicLong(0);
private boolean timelineServiceV2Enabled;
public ResourceTrackerService(RMContext rmContext, public ResourceTrackerService(RMContext rmContext,
NodesListManager nodesListManager, NodesListManager nodesListManager,
@ -170,6 +171,8 @@ public class ResourceTrackerService extends AbstractService implements
minimumNodeManagerVersion = conf.get( minimumNodeManagerVersion = conf.get(
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION, YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION); YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
timelineServiceV2Enabled = YarnConfiguration.
timelineServiceV2Enabled(conf);
if (YarnConfiguration.areNodeLabelsEnabled(conf)) { if (YarnConfiguration.areNodeLabelsEnabled(conf)) {
isDistributedNodeLabelsConf = isDistributedNodeLabelsConf =
@ -578,9 +581,7 @@ public class ResourceTrackerService extends AbstractService implements
NodeAction.SHUTDOWN, message); NodeAction.SHUTDOWN, message);
} }
boolean timelineV2Enabled = if (timelineServiceV2Enabled) {
YarnConfiguration.timelineServiceV2Enabled(getConfig());
if (timelineV2Enabled) {
// Check & update collectors info from request. // Check & update collectors info from request.
updateAppCollectorsMap(request); updateAppCollectorsMap(request);
} }
@ -600,7 +601,7 @@ public class ResourceTrackerService extends AbstractService implements
nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
} }
if (timelineV2Enabled) { if (timelineServiceV2Enabled) {
// Return collectors' map that NM needs to know // Return collectors' map that NM needs to know
setAppCollectorsMapToResponse(rmNode.getRunningApps(), setAppCollectorsMapToResponse(rmNode.getRunningApps(),
nodeHeartBeatResponse); nodeHeartBeatResponse);

View File

@ -81,6 +81,7 @@ public class AMLauncher implements Runnable {
private final AMLauncherEventType eventType; private final AMLauncherEventType eventType;
private final RMContext rmContext; private final RMContext rmContext;
private final Container masterContainer; private final Container masterContainer;
private boolean timelineServiceV2Enabled;
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
private final EventHandler handler; private final EventHandler handler;
@ -93,6 +94,8 @@ public class AMLauncher implements Runnable {
this.rmContext = rmContext; this.rmContext = rmContext;
this.handler = rmContext.getDispatcher().getEventHandler(); this.handler = rmContext.getDispatcher().getEventHandler();
this.masterContainer = application.getMasterContainer(); this.masterContainer = application.getMasterContainer();
this.timelineServiceV2Enabled = YarnConfiguration.
timelineServiceV2Enabled(conf);
} }
private void connect() throws IOException { private void connect() throws IOException {
@ -239,7 +242,7 @@ public class AMLauncher implements Runnable {
} }
private void setFlowContext(ContainerLaunchContext container) { private void setFlowContext(ContainerLaunchContext container) {
if (YarnConfiguration.timelineServiceV2Enabled(conf)) { if (timelineServiceV2Enabled) {
Map<String, String> environment = container.getEnvironment(); Map<String, String> environment = container.getEnvironment();
ApplicationId applicationId = ApplicationId applicationId =
application.getAppAttemptId().getApplicationId(); application.getAppAttemptId().getApplicationId();