YARN-6102. Addendum patch for branch-2. (Rohith Sharma K S via asuresh)
(cherry picked from commit 0411c710d2
)
This commit is contained in:
parent
d754b24738
commit
dc65df2a1a
|
@ -493,7 +493,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private RMTimelineCollectorManager createRMTimelineCollectorManager() {
|
private RMTimelineCollectorManager createRMTimelineCollectorManager() {
|
||||||
return new RMTimelineCollectorManager(rmContext);
|
return new RMTimelineCollectorManager(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected SystemMetricsPublisher createSystemMetricsPublisher() {
|
protected SystemMetricsPublisher createSystemMetricsPublisher() {
|
||||||
|
@ -504,7 +504,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
// we're dealing with the v.2.x publisher
|
// we're dealing with the v.2.x publisher
|
||||||
LOG.info("system metrics publisher with the timeline service V2 is " +
|
LOG.info("system metrics publisher with the timeline service V2 is " +
|
||||||
"configured");
|
"configured");
|
||||||
publisher = new TimelineServiceV2Publisher(rmContext);
|
publisher = new TimelineServiceV2Publisher(
|
||||||
|
rmContext.getRMTimelineCollectorManager());
|
||||||
} else {
|
} else {
|
||||||
// we're dealing with the v.1.x publisher
|
// we're dealing with the v.1.x publisher
|
||||||
LOG.info("system metrics publisher with the timeline service V1 is " +
|
LOG.info("system metrics publisher with the timeline service V1 is " +
|
||||||
|
|
|
@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
|
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
|
||||||
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
||||||
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
|
@ -76,9 +75,10 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
|
||||||
private RMTimelineCollectorManager rmTimelineCollectorManager;
|
private RMTimelineCollectorManager rmTimelineCollectorManager;
|
||||||
private boolean publishContainerEvents;
|
private boolean publishContainerEvents;
|
||||||
|
|
||||||
public TimelineServiceV2Publisher(RMContext rmContext) {
|
public TimelineServiceV2Publisher(
|
||||||
|
RMTimelineCollectorManager timelineCollectorManager) {
|
||||||
super("TimelineserviceV2Publisher");
|
super("TimelineserviceV2Publisher");
|
||||||
rmTimelineCollectorManager = rmContext.getRMTimelineCollectorManager();
|
rmTimelineCollectorManager = timelineCollectorManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -24,7 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
|
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
|
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
|
||||||
|
@ -41,16 +41,16 @@ public class RMTimelineCollectorManager extends TimelineCollectorManager {
|
||||||
private static final Log LOG =
|
private static final Log LOG =
|
||||||
LogFactory.getLog(RMTimelineCollectorManager.class);
|
LogFactory.getLog(RMTimelineCollectorManager.class);
|
||||||
|
|
||||||
private RMContext rmContext;
|
private ResourceManager rm;
|
||||||
|
|
||||||
public RMTimelineCollectorManager(RMContext rmContext) {
|
public RMTimelineCollectorManager(ResourceManager resourceManager) {
|
||||||
super(RMTimelineCollectorManager.class.getName());
|
super(RMTimelineCollectorManager.class.getName());
|
||||||
this.rmContext = rmContext;
|
this.rm = resourceManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
|
protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
|
||||||
RMApp app = rmContext.getRMApps().get(appId);
|
RMApp app = rm.getRMContext().getRMApps().get(appId);
|
||||||
if (app == null) {
|
if (app == null) {
|
||||||
throw new YarnRuntimeException(
|
throw new YarnRuntimeException(
|
||||||
"Unable to get the timeline collector context info for a " +
|
"Unable to get the timeline collector context info for a " +
|
||||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
|
||||||
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
||||||
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
|
||||||
|
@ -101,11 +102,12 @@ public class TestSystemMetricsPublisherForV2 {
|
||||||
FileContext.getLocalFSFileContext().delete(
|
FileContext.getLocalFSFileContext().delete(
|
||||||
new Path(testRootDir.getAbsolutePath()), true);
|
new Path(testRootDir.getAbsolutePath()), true);
|
||||||
}
|
}
|
||||||
|
ResourceManager rm = mock(ResourceManager.class);
|
||||||
RMContext rmContext = mock(RMContext.class);
|
RMContext rmContext = mock(RMContext.class);
|
||||||
rmAppsMapInContext = new ConcurrentHashMap<ApplicationId, RMApp>();
|
rmAppsMapInContext = new ConcurrentHashMap<ApplicationId, RMApp>();
|
||||||
when(rmContext.getRMApps()).thenReturn(rmAppsMapInContext);
|
when(rmContext.getRMApps()).thenReturn(rmAppsMapInContext);
|
||||||
rmTimelineCollectorManager = new RMTimelineCollectorManager(rmContext);
|
when(rm.getRMContext()).thenReturn(rmContext);
|
||||||
|
rmTimelineCollectorManager = new RMTimelineCollectorManager(rm);
|
||||||
when(rmContext.getRMTimelineCollectorManager()).thenReturn(
|
when(rmContext.getRMTimelineCollectorManager()).thenReturn(
|
||||||
rmTimelineCollectorManager);
|
rmTimelineCollectorManager);
|
||||||
|
|
||||||
|
@ -117,12 +119,12 @@ public class TestSystemMetricsPublisherForV2 {
|
||||||
|
|
||||||
dispatcher.init(conf);
|
dispatcher.init(conf);
|
||||||
dispatcher.start();
|
dispatcher.start();
|
||||||
metricsPublisher = new TimelineServiceV2Publisher(rmContext) {
|
metricsPublisher =
|
||||||
@Override
|
new TimelineServiceV2Publisher(rmTimelineCollectorManager) {
|
||||||
protected Dispatcher getDispatcher() {
|
@Override protected Dispatcher getDispatcher() {
|
||||||
return dispatcher;
|
return dispatcher;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
metricsPublisher.init(conf);
|
metricsPublisher.init(conf);
|
||||||
metricsPublisher.start();
|
metricsPublisher.start();
|
||||||
}
|
}
|
||||||
|
@ -166,7 +168,7 @@ public class TestSystemMetricsPublisherForV2 {
|
||||||
public void testSystemMetricPublisherInitialization() {
|
public void testSystemMetricPublisherInitialization() {
|
||||||
@SuppressWarnings("resource")
|
@SuppressWarnings("resource")
|
||||||
TimelineServiceV2Publisher publisher =
|
TimelineServiceV2Publisher publisher =
|
||||||
new TimelineServiceV2Publisher(mock(RMContext.class));
|
new TimelineServiceV2Publisher(mock(RMTimelineCollectorManager.class));
|
||||||
try {
|
try {
|
||||||
Configuration conf = getTimelineV2Conf();
|
Configuration conf = getTimelineV2Conf();
|
||||||
conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED,
|
conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED,
|
||||||
|
@ -178,7 +180,8 @@ public class TestSystemMetricsPublisherForV2 {
|
||||||
|
|
||||||
publisher.stop();
|
publisher.stop();
|
||||||
|
|
||||||
publisher = new TimelineServiceV2Publisher(mock(RMContext.class));
|
publisher = new TimelineServiceV2Publisher(
|
||||||
|
mock(RMTimelineCollectorManager.class));
|
||||||
conf = getTimelineV2Conf();
|
conf = getTimelineV2Conf();
|
||||||
publisher.init(conf);
|
publisher.init(conf);
|
||||||
assertTrue("Expected to have registered event handlers and set ready to "
|
assertTrue("Expected to have registered event handlers and set ready to "
|
||||||
|
|
Loading…
Reference in New Issue