From 11e8905d8daf129afb6fe2e5a0eca11bcb1719c8 Mon Sep 17 00:00:00 2001 From: Sangjin Lee Date: Fri, 24 Apr 2015 16:56:23 -0700 Subject: [PATCH] YARN-3390. Reuse TimelineCollectorManager for RM (Zhijie Shen via sjlee) (cherry picked from commit 58221188811e0f61d842dac89e1f4ad4fd8aa182) --- .../RMActiveServiceContext.java | 13 +- .../server/resourcemanager/RMAppManager.java | 15 +- .../server/resourcemanager/RMContext.java | 7 +- .../server/resourcemanager/RMContextImpl.java | 12 +- .../resourcemanager/ResourceManager.java | 14 +- .../resourcemanager/rmapp/RMAppImpl.java | 15 + .../timelineservice/RMTimelineCollector.java | 111 -------- .../RMTimelineCollectorManager.java | 75 +++++ .../TestTimelineServiceClientIntegration.java | 12 +- .../collector/AppLevelTimelineCollector.java | 2 +- .../NodeTimelineCollectorManager.java | 223 +++++++++++++++ .../PerNodeTimelineCollectorsAuxService.java | 15 +- .../collector/TimelineCollector.java | 2 +- .../collector/TimelineCollectorManager.java | 259 +++--------------- .../TimelineCollectorWebService.java | 23 +- ...va => TestNMTimelineCollectorManager.java} | 16 +- ...stPerNodeTimelineCollectorsAuxService.java | 24 +- 17 files changed, 431 insertions(+), 407 deletions(-) delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/{TestTimelineCollectorManager.java => TestNMTimelineCollectorManager.java} (91%) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index c2f5f8efc70..fc053101904 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -49,7 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; -import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -97,7 +97,7 @@ public class RMActiveServiceContext { private ApplicationMasterService applicationMasterService; private RMApplicationHistoryWriter rmApplicationHistoryWriter; private SystemMetricsPublisher systemMetricsPublisher; - private RMTimelineCollector timelineCollector; + private RMTimelineCollectorManager timelineCollectorManager; private RMNodeLabelsManager nodeLabelManager; private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater; @@ -381,14 +381,15 @@ public class RMActiveServiceContext { @Private @Unstable - public RMTimelineCollector getRMTimelineCollector() { - return timelineCollector; + public RMTimelineCollectorManager getRMTimelineCollectorManager() { + return timelineCollectorManager; } @Private @Unstable - public void setRMTimelineCollector(RMTimelineCollector timelineCollector) { - this.timelineCollector = timelineCollector; + public void setRMTimelineCollectorManager( + RMTimelineCollectorManager timelineCollectorManager) { + this.timelineCollectorManager = timelineCollectorManager; } @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 279f0b340d2..3d2fdc43e9a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -385,12 +385,13 @@ public class RMAppManager implements EventHandler, } // Create RMApp - RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf, - submissionContext.getApplicationName(), user, - submissionContext.getQueue(), submissionContext, this.scheduler, - this.masterService, submitTime, submissionContext.getApplicationType(), - submissionContext.getApplicationTags(), amReq); - + RMAppImpl application = + new RMAppImpl(applicationId, rmContext, this.conf, + submissionContext.getApplicationName(), user, + submissionContext.getQueue(), + submissionContext, this.scheduler, this.masterService, + submitTime, submissionContext.getApplicationType(), + submissionContext.getApplicationTags(), amReq); // Concurrent app submissions with same applicationId will fail here // Concurrent app submissions with different applicationIds will not // influence each other @@ -401,6 +402,8 @@ public class RMAppManager implements EventHandler, LOG.warn(message); throw new YarnException(message); } + // Start timeline collector for the submitted app + application.startTimelineCollector(); // Inform the ACLs Manager this.applicationACLsManager.addApplication(applicationId, submissionContext.getAMContainerSpec().getApplicationACLs()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 949632e1e74..3b8a5b4675d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; -import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; /** * Context of the ResourceManager. @@ -114,9 +114,10 @@ public interface RMContext { SystemMetricsPublisher getSystemMetricsPublisher(); - void setRMTimelineCollector(RMTimelineCollector timelineCollector); + void setRMTimelineCollectorManager( + RMTimelineCollectorManager timelineCollectorManager); - RMTimelineCollector getRMTimelineCollector(); + RMTimelineCollectorManager getRMTimelineCollectorManager(); ConfigurationProvider getConfigurationProvider(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 34bb890d8fd..368fb1e6135 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -51,7 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; -import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.util.Clock; import com.google.common.annotations.VisibleForTesting; @@ -371,14 +371,14 @@ public class RMContextImpl implements RMContext { } @Override - public void setRMTimelineCollector( - RMTimelineCollector timelineCollector) { - activeServiceContext.setRMTimelineCollector(timelineCollector); + public void setRMTimelineCollectorManager( + RMTimelineCollectorManager timelineCollectorManager) { + activeServiceContext.setRMTimelineCollectorManager(timelineCollectorManager); } @Override - public RMTimelineCollector getRMTimelineCollector() { - return activeServiceContext.getRMTimelineCollector(); + public RMTimelineCollectorManager getRMTimelineCollectorManager() { + return activeServiceContext.getRMTimelineCollectorManager(); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index a38b40c45f7..6bc95a5b915 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -104,11 +104,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; -import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter; import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; @@ -460,8 +460,8 @@ public class ResourceManager extends CompositeService implements Recoverable { return new RMApplicationHistoryWriter(); } - private RMTimelineCollector createRMTimelineCollector() { - return new RMTimelineCollector(); + private RMTimelineCollectorManager createRMTimelineCollectorManager() { + return new RMTimelineCollectorManager(rmContext); } protected SystemMetricsPublisher createSystemMetricsPublisher() { @@ -589,10 +589,10 @@ public class ResourceManager extends CompositeService implements Recoverable { addService(systemMetricsPublisher); rmContext.setSystemMetricsPublisher(systemMetricsPublisher); - RMTimelineCollector timelineCollector = - createRMTimelineCollector(); - addService(timelineCollector); - rmContext.setRMTimelineCollector(timelineCollector); + RMTimelineCollectorManager timelineCollectorManager = + createRMTimelineCollectorManager(); + addService(timelineCollectorManager); + rmContext.setRMTimelineCollectorManager(timelineCollectorManager); // Register event handler for NodesListManager nodesListManager = new NodesListManager(rmContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index ac6a585abdf..efbf0e05576 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -97,6 +97,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; @@ -510,6 +512,17 @@ public class RMAppImpl implements RMApp, Recoverable { } } + public void startTimelineCollector() { + AppLevelTimelineCollector collector = + new AppLevelTimelineCollector(applicationId); + rmContext.getRMTimelineCollectorManager().putIfAbsent( + applicationId, collector); + } + + public void stopTimelineCollector() { + rmContext.getRMTimelineCollectorManager().remove(applicationId); + } + @Override public ApplicationId getApplicationId() { return this.applicationId; @@ -1366,6 +1379,8 @@ public class RMAppImpl implements RMApp, Recoverable { .applicationFinished(app, finalState); app.rmContext.getSystemMetricsPublisher() .appFinished(app, finalState, app.finishTime); + + app.stopTimelineCollector(); }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java deleted file mode 100644 index 4ea7a0394fd..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.timelineservice; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent; -import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEventType; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; - -/** - * This class is responsible for posting application and appattempt lifecycle - * related events to timeline service V2 - */ -@Private -@Unstable -public class RMTimelineCollector extends TimelineCollector { - private static final Log LOG = LogFactory.getLog(RMTimelineCollector.class); - - public RMTimelineCollector() { - super("Resource Manager TimelineCollector"); - } - - private Dispatcher dispatcher; - - private boolean publishSystemMetricsForV2; - - @Override - protected void serviceInit(Configuration conf) throws Exception { - publishSystemMetricsForV2 = - conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) - && conf.getBoolean( - YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, - YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED); - - if (publishSystemMetricsForV2) { - // having separate dispatcher to avoid load on RMDispatcher - LOG.info("RMTimelineCollector has been configured to publish" - + " System Metrics in ATS V2"); - dispatcher = new AsyncDispatcher(); - dispatcher.register(SystemMetricsEventType.class, - new ForwardingEventHandler()); - } else { - LOG.warn("RMTimelineCollector has not been configured to publish" - + " System Metrics in ATS V2"); - } - super.serviceInit(conf); - } - - @Override - protected void serviceStart() throws Exception { - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - super.serviceStop(); - } - - protected void handleSystemMetricsEvent(SystemMetricsEvent event) { - switch (event.getType()) { - default: - LOG.error("Unknown SystemMetricsEvent type: " + event.getType()); - } - } - - @Override - protected TimelineCollectorContext getTimelineEntityContext() { - // TODO address in YARN-3390. - return null; - } - - /** - * EventHandler implementation which forward events to SystemMetricsPublisher. - * Making use of it, SystemMetricsPublisher can avoid to have a public handle - * method. - */ - private final class ForwardingEventHandler implements - EventHandler { - - @Override - public void handle(SystemMetricsEvent event) { - handleSystemMetricsEvent(event); - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java new file mode 100644 index 00000000000..25e0e0fe8cd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.timelineservice; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class RMTimelineCollectorManager extends TimelineCollectorManager { + private RMContext rmContext; + + public RMTimelineCollectorManager(RMContext rmContext) { + super(RMTimelineCollectorManager.class.getName()); + this.rmContext = rmContext; + } + + @Override + public void postPut(ApplicationId appId, TimelineCollector collector) { + RMApp app = rmContext.getRMApps().get(appId); + if (app == null) { + throw new YarnRuntimeException( + "Unable to get the timeline collector context info for a non-existing app " + + appId); + } + String userId = app.getUser(); + if (userId != null && !userId.isEmpty()) { + collector.getTimelineEntityContext().setUserId(userId); + } + for (String tag : app.getApplicationTags()) { + String[] parts = tag.split(":", 2); + if (parts.length != 2 || parts[1].isEmpty()) { + continue; + } + switch (parts[0]) { + case TimelineUtils.FLOW_NAME_TAG_PREFIX: + collector.getTimelineEntityContext().setFlowName(parts[1]); + break; + case TimelineUtils.FLOW_VERSION_TAG_PREFIX: + collector.getTimelineEntityContext().setFlowVersion(parts[1]); + break; + case TimelineUtils.FLOW_RUN_ID_TAG_PREFIX: + collector.getTimelineEntityContext().setFlowRunId( + Long.valueOf(parts[1])); + break; + default: + break; + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index 54c806cc612..0bdb68aac53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; +import org.apache.hadoop.yarn.server.timelineservice.collector.NodeTimelineCollectorManager; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -42,13 +42,13 @@ import org.junit.Test; import java.io.IOException; public class TestTimelineServiceClientIntegration { - private static TimelineCollectorManager collectorManager; + private static NodeTimelineCollectorManager collectorManager; private static PerNodeTimelineCollectorsAuxService auxService; @BeforeClass public static void setupClass() throws Exception { try { - collectorManager = new MyTimelineCollectorManager(); + collectorManager = new MockNodeTimelineCollectorManager(); auxService = PerNodeTimelineCollectorsAuxService.launchServer(new String[0], collectorManager); @@ -85,9 +85,9 @@ public class TestTimelineServiceClientIntegration { } } - private static class MyTimelineCollectorManager extends - TimelineCollectorManager { - public MyTimelineCollectorManager() { + private static class MockNodeTimelineCollectorManager extends + NodeTimelineCollectorManager { + public MockNodeTimelineCollectorManager() { super(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java index 5bc70e3f72d..fa3221185be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -75,7 +75,7 @@ public class AppLevelTimelineCollector extends TimelineCollector { } @Override - protected TimelineCollectorContext getTimelineEntityContext() { + public TimelineCollectorContext getTimelineEntityContext() { return context; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java new file mode 100644 index 00000000000..03ac7708651 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER; +import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.HashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.HttpServer2; +import org.apache.hadoop.http.lib.StaticUserWebFilter; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; +import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; + +import com.google.common.annotations.VisibleForTesting; + + +/** + * + * It is a singleton, and instances should be obtained via + * {@link #getInstance()}. + * + */ +@Private +@Unstable +public class NodeTimelineCollectorManager extends TimelineCollectorManager { + private static final Log LOG = + LogFactory.getLog(NodeTimelineCollectorManager.class); + private static final NodeTimelineCollectorManager INSTANCE = + new NodeTimelineCollectorManager(); + + + // REST server for this collector manager + private HttpServer2 timelineRestServer; + + private String timelineRestServerBindAddress; + + private CollectorNodemanagerProtocol nmCollectorService; + + private InetSocketAddress nmCollectorServiceAddress; + + static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager"; + + static NodeTimelineCollectorManager getInstance() { + return INSTANCE; + } + + @VisibleForTesting + protected NodeTimelineCollectorManager() { + super(NodeTimelineCollectorManager.class.getName()); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + this.nmCollectorServiceAddress = conf.getSocketAddr( + YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + nmCollectorService = getNMCollectorService(); + startWebApp(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (timelineRestServer != null) { + timelineRestServer.stop(); + } + super.serviceStop(); + } + + @Override + public void postPut(ApplicationId appId, TimelineCollector collector) { + try { + // Get context info from NM + updateTimelineCollectorContext(appId, collector); + // Report to NM if a new collector is added. + reportNewCollectorToNM(appId); + } catch (YarnException | IOException e) { + // throw exception here as it cannot be used if failed communicate with NM + LOG.error("Failed to communicate with NM Collector Service for " + appId); + throw new YarnRuntimeException(e); + } + } + + /** + * Launch the REST web server for this collector manager + */ + private void startWebApp() { + Configuration conf = getConfig(); + String bindAddress = conf.get(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_BIND_HOST) + ":0"; + try { + Configuration confForInfoServer = new Configuration(conf); + confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10); + HttpServer2.Builder builder = new HttpServer2.Builder() + .setName("timeline") + .setConf(conf) + .addEndpoint(URI.create( + (YarnConfiguration.useHttps(conf) ? "https://" : "http://") + + bindAddress)); + timelineRestServer = builder.build(); + // TODO: replace this by an authentication filter in future. + HashMap options = new HashMap<>(); + String username = conf.get(HADOOP_HTTP_STATIC_USER, + DEFAULT_HADOOP_HTTP_STATIC_USER); + options.put(HADOOP_HTTP_STATIC_USER, username); + HttpServer2.defineFilter(timelineRestServer.getWebAppContext(), + "static_user_filter_timeline", + StaticUserWebFilter.StaticUserFilter.class.getName(), + options, new String[] {"/*"}); + + timelineRestServer.addJerseyResourcePackage( + TimelineCollectorWebService.class.getPackage().getName() + ";" + + GenericExceptionHandler.class.getPackage().getName() + ";" + + YarnJacksonJaxbJsonProvider.class.getPackage().getName(), + "/*"); + timelineRestServer.setAttribute(COLLECTOR_MANAGER_ATTR_KEY, this); + timelineRestServer.start(); + } catch (Exception e) { + String msg = "The per-node collector webapp failed to start."; + LOG.error(msg, e); + throw new YarnRuntimeException(msg, e); + } + //TODO: We need to think of the case of multiple interfaces + this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress( + timelineRestServer.getConnectorAddress(0)); + LOG.info("Instantiated the per-node collector webapp at " + + timelineRestServerBindAddress); + } + + private void reportNewCollectorToNM(ApplicationId appId) + throws YarnException, IOException { + ReportNewCollectorInfoRequest request = + ReportNewCollectorInfoRequest.newInstance(appId, + this.timelineRestServerBindAddress); + LOG.info("Report a new collector for application: " + appId + + " to the NM Collector Service."); + nmCollectorService.reportNewCollectorInfo(request); + } + + private void updateTimelineCollectorContext( + ApplicationId appId, TimelineCollector collector) + throws YarnException, IOException { + GetTimelineCollectorContextRequest request = + GetTimelineCollectorContextRequest.newInstance(appId); + LOG.info("Get timeline collector context for " + appId); + GetTimelineCollectorContextResponse response = + nmCollectorService.getTimelineCollectorContext(request); + String userId = response.getUserId(); + if (userId != null && !userId.isEmpty()) { + collector.getTimelineEntityContext().setUserId(userId); + } + String flowName = response.getFlowName(); + if (flowName != null && !flowName.isEmpty()) { + collector.getTimelineEntityContext().setFlowName(flowName); + } + String flowVersion = response.getFlowVersion(); + if (flowVersion != null && !flowVersion.isEmpty()) { + collector.getTimelineEntityContext().setFlowVersion(flowVersion); + } + long flowRunId = response.getFlowRunId(); + if (flowRunId != 0L) { + collector.getTimelineEntityContext().setFlowRunId(flowRunId); + } + } + + @VisibleForTesting + protected CollectorNodemanagerProtocol getNMCollectorService() { + Configuration conf = getConfig(); + final YarnRPC rpc = YarnRPC.create(conf); + + // TODO Security settings. + return (CollectorNodemanagerProtocol) rpc.getProxy( + CollectorNodemanagerProtocol.class, + nmCollectorServiceAddress, conf); + } + + @VisibleForTesting + public String getRestServerBindAddress() { + return timelineRestServerBindAddress; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java index 2017d012ac9..36ff5c0ab70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java @@ -53,15 +53,15 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService { LogFactory.getLog(PerNodeTimelineCollectorsAuxService.class); private static final int SHUTDOWN_HOOK_PRIORITY = 30; - private final TimelineCollectorManager collectorManager; + private final NodeTimelineCollectorManager collectorManager; public PerNodeTimelineCollectorsAuxService() { // use the same singleton - this(TimelineCollectorManager.getInstance()); + this(NodeTimelineCollectorManager.getInstance()); } @VisibleForTesting PerNodeTimelineCollectorsAuxService( - TimelineCollectorManager collectorsManager) { + NodeTimelineCollectorManager collectorsManager) { super("timeline_collector"); this.collectorManager = collectorsManager; } @@ -108,8 +108,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService { * @return whether it was removed successfully */ public boolean removeApplication(ApplicationId appId) { - String appIdString = appId.toString(); - return collectorManager.remove(appIdString); + return collectorManager.remove(appId); } /** @@ -153,8 +152,8 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService { } @VisibleForTesting - boolean hasApplication(String appId) { - return collectorManager.containsKey(appId); + boolean hasApplication(ApplicationId appId) { + return collectorManager.containsTimelineCollector(appId); } @Override @@ -174,7 +173,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService { @VisibleForTesting public static PerNodeTimelineCollectorsAuxService - launchServer(String[] args, TimelineCollectorManager collectorManager) { + launchServer(String[] args, NodeTimelineCollectorManager collectorManager) { Thread .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); StringUtils.startupShutdownMessage( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index f1d3d72b656..4eced5b69ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -124,6 +124,6 @@ public abstract class TimelineCollector extends CompositeService { } } - protected abstract TimelineCollectorContext getTimelineEntityContext(); + public abstract TimelineCollectorContext getTimelineEntityContext(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index 9a566a2f918..7b3da6bc31b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -18,173 +18,97 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; -import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER; -import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.URI; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.http.HttpServer2; -import org.apache.hadoop.http.lib.StaticUserWebFilter; -import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; -import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; -import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; -import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; -import org.apache.hadoop.yarn.webapp.util.WebAppUtils; -import com.google.common.annotations.VisibleForTesting; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; /** * Class that manages adding and removing collectors and their lifecycle. It * provides thread safety access to the collectors inside. * - * It is a singleton, and instances should be obtained via - * {@link #getInstance()}. */ -@Private -@Unstable -public class TimelineCollectorManager extends CompositeService { +@InterfaceAudience.Private +@InterfaceStability.Unstable +public abstract class TimelineCollectorManager extends AbstractService { private static final Log LOG = LogFactory.getLog(TimelineCollectorManager.class); - private static final TimelineCollectorManager INSTANCE = - new TimelineCollectorManager(); // access to this map is synchronized with the map itself - private final Map collectors = + private final Map collectors = Collections.synchronizedMap( - new HashMap()); + new HashMap()); - // REST server for this collector manager - private HttpServer2 timelineRestServer; - - private String timelineRestServerBindAddress; - - private CollectorNodemanagerProtocol nmCollectorService; - - private InetSocketAddress nmCollectorServiceAddress; - - static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager"; - - static TimelineCollectorManager getInstance() { - return INSTANCE; - } - - @VisibleForTesting - protected TimelineCollectorManager() { - super(TimelineCollectorManager.class.getName()); - } - - @Override - public void serviceInit(Configuration conf) throws Exception { - this.nmCollectorServiceAddress = conf.getSocketAddr( - YarnConfiguration.NM_BIND_HOST, - YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS, - YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS, - YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT); - - } - - @Override - protected void serviceStart() throws Exception { - nmCollectorService = getNMCollectorService(); - startWebApp(); - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - if (timelineRestServer != null) { - timelineRestServer.stop(); - } - super.serviceStop(); + protected TimelineCollectorManager(String name) { + super(name); } /** * Put the collector into the collection if an collector mapped by id does * not exist. * - * @throws YarnRuntimeException if there was any exception in initializing and - * starting the app level service + * @throws YarnRuntimeException if there was any exception in initializing + * and starting the app level service * @return the collector associated with id after the potential put. */ public TimelineCollector putIfAbsent(ApplicationId appId, TimelineCollector collector) { - String id = appId.toString(); - TimelineCollector collectorInTable; - boolean collectorIsNew = false; + TimelineCollector collectorInTable = null; synchronized (collectors) { - collectorInTable = collectors.get(id); + collectorInTable = collectors.get(appId); if (collectorInTable == null) { try { // initialize, start, and add it to the collection so it can be // cleaned up when the parent shuts down collector.init(getConfig()); collector.start(); - collectors.put(id, collector); - LOG.info("the collector for " + id + " was added"); + collectors.put(appId, collector); + LOG.info("the collector for " + appId + " was added"); collectorInTable = collector; - collectorIsNew = true; + postPut(appId, collectorInTable); } catch (Exception e) { throw new YarnRuntimeException(e); } } else { - String msg = "the collector for " + id + " already exists!"; - LOG.error(msg); - } - - } - // Report to NM if a new collector is added. - if (collectorIsNew) { - try { - updateTimelineCollectorContext(appId, collector); - reportNewCollectorToNM(appId); - } catch (Exception e) { - // throw exception here as it cannot be used if failed communicate with NM - LOG.error("Failed to communicate with NM Collector Service for " + appId); - throw new YarnRuntimeException(e); + LOG.info("the collector for " + appId + " already exists!"); } } - return collectorInTable; } + protected void postPut(ApplicationId appId, TimelineCollector collector) { + + } + /** * Removes the collector for the specified id. The collector is also stopped * as a result. If the collector does not exist, no change is made. * * @return whether it was removed successfully */ - public boolean remove(String id) { - synchronized (collectors) { - TimelineCollector collector = collectors.remove(id); - if (collector == null) { - String msg = "the collector for " + id + " does not exist!"; - LOG.error(msg); - return false; - } else { - // stop the service to do clean up - collector.stop(); - LOG.info("the collector service for " + id + " was removed"); - return true; - } + public boolean remove(ApplicationId appId) { + TimelineCollector collector = collectors.remove(appId); + if (collector == null) { + LOG.error("the collector for " + appId + " does not exist!"); + } else { + postRemove(appId, collector); + // stop the service to do clean up + collector.stop(); + LOG.info("the collector service for " + appId + " was removed"); } + return collector != null; + } + + protected void postRemove(ApplicationId appId, TimelineCollector collector) { + } /** @@ -192,113 +116,16 @@ public class TimelineCollectorManager extends CompositeService { * * @return the collector or null if it does not exist */ - public TimelineCollector get(String id) { - return collectors.get(id); + public TimelineCollector get(ApplicationId appId) { + return collectors.get(appId); } /** * Returns whether the collector for the specified id exists in this * collection. */ - public boolean containsKey(String id) { - return collectors.containsKey(id); + public boolean containsTimelineCollector(ApplicationId appId) { + return collectors.containsKey(appId); } - /** - * Launch the REST web server for this collector manager - */ - private void startWebApp() { - Configuration conf = getConfig(); - String bindAddress = conf.get(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_BIND_HOST) + ":0"; - try { - Configuration confForInfoServer = new Configuration(conf); - confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10); - HttpServer2.Builder builder = new HttpServer2.Builder() - .setName("timeline") - .setConf(conf) - .addEndpoint(URI.create( - (YarnConfiguration.useHttps(conf) ? "https://" : "http://") + - bindAddress)); - timelineRestServer = builder.build(); - // TODO: replace this by an authentication filter in future. - HashMap options = new HashMap<>(); - String username = conf.get(HADOOP_HTTP_STATIC_USER, - DEFAULT_HADOOP_HTTP_STATIC_USER); - options.put(HADOOP_HTTP_STATIC_USER, username); - HttpServer2.defineFilter(timelineRestServer.getWebAppContext(), - "static_user_filter_timeline", - StaticUserWebFilter.StaticUserFilter.class.getName(), - options, new String[] {"/*"}); - - timelineRestServer.addJerseyResourcePackage( - TimelineCollectorWebService.class.getPackage().getName() + ";" - + GenericExceptionHandler.class.getPackage().getName() + ";" - + YarnJacksonJaxbJsonProvider.class.getPackage().getName(), - "/*"); - timelineRestServer.setAttribute(COLLECTOR_MANAGER_ATTR_KEY, this); - timelineRestServer.start(); - } catch (Exception e) { - String msg = "The per-node collector webapp failed to start."; - LOG.error(msg, e); - throw new YarnRuntimeException(msg, e); - } - //TODO: We need to think of the case of multiple interfaces - this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress( - timelineRestServer.getConnectorAddress(0)); - LOG.info("Instantiated the per-node collector webapp at " + - timelineRestServerBindAddress); - } - - private void reportNewCollectorToNM(ApplicationId appId) - throws YarnException, IOException { - ReportNewCollectorInfoRequest request = - ReportNewCollectorInfoRequest.newInstance(appId, - this.timelineRestServerBindAddress); - LOG.info("Report a new collector for application: " + appId + - " to the NM Collector Service."); - nmCollectorService.reportNewCollectorInfo(request); - } - - private void updateTimelineCollectorContext( - ApplicationId appId, TimelineCollector collector) - throws YarnException, IOException { - GetTimelineCollectorContextRequest request = - GetTimelineCollectorContextRequest.newInstance(appId); - LOG.info("Get timeline collector context for " + appId); - GetTimelineCollectorContextResponse response = - nmCollectorService.getTimelineCollectorContext(request); - String userId = response.getUserId(); - if (userId != null && !userId.isEmpty()) { - collector.getTimelineEntityContext().setUserId(userId); - } - String flowName = response.getFlowName(); - if (flowName != null && !flowName.isEmpty()) { - collector.getTimelineEntityContext().setFlowName(flowName); - } - String flowVersion = response.getFlowVersion(); - if (flowVersion != null && !flowVersion.isEmpty()) { - collector.getTimelineEntityContext().setFlowVersion(flowVersion); - } - long flowRunId = response.getFlowRunId(); - if (flowRunId != 0L) { - collector.getTimelineEntityContext().setFlowRunId(flowRunId); - } - } - - @VisibleForTesting - protected CollectorNodemanagerProtocol getNMCollectorService() { - Configuration conf = getConfig(); - final YarnRPC rpc = YarnRPC.create(conf); - - // TODO Security settings. - return (CollectorNodemanagerProtocol) rpc.getProxy( - CollectorNodemanagerProtocol.class, - nmCollectorServiceAddress, conf); - } - - @VisibleForTesting - public String getRestServerBindAddress() { - return timelineRestServerBindAddress; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java index edec0d357b2..2165c662367 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java @@ -42,6 +42,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.webapp.ForbiddenException; @@ -129,11 +130,14 @@ public class TimelineCollectorWebService { boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); try { - appId = parseApplicationId(appId); - if (appId == null) { + ApplicationId appID = parseApplicationId(appId); + if (appID == null) { return Response.status(Response.Status.BAD_REQUEST).build(); } - TimelineCollector collector = getCollector(req, appId); + NodeTimelineCollectorManager collectorManager = + (NodeTimelineCollectorManager) context.getAttribute( + NodeTimelineCollectorManager.COLLECTOR_MANAGER_ATTR_KEY); + TimelineCollector collector = collectorManager.get(appID); if (collector == null) { LOG.error("Application: "+ appId + " is not found"); throw new NotFoundException(); // different exception? @@ -147,10 +151,10 @@ public class TimelineCollectorWebService { } } - private String parseApplicationId(String appId) { + private ApplicationId parseApplicationId(String appId) { try { if (appId != null) { - return ConverterUtils.toApplicationId(appId.trim()).toString(); + return ConverterUtils.toApplicationId(appId.trim()); } else { return null; } @@ -159,15 +163,6 @@ public class TimelineCollectorWebService { } } - private TimelineCollector - getCollector(HttpServletRequest req, String appIdToParse) { - String appIdString = parseApplicationId(appIdToParse); - final TimelineCollectorManager collectorManager = - (TimelineCollectorManager) context.getAttribute( - TimelineCollectorManager.COLLECTOR_MANAGER_ATTR_KEY); - return collectorManager.get(appIdString); - } - private void init(HttpServletResponse response) { response.setContentType(null); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java similarity index 91% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java index c662998f829..87343fd6372 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java @@ -49,8 +49,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -public class TestTimelineCollectorManager { - private TimelineCollectorManager collectorManager; +public class TestNMTimelineCollectorManager { + private NodeTimelineCollectorManager collectorManager; @Before public void setup() throws Exception { @@ -103,7 +103,7 @@ public class TestTimelineCollectorManager { // check the keys for (int i = 0; i < NUM_APPS; i++) { final ApplicationId appId = ApplicationId.newInstance(0L, i); - assertTrue(collectorManager.containsKey(appId.toString())); + assertTrue(collectorManager.containsTimelineCollector(appId)); } } @@ -119,7 +119,7 @@ public class TestTimelineCollectorManager { new AppLevelTimelineCollector(appId); boolean successPut = (collectorManager.putIfAbsent(appId, collector) == collector); - return successPut && collectorManager.remove(appId.toString()); + return successPut && collectorManager.remove(appId); } }; tasks.add(task); @@ -136,13 +136,13 @@ public class TestTimelineCollectorManager { // check the keys for (int i = 0; i < NUM_APPS; i++) { final ApplicationId appId = ApplicationId.newInstance(0L, i); - assertFalse(collectorManager.containsKey(appId.toString())); + assertFalse(collectorManager.containsTimelineCollector(appId)); } } - private TimelineCollectorManager createCollectorManager() { - final TimelineCollectorManager collectorManager = - spy(new TimelineCollectorManager()); + private NodeTimelineCollectorManager createCollectorManager() { + final NodeTimelineCollectorManager collectorManager = + spy(new NodeTimelineCollectorManager()); doReturn(new Configuration()).when(collectorManager).getConfig(); CollectorNodemanagerProtocol nmCollectorService = mock(CollectorNodemanagerProtocol.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java index abbe13ad623..b1a5b046b21 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java @@ -67,8 +67,7 @@ public class TestPerNodeTimelineCollectorsAuxService { public void testAddApplication() throws Exception { auxService = createCollectorAndAddApplication(); // auxService should have a single app - assertTrue(auxService.hasApplication( - appAttemptId.getApplicationId().toString())); + assertTrue(auxService.hasApplication(appAttemptId.getApplicationId())); auxService.close(); } @@ -82,16 +81,14 @@ public class TestPerNodeTimelineCollectorsAuxService { when(context.getContainerId()).thenReturn(containerId); auxService.initializeContainer(context); // auxService should not have that app - assertFalse(auxService.hasApplication( - appAttemptId.getApplicationId().toString())); + assertFalse(auxService.hasApplication(appAttemptId.getApplicationId())); } @Test public void testRemoveApplication() throws Exception { auxService = createCollectorAndAddApplication(); // auxService should have a single app - String appIdStr = appAttemptId.getApplicationId().toString(); - assertTrue(auxService.hasApplication(appIdStr)); + assertTrue(auxService.hasApplication(appAttemptId.getApplicationId())); ContainerId containerId = getAMContainerId(); ContainerTerminationContext context = @@ -99,7 +96,7 @@ public class TestPerNodeTimelineCollectorsAuxService { when(context.getContainerId()).thenReturn(containerId); auxService.stopContainer(context); // auxService should not have that app - assertFalse(auxService.hasApplication(appIdStr)); + assertFalse(auxService.hasApplication(appAttemptId.getApplicationId())); auxService.close(); } @@ -107,8 +104,7 @@ public class TestPerNodeTimelineCollectorsAuxService { public void testRemoveApplicationNonAMContainer() throws Exception { auxService = createCollectorAndAddApplication(); // auxService should have a single app - String appIdStr = appAttemptId.getApplicationId().toString(); - assertTrue(auxService.hasApplication(appIdStr)); + assertTrue(auxService.hasApplication(appAttemptId.getApplicationId())); ContainerId containerId = getContainerId(2L); // not an AM ContainerTerminationContext context = @@ -116,7 +112,7 @@ public class TestPerNodeTimelineCollectorsAuxService { when(context.getContainerId()).thenReturn(containerId); auxService.stopContainer(context); // auxService should still have that app - assertTrue(auxService.hasApplication(appIdStr)); + assertTrue(auxService.hasApplication(appAttemptId.getApplicationId())); auxService.close(); } @@ -147,7 +143,7 @@ public class TestPerNodeTimelineCollectorsAuxService { } private PerNodeTimelineCollectorsAuxService createCollector() { - TimelineCollectorManager collectorManager = createCollectorManager(); + NodeTimelineCollectorManager collectorManager = createCollectorManager(); PerNodeTimelineCollectorsAuxService auxService = spy(new PerNodeTimelineCollectorsAuxService(collectorManager)); auxService.init(new YarnConfiguration()); @@ -155,9 +151,9 @@ public class TestPerNodeTimelineCollectorsAuxService { return auxService; } - private TimelineCollectorManager createCollectorManager() { - TimelineCollectorManager collectorManager = - spy(new TimelineCollectorManager()); + private NodeTimelineCollectorManager createCollectorManager() { + NodeTimelineCollectorManager collectorManager = + spy(new NodeTimelineCollectorManager()); doReturn(new Configuration()).when(collectorManager).getConfig(); CollectorNodemanagerProtocol nmCollectorService = mock(CollectorNodemanagerProtocol.class);