YARN-10846. Add dispatcher metrics to NM. (#4687)

This commit is contained in:
slfan1989 2023-04-13 00:53:20 +08:00 committed by GitHub
parent dd6d0ac510
commit 06f9bdffa6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 261 additions and 22 deletions

View File

@ -3070,6 +3070,10 @@ public class YarnConfiguration extends Configuration {
+ "amrmproxy.ha.enable";
public static final boolean DEFAULT_AMRM_PROXY_HA_ENABLED = false;
// Enable NM Dispatcher Metric default False.
public static final String NM_DISPATCHER_METRIC_ENABLED = NM_PREFIX + "dispatcher.metric.enable";
public static final boolean DEFAULT_NM_DISPATCHER_METRIC_ENABLED = false;
/**
* Default platform-agnostic CLASSPATH for YARN applications. A
* comma-separated list of CLASSPATH entries. The parameter expansion marker

View File

@ -57,10 +57,8 @@ public class GenericEventTypeMetrics<T extends Enum<T>>
//Initialize enum
for (final T type : enums) {
String eventCountMetricsName =
type.toString() + "_" + "event_count";
String processingTimeMetricsName =
type.toString() + "_" + "processing_time";
String eventCountMetricsName = type + "_" + "event_count";
String processingTimeMetricsName = type + "_" + "processing_time";
eventCountMetrics.put(type, this.registry.
newGauge(eventCountMetricsName, eventCountMetricsName, 0L));
processingTimeMetrics.put(type, this.registry.

View File

@ -5065,6 +5065,16 @@
</description>
</property>
<property>
<name>yarn.nodemanager.dispatcher.metric.enable</name>
<value>false</value>
<description>
Yarn NodeManager enables Dispatcher Metric.
if true, will enable dispatcher metric; if false, will not enable dispatcher metric;
Default is false.
</description>
</property>
<property>
<name>yarn.router.interceptor.user-thread-pool.minimum-pool-size</name>
<value>5</value>

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics;
import static org.apache.hadoop.metrics2.lib.Interns.info;
public final class GenericEventTypeMetricsManager {
private GenericEventTypeMetricsManager() {
// nothing to do
}
// Construct a GenericEventTypeMetrics for dispatcher
@SuppressWarnings("unchecked")
public static <T extends Enum<T>> GenericEventTypeMetrics
create(String dispatcherName, Class<T> eventTypeClass) {
return new GenericEventTypeMetrics.EventTypeMetricsBuilder<T>()
.setMs(DefaultMetricsSystem.instance())
.setInfo(info("GenericEventTypeMetrics for " + eventTypeClass.getName(),
"Metrics for " + dispatcherName))
.setEnumClass(eventTypeClass)
.setEnums(eventTypeClass.getEnumConstants())
.build().registerMetrics();
}
}

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@ -144,8 +145,10 @@ public class NodeManager extends CompositeService
private AtomicBoolean isStopping = new AtomicBoolean(false);
private boolean rmWorkPreservingRestartEnabled;
private boolean shouldExitOnShutdownEvent = false;
private boolean nmDispatherMetricEnabled;
private NMLogAggregationStatusTracker nmLogAggregationStatusTracker;
/**
* Default Container State transition listener.
*/
@ -366,6 +369,10 @@ public class NodeManager extends CompositeService
.RM_WORK_PRESERVING_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
nmDispatherMetricEnabled = conf.getBoolean(
YarnConfiguration.NM_DISPATCHER_METRIC_ENABLED,
YarnConfiguration.DEFAULT_NM_DISPATCHER_METRIC_ENABLED);
try {
initAndStartRecoveryStore(conf);
} catch (IOException e) {
@ -1006,8 +1013,17 @@ public class NodeManager extends CompositeService
/**
* Unit test friendly.
*/
@SuppressWarnings("unchecked")
protected AsyncDispatcher createNMDispatcher() {
return new AsyncDispatcher("NM Event dispatcher");
dispatcher = new AsyncDispatcher("NM Event dispatcher");
if (nmDispatherMetricEnabled) {
GenericEventTypeMetrics<ContainerManagerEventType> eventTypeMetrics =
GenericEventTypeMetricsManager.create(dispatcher.getName(),
ContainerManagerEventType.class);
dispatcher.addMetrics(eventTypeMetrics, eventTypeMetrics.getEnumClass());
LOG.info("NM Event dispatcher Metric Initialization Completed.");
}
return dispatcher;
}
//For testing
@ -1052,4 +1068,10 @@ public class NodeManager extends CompositeService
Context ctxt) {
return new NMLogAggregationStatusTracker(ctxt);
}
@VisibleForTesting
@Private
public AsyncDispatcher getDispatcher() {
return dispatcher;
}
}

View File

@ -24,7 +24,9 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse;
import org.apache.hadoop.yarn.api.records.LocalizationStatus;
import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
@ -105,6 +107,7 @@ import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.GenericEventTypeMetricsManager;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrUpdateContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
@ -120,6 +123,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
@ -217,7 +221,7 @@ public class ContainerManagerImpl extends CompositeService implements
protected final NodeStatusUpdater nodeStatusUpdater;
protected LocalDirsHandlerService dirsHandler;
protected final AsyncDispatcher dispatcher;
private AsyncDispatcher dispatcher;
private final DeletionService deletionService;
private LogHandler logHandler;
@ -233,6 +237,7 @@ public class ContainerManagerImpl extends CompositeService implements
// NM metrics publisher is set only if the timeline service v.2 is enabled
private NMTimelinePublisher nmMetricsPublisher;
private boolean timelineServiceV2Enabled;
private boolean nmDispatherMetricEnabled;
public ContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
@ -242,7 +247,7 @@ public class ContainerManagerImpl extends CompositeService implements
this.dirsHandler = dirsHandler;
// ContainerManager level dispatcher.
dispatcher = new AsyncDispatcher("NM ContainerManager dispatcher");
dispatcher = createContainerManagerDispatcher();
this.deletionService = deletionContext;
this.metrics = metrics;
@ -324,10 +329,67 @@ public class ContainerManagerImpl extends CompositeService implements
YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
SHUTDOWN_CLEANUP_SLOP_MS;
nmDispatherMetricEnabled = conf.getBoolean(
YarnConfiguration.NM_DISPATCHER_METRIC_ENABLED,
YarnConfiguration.DEFAULT_NM_DISPATCHER_METRIC_ENABLED);
super.serviceInit(conf);
recover();
}
@SuppressWarnings("unchecked")
protected AsyncDispatcher createContainerManagerDispatcher() {
dispatcher = new AsyncDispatcher("NM ContainerManager dispatcher");
if (!nmDispatherMetricEnabled) {
return dispatcher;
}
GenericEventTypeMetrics<ContainerEventType> containerEventTypeMetrics =
GenericEventTypeMetricsManager.create(dispatcher.getName(), ContainerEventType.class);
dispatcher.addMetrics(containerEventTypeMetrics, containerEventTypeMetrics.getEnumClass());
GenericEventTypeMetrics<LocalizationEventType> localizationEventTypeMetrics =
GenericEventTypeMetricsManager.create(dispatcher.getName(), LocalizationEventType.class);
dispatcher.addMetrics(localizationEventTypeMetrics,
localizationEventTypeMetrics.getEnumClass());
GenericEventTypeMetrics<ApplicationEventType> applicationEventTypeMetrics =
GenericEventTypeMetricsManager.create(dispatcher.getName(), ApplicationEventType.class);
dispatcher.addMetrics(applicationEventTypeMetrics,
applicationEventTypeMetrics.getEnumClass());
GenericEventTypeMetrics<ContainersLauncherEventType> containersLauncherEventTypeMetrics =
GenericEventTypeMetricsManager.create(dispatcher.getName(),
ContainersLauncherEventType.class);
dispatcher.addMetrics(containersLauncherEventTypeMetrics,
containersLauncherEventTypeMetrics.getEnumClass());
GenericEventTypeMetrics<ContainerSchedulerEventType> containerSchedulerEventTypeMetrics =
GenericEventTypeMetricsManager.create(dispatcher.getName(),
ContainerSchedulerEventType.class);
dispatcher.addMetrics(containerSchedulerEventTypeMetrics,
containerSchedulerEventTypeMetrics.getEnumClass());
GenericEventTypeMetrics<ContainersMonitorEventType> containersMonitorEventTypeMetrics =
GenericEventTypeMetricsManager.create(dispatcher.getName(),
ContainersMonitorEventType.class);
dispatcher.addMetrics(containersMonitorEventTypeMetrics,
containersMonitorEventTypeMetrics.getEnumClass());
GenericEventTypeMetrics<AuxServicesEventType> auxServicesEventTypeTypeMetrics =
GenericEventTypeMetricsManager.create(dispatcher.getName(), AuxServicesEventType.class);
dispatcher.addMetrics(auxServicesEventTypeTypeMetrics,
auxServicesEventTypeTypeMetrics.getEnumClass());
GenericEventTypeMetrics<LocalizerEventType> localizerEventTypeMetrics =
GenericEventTypeMetricsManager.create(dispatcher.getName(), LocalizerEventType.class);
dispatcher.addMetrics(localizerEventTypeMetrics, localizerEventTypeMetrics.getEnumClass());
LOG.info("NM ContainerManager dispatcher Metric Initialization Completed.");
return dispatcher;
}
protected void createAMRMProxyService(Configuration conf) {
this.amrmProxyEnabled =
conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
@ -2034,4 +2096,8 @@ public class ContainerManagerImpl extends CompositeService implements
public ResourceLocalizationService getResourceLocalizationService() {
return rsrcLocalizationSrvc;
}
public AsyncDispatcher getDispatcher() {
return dispatcher;
}
}

View File

@ -70,7 +70,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
super(context, exec, deletionContext, nodeStatusUpdater, metrics,
dirsHandler);
dispatcher.disableExitOnDispatchException();
getDispatcher().disableExitOnDispatchException();
}
@Override
@ -78,7 +78,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext, Context context,
NodeManagerMetrics metrics) {
return new ResourceLocalizationService(super.dispatcher, exec,
return new ResourceLocalizationService(getDispatcher(), exec,
deletionContext, super.dirsHandler, context, metrics) {
@Override
public void handle(LocalizationEvent event) {
@ -148,7 +148,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
@SuppressWarnings("unchecked")
protected ContainersLauncher createContainersLauncher(Context context,
ContainerExecutor exec) {
return new ContainersLauncher(context, super.dispatcher, exec,
return new ContainersLauncher(context, getDispatcher(), exec,
super.dirsHandler, this) {
@Override
public void handle(ContainersLauncherEvent event) {
@ -156,12 +156,12 @@ public class DummyContainerManager extends ContainerManagerImpl {
ContainerId containerId = container.getContainerId();
switch (event.getType()) {
case LAUNCH_CONTAINER:
dispatcher.getEventHandler().handle(
getDispatcher().getEventHandler().handle(
new ContainerEvent(containerId,
ContainerEventType.CONTAINER_LAUNCHED));
break;
case CLEANUP_CONTAINER:
dispatcher.getEventHandler().handle(
getDispatcher().getEventHandler().handle(
new ContainerExitEvent(containerId,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, 0,
"Container exited with exit code 0."));

View File

@ -135,7 +135,7 @@ public class BaseContainerSchedulerTest extends BaseContainerManagerTest {
@Override
protected ContainersMonitor createContainersMonitor(
ContainerExecutor exec) {
return new ContainersMonitorImpl(exec, dispatcher, this.context) {
return new ContainersMonitorImpl(exec, getDispatcher(), this.context) {
// Define resources available for containers to be executed.
@Override
public long getPmemAllocatedForContainers() {

View File

@ -797,7 +797,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
}
@Override
protected ContainerScheduler createContainerScheduler(Context context) {
return new ContainerScheduler(context, dispatcher, metrics){
return new ContainerScheduler(context, getDispatcher(), metrics){
@Override
public ContainersMonitor getContainersMonitor() {
return new ContainersMonitorImpl(null, null, null) {
@ -1001,7 +1001,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
return null;
}
};
containerManager.dispatcher.disableExitOnDispatchException();
containerManager.getDispatcher().disableExitOnDispatchException();
return containerManager;
}

View File

@ -17,11 +17,24 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.metrics;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import static org.apache.hadoop.metrics2.lib.Interns.info;
import static org.apache.hadoop.test.MetricsAsserts.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
@ -37,6 +50,7 @@ public class TestNodeManagerMetrics {
@Before
public void setup() {
DefaultMetricsSystem.initialize("NodeManager");
DefaultMetricsSystem.setMiniClusterMode(true);
metrics = NodeManagerMetrics.create();
}
@ -140,4 +154,84 @@ public class TestNodeManagerMetrics {
assertGauge("NodeGpuUtilization", nodeGpuUtilization, rb);
assertGauge("ApplicationsRunning", applicationsRunning, rb);
}
private enum TestEnum {
TestEventType
}
private static class TestHandler implements EventHandler<Event> {
private long sleepTime = 1500;
TestHandler() {
}
TestHandler(long sleepTime) {
this.sleepTime = sleepTime;
}
@Override
public void handle(Event event) {
try {
// As long as 10000 events queued
Thread.sleep(this.sleepTime);
} catch (InterruptedException e) {
}
}
}
@Test
@SuppressWarnings("unchecked")
public void testNMDispatcherMetricsHistogram() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
NodeManager nm = new NodeManager();
nm.init(conf);
AsyncDispatcher dispatcher = nm.getDispatcher();
MetricsInfo metricsInfo = info(
"GenericEventTypeMetrics for " + TestEnum.class.getName(),
"Metrics for " + dispatcher.getName());
GenericEventTypeMetrics<TestEnum> genericEventTypeMetrics =
new GenericEventTypeMetrics.EventTypeMetricsBuilder()
.setMs(DefaultMetricsSystem.instance())
.setInfo(metricsInfo)
.setEnumClass(TestEnum.class)
.setEnums(TestEnum.class.getEnumConstants())
.build().registerMetrics();
dispatcher.addMetrics(genericEventTypeMetrics, genericEventTypeMetrics.getEnumClass());
dispatcher.init(conf);
// Register handler
dispatcher.register(TestEnum.class, new TestHandler());
dispatcher.start();
for (int i = 0; i < 3; ++i) {
Event event = mock(Event.class);
when(event.getType()).thenReturn(TestEnum.TestEventType);
dispatcher.getEventHandler().handle(event);
}
// Check event type count.
GenericTestUtils.waitFor(() -> genericEventTypeMetrics.
get(TestEnum.TestEventType) == 3, 1000, 10000);
String testEventTypeCountExpect =
Long.toString(genericEventTypeMetrics.get(TestEnum.TestEventType));
Assert.assertNotNull(testEventTypeCountExpect);
String testEventTypeCountMetric =
genericEventTypeMetrics.getRegistry().get("TestEventType_event_count").toString();
Assert.assertNotNull(testEventTypeCountMetric);
Assert.assertEquals(testEventTypeCountExpect, testEventTypeCountMetric);
String testEventTypeProcessingTimeExpect =
Long.toString(genericEventTypeMetrics.getTotalProcessingTime(TestEnum.TestEventType));
Assert.assertNotNull(testEventTypeProcessingTimeExpect);
String testEventTypeProcessingTimeMetric =
genericEventTypeMetrics.getRegistry().get("TestEventType_processing_time").toString();
Assert.assertNotNull(testEventTypeProcessingTimeMetric);
Assert.assertEquals(testEventTypeProcessingTimeExpect, testEventTypeProcessingTimeMetric);
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics;
@ -31,10 +32,11 @@ public final class GenericEventTypeMetricsManager {
// Construct a GenericEventTypeMetrics for dispatcher
public static <T extends Enum<T>> GenericEventTypeMetrics
create(String dispatcherName, Class<T> eventTypeClass) {
MetricsInfo metricsInfo = info("GenericEventTypeMetrics for " + eventTypeClass.getName(),
"Metrics for " + dispatcherName);
return new GenericEventTypeMetrics.EventTypeMetricsBuilder<T>()
.setMs(DefaultMetricsSystem.instance())
.setInfo(info("GenericEventTypeMetrics for " + eventTypeClass.getName(),
"Metrics for " + dispatcherName))
.setInfo(metricsInfo)
.setEnumClass(eventTypeClass)
.setEnums(eventTypeClass.getEnumConstants())
.build().registerMetrics();

View File

@ -902,8 +902,8 @@ public class MiniYARNCluster extends CompositeService {
LOG.info("CustomAMRMProxyService is enabled. "
+ "All the AM->RM requests will be intercepted by the proxy");
AMRMProxyService amrmProxyService =
useRpc ? new AMRMProxyService(getContext(), dispatcher)
: new ShortCircuitedAMRMProxy(getContext(), dispatcher);
useRpc ? new AMRMProxyService(getContext(), getDispatcher())
: new ShortCircuitedAMRMProxy(getContext(), getDispatcher());
this.setAMRMProxyService(amrmProxyService);
addService(this.getAMRMProxyService());
} else {
@ -934,8 +934,8 @@ public class MiniYARNCluster extends CompositeService {
LOG.info("CustomAMRMProxyService is enabled. "
+ "All the AM->RM requests will be intercepted by the proxy");
AMRMProxyService amrmProxyService =
useRpc ? new AMRMProxyService(getContext(), dispatcher)
: new ShortCircuitedAMRMProxy(getContext(), dispatcher);
useRpc ? new AMRMProxyService(getContext(), getDispatcher())
: new ShortCircuitedAMRMProxy(getContext(), getDispatcher());
this.setAMRMProxyService(amrmProxyService);
addService(this.getAMRMProxyService());
} else {
@ -946,7 +946,7 @@ public class MiniYARNCluster extends CompositeService {
@Override
protected ContainersMonitor createContainersMonitor(ContainerExecutor
exec) {
return new ContainersMonitorImpl(exec, dispatcher, this.context) {
return new ContainersMonitorImpl(exec, getDispatcher(), this.context) {
@Override
public float getVmemRatio() {
return 2.0f;