YARN-3390. Reuse TimelineCollectorManager for RM (Zhijie Shen via sjlee)

(cherry picked from commit 58221188811e0f61d842dac89e1f4ad4fd8aa182)
This commit is contained in:
Sangjin Lee 2015-04-24 16:56:23 -07:00
parent 47f35a30bb
commit 11e8905d8d
17 changed files with 431 additions and 407 deletions

View File

@ -49,7 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
@ -97,7 +97,7 @@ public class RMActiveServiceContext {
private ApplicationMasterService applicationMasterService;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
private RMTimelineCollector timelineCollector;
private RMTimelineCollectorManager timelineCollectorManager;
private RMNodeLabelsManager nodeLabelManager;
private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater;
@ -381,14 +381,15 @@ public class RMActiveServiceContext {
@Private
@Unstable
public RMTimelineCollector getRMTimelineCollector() {
return timelineCollector;
public RMTimelineCollectorManager getRMTimelineCollectorManager() {
return timelineCollectorManager;
}
@Private
@Unstable
public void setRMTimelineCollector(RMTimelineCollector timelineCollector) {
this.timelineCollector = timelineCollector;
public void setRMTimelineCollectorManager(
RMTimelineCollectorManager timelineCollectorManager) {
this.timelineCollectorManager = timelineCollectorManager;
}
@Private

View File

@ -385,12 +385,13 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
}
// Create RMApp
RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf,
RMAppImpl application =
new RMAppImpl(applicationId, rmContext, this.conf,
submissionContext.getApplicationName(), user,
submissionContext.getQueue(), submissionContext, this.scheduler,
this.masterService, submitTime, submissionContext.getApplicationType(),
submissionContext.getQueue(),
submissionContext, this.scheduler, this.masterService,
submitTime, submissionContext.getApplicationType(),
submissionContext.getApplicationTags(), amReq);
// Concurrent app submissions with same applicationId will fail here
// Concurrent app submissions with different applicationIds will not
// influence each other
@ -401,6 +402,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
LOG.warn(message);
throw new YarnException(message);
}
// Start timeline collector for the submitted app
application.startTimelineCollector();
// Inform the ACLs Manager
this.applicationACLsManager.addApplication(applicationId,
submissionContext.getAMContainerSpec().getApplicationACLs());

View File

@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
/**
* Context of the ResourceManager.
@ -114,9 +114,10 @@ public interface RMContext {
SystemMetricsPublisher getSystemMetricsPublisher();
void setRMTimelineCollector(RMTimelineCollector timelineCollector);
void setRMTimelineCollectorManager(
RMTimelineCollectorManager timelineCollectorManager);
RMTimelineCollector getRMTimelineCollector();
RMTimelineCollectorManager getRMTimelineCollectorManager();
ConfigurationProvider getConfigurationProvider();

View File

@ -51,7 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.util.Clock;
import com.google.common.annotations.VisibleForTesting;
@ -371,14 +371,14 @@ public class RMContextImpl implements RMContext {
}
@Override
public void setRMTimelineCollector(
RMTimelineCollector timelineCollector) {
activeServiceContext.setRMTimelineCollector(timelineCollector);
public void setRMTimelineCollectorManager(
RMTimelineCollectorManager timelineCollectorManager) {
activeServiceContext.setRMTimelineCollectorManager(timelineCollectorManager);
}
@Override
public RMTimelineCollector getRMTimelineCollector() {
return activeServiceContext.getRMTimelineCollector();
public RMTimelineCollectorManager getRMTimelineCollectorManager() {
return activeServiceContext.getRMTimelineCollectorManager();
}
@Override

View File

@ -104,11 +104,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter;
import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
@ -460,8 +460,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
return new RMApplicationHistoryWriter();
}
private RMTimelineCollector createRMTimelineCollector() {
return new RMTimelineCollector();
private RMTimelineCollectorManager createRMTimelineCollectorManager() {
return new RMTimelineCollectorManager(rmContext);
}
protected SystemMetricsPublisher createSystemMetricsPublisher() {
@ -589,10 +589,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
addService(systemMetricsPublisher);
rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
RMTimelineCollector timelineCollector =
createRMTimelineCollector();
addService(timelineCollector);
rmContext.setRMTimelineCollector(timelineCollector);
RMTimelineCollectorManager timelineCollectorManager =
createRMTimelineCollectorManager();
addService(timelineCollectorManager);
rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
// Register event handler for NodesListManager
nodesListManager = new NodesListManager(rmContext);

View File

@ -97,6 +97,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@ -510,6 +512,17 @@ public class RMAppImpl implements RMApp, Recoverable {
}
}
public void startTimelineCollector() {
AppLevelTimelineCollector collector =
new AppLevelTimelineCollector(applicationId);
rmContext.getRMTimelineCollectorManager().putIfAbsent(
applicationId, collector);
}
public void stopTimelineCollector() {
rmContext.getRMTimelineCollectorManager().remove(applicationId);
}
@Override
public ApplicationId getApplicationId() {
return this.applicationId;
@ -1366,6 +1379,8 @@ public class RMAppImpl implements RMApp, Recoverable {
.applicationFinished(app, finalState);
app.rmContext.getSystemMetricsPublisher()
.appFinished(app, finalState, app.finishTime);
app.stopTimelineCollector();
};
}

View File

@ -1,111 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.timelineservice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEventType;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
/**
* This class is responsible for posting application and appattempt lifecycle
* related events to timeline service V2
*/
@Private
@Unstable
public class RMTimelineCollector extends TimelineCollector {
private static final Log LOG = LogFactory.getLog(RMTimelineCollector.class);
public RMTimelineCollector() {
super("Resource Manager TimelineCollector");
}
private Dispatcher dispatcher;
private boolean publishSystemMetricsForV2;
@Override
protected void serviceInit(Configuration conf) throws Exception {
publishSystemMetricsForV2 =
conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
&& conf.getBoolean(
YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
if (publishSystemMetricsForV2) {
// having separate dispatcher to avoid load on RMDispatcher
LOG.info("RMTimelineCollector has been configured to publish"
+ " System Metrics in ATS V2");
dispatcher = new AsyncDispatcher();
dispatcher.register(SystemMetricsEventType.class,
new ForwardingEventHandler());
} else {
LOG.warn("RMTimelineCollector has not been configured to publish"
+ " System Metrics in ATS V2");
}
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
}
protected void handleSystemMetricsEvent(SystemMetricsEvent event) {
switch (event.getType()) {
default:
LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
}
}
@Override
protected TimelineCollectorContext getTimelineEntityContext() {
// TODO address in YARN-3390.
return null;
}
/**
* EventHandler implementation which forward events to SystemMetricsPublisher.
* Making use of it, SystemMetricsPublisher can avoid to have a public handle
* method.
*/
private final class ForwardingEventHandler implements
EventHandler<SystemMetricsEvent> {
@Override
public void handle(SystemMetricsEvent event) {
handleSystemMetricsEvent(event);
}
}
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.timelineservice;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class RMTimelineCollectorManager extends TimelineCollectorManager {
private RMContext rmContext;
public RMTimelineCollectorManager(RMContext rmContext) {
super(RMTimelineCollectorManager.class.getName());
this.rmContext = rmContext;
}
@Override
public void postPut(ApplicationId appId, TimelineCollector collector) {
RMApp app = rmContext.getRMApps().get(appId);
if (app == null) {
throw new YarnRuntimeException(
"Unable to get the timeline collector context info for a non-existing app " +
appId);
}
String userId = app.getUser();
if (userId != null && !userId.isEmpty()) {
collector.getTimelineEntityContext().setUserId(userId);
}
for (String tag : app.getApplicationTags()) {
String[] parts = tag.split(":", 2);
if (parts.length != 2 || parts[1].isEmpty()) {
continue;
}
switch (parts[0]) {
case TimelineUtils.FLOW_NAME_TAG_PREFIX:
collector.getTimelineEntityContext().setFlowName(parts[1]);
break;
case TimelineUtils.FLOW_VERSION_TAG_PREFIX:
collector.getTimelineEntityContext().setFlowVersion(parts[1]);
break;
case TimelineUtils.FLOW_RUN_ID_TAG_PREFIX:
collector.getTimelineEntityContext().setFlowRunId(
Long.valueOf(parts[1]));
break;
default:
break;
}
}
}
}

View File

@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
import org.apache.hadoop.yarn.server.timelineservice.collector.NodeTimelineCollectorManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -42,13 +42,13 @@ import org.junit.Test;
import java.io.IOException;
public class TestTimelineServiceClientIntegration {
private static TimelineCollectorManager collectorManager;
private static NodeTimelineCollectorManager collectorManager;
private static PerNodeTimelineCollectorsAuxService auxService;
@BeforeClass
public static void setupClass() throws Exception {
try {
collectorManager = new MyTimelineCollectorManager();
collectorManager = new MockNodeTimelineCollectorManager();
auxService =
PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
collectorManager);
@ -85,9 +85,9 @@ public class TestTimelineServiceClientIntegration {
}
}
private static class MyTimelineCollectorManager extends
TimelineCollectorManager {
public MyTimelineCollectorManager() {
private static class MockNodeTimelineCollectorManager extends
NodeTimelineCollectorManager {
public MockNodeTimelineCollectorManager() {
super();
}

View File

@ -75,7 +75,7 @@ public class AppLevelTimelineCollector extends TimelineCollector {
}
@Override
protected TimelineCollectorContext getTimelineEntityContext() {
public TimelineCollectorContext getTimelineEntityContext() {
return context;
}

View File

@ -0,0 +1,223 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.collector;
import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
/**
*
* It is a singleton, and instances should be obtained via
* {@link #getInstance()}.
*
*/
@Private
@Unstable
public class NodeTimelineCollectorManager extends TimelineCollectorManager {
private static final Log LOG =
LogFactory.getLog(NodeTimelineCollectorManager.class);
private static final NodeTimelineCollectorManager INSTANCE =
new NodeTimelineCollectorManager();
// REST server for this collector manager
private HttpServer2 timelineRestServer;
private String timelineRestServerBindAddress;
private CollectorNodemanagerProtocol nmCollectorService;
private InetSocketAddress nmCollectorServiceAddress;
static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
static NodeTimelineCollectorManager getInstance() {
return INSTANCE;
}
@VisibleForTesting
protected NodeTimelineCollectorManager() {
super(NodeTimelineCollectorManager.class.getName());
}
@Override
public void serviceInit(Configuration conf) throws Exception {
this.nmCollectorServiceAddress = conf.getSocketAddr(
YarnConfiguration.NM_BIND_HOST,
YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
nmCollectorService = getNMCollectorService();
startWebApp();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
if (timelineRestServer != null) {
timelineRestServer.stop();
}
super.serviceStop();
}
@Override
public void postPut(ApplicationId appId, TimelineCollector collector) {
try {
// Get context info from NM
updateTimelineCollectorContext(appId, collector);
// Report to NM if a new collector is added.
reportNewCollectorToNM(appId);
} catch (YarnException | IOException e) {
// throw exception here as it cannot be used if failed communicate with NM
LOG.error("Failed to communicate with NM Collector Service for " + appId);
throw new YarnRuntimeException(e);
}
}
/**
* Launch the REST web server for this collector manager
*/
private void startWebApp() {
Configuration conf = getConfig();
String bindAddress = conf.get(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_BIND_HOST) + ":0";
try {
Configuration confForInfoServer = new Configuration(conf);
confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
HttpServer2.Builder builder = new HttpServer2.Builder()
.setName("timeline")
.setConf(conf)
.addEndpoint(URI.create(
(YarnConfiguration.useHttps(conf) ? "https://" : "http://") +
bindAddress));
timelineRestServer = builder.build();
// TODO: replace this by an authentication filter in future.
HashMap<String, String> options = new HashMap<>();
String username = conf.get(HADOOP_HTTP_STATIC_USER,
DEFAULT_HADOOP_HTTP_STATIC_USER);
options.put(HADOOP_HTTP_STATIC_USER, username);
HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
"static_user_filter_timeline",
StaticUserWebFilter.StaticUserFilter.class.getName(),
options, new String[] {"/*"});
timelineRestServer.addJerseyResourcePackage(
TimelineCollectorWebService.class.getPackage().getName() + ";"
+ GenericExceptionHandler.class.getPackage().getName() + ";"
+ YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
"/*");
timelineRestServer.setAttribute(COLLECTOR_MANAGER_ATTR_KEY, this);
timelineRestServer.start();
} catch (Exception e) {
String msg = "The per-node collector webapp failed to start.";
LOG.error(msg, e);
throw new YarnRuntimeException(msg, e);
}
//TODO: We need to think of the case of multiple interfaces
this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress(
timelineRestServer.getConnectorAddress(0));
LOG.info("Instantiated the per-node collector webapp at " +
timelineRestServerBindAddress);
}
private void reportNewCollectorToNM(ApplicationId appId)
throws YarnException, IOException {
ReportNewCollectorInfoRequest request =
ReportNewCollectorInfoRequest.newInstance(appId,
this.timelineRestServerBindAddress);
LOG.info("Report a new collector for application: " + appId +
" to the NM Collector Service.");
nmCollectorService.reportNewCollectorInfo(request);
}
private void updateTimelineCollectorContext(
ApplicationId appId, TimelineCollector collector)
throws YarnException, IOException {
GetTimelineCollectorContextRequest request =
GetTimelineCollectorContextRequest.newInstance(appId);
LOG.info("Get timeline collector context for " + appId);
GetTimelineCollectorContextResponse response =
nmCollectorService.getTimelineCollectorContext(request);
String userId = response.getUserId();
if (userId != null && !userId.isEmpty()) {
collector.getTimelineEntityContext().setUserId(userId);
}
String flowName = response.getFlowName();
if (flowName != null && !flowName.isEmpty()) {
collector.getTimelineEntityContext().setFlowName(flowName);
}
String flowVersion = response.getFlowVersion();
if (flowVersion != null && !flowVersion.isEmpty()) {
collector.getTimelineEntityContext().setFlowVersion(flowVersion);
}
long flowRunId = response.getFlowRunId();
if (flowRunId != 0L) {
collector.getTimelineEntityContext().setFlowRunId(flowRunId);
}
}
@VisibleForTesting
protected CollectorNodemanagerProtocol getNMCollectorService() {
Configuration conf = getConfig();
final YarnRPC rpc = YarnRPC.create(conf);
// TODO Security settings.
return (CollectorNodemanagerProtocol) rpc.getProxy(
CollectorNodemanagerProtocol.class,
nmCollectorServiceAddress, conf);
}
@VisibleForTesting
public String getRestServerBindAddress() {
return timelineRestServerBindAddress;
}
}

View File

@ -53,15 +53,15 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
LogFactory.getLog(PerNodeTimelineCollectorsAuxService.class);
private static final int SHUTDOWN_HOOK_PRIORITY = 30;
private final TimelineCollectorManager collectorManager;
private final NodeTimelineCollectorManager collectorManager;
public PerNodeTimelineCollectorsAuxService() {
// use the same singleton
this(TimelineCollectorManager.getInstance());
this(NodeTimelineCollectorManager.getInstance());
}
@VisibleForTesting PerNodeTimelineCollectorsAuxService(
TimelineCollectorManager collectorsManager) {
NodeTimelineCollectorManager collectorsManager) {
super("timeline_collector");
this.collectorManager = collectorsManager;
}
@ -108,8 +108,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
* @return whether it was removed successfully
*/
public boolean removeApplication(ApplicationId appId) {
String appIdString = appId.toString();
return collectorManager.remove(appIdString);
return collectorManager.remove(appId);
}
/**
@ -153,8 +152,8 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
}
@VisibleForTesting
boolean hasApplication(String appId) {
return collectorManager.containsKey(appId);
boolean hasApplication(ApplicationId appId) {
return collectorManager.containsTimelineCollector(appId);
}
@Override
@ -174,7 +173,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
@VisibleForTesting
public static PerNodeTimelineCollectorsAuxService
launchServer(String[] args, TimelineCollectorManager collectorManager) {
launchServer(String[] args, NodeTimelineCollectorManager collectorManager) {
Thread
.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(

View File

@ -124,6 +124,6 @@ public abstract class TimelineCollector extends CompositeService {
}
}
protected abstract TimelineCollectorContext getTimelineEntityContext();
public abstract TimelineCollectorContext getTimelineEntityContext();
}

View File

@ -18,173 +18,97 @@
package org.apache.hadoop.yarn.server.timelineservice.collector;
import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* Class that manages adding and removing collectors and their lifecycle. It
* provides thread safety access to the collectors inside.
*
* It is a singleton, and instances should be obtained via
* {@link #getInstance()}.
*/
@Private
@Unstable
public class TimelineCollectorManager extends CompositeService {
@InterfaceAudience.Private
@InterfaceStability.Unstable
public abstract class TimelineCollectorManager extends AbstractService {
private static final Log LOG =
LogFactory.getLog(TimelineCollectorManager.class);
private static final TimelineCollectorManager INSTANCE =
new TimelineCollectorManager();
// access to this map is synchronized with the map itself
private final Map<String, TimelineCollector> collectors =
private final Map<ApplicationId, TimelineCollector> collectors =
Collections.synchronizedMap(
new HashMap<String, TimelineCollector>());
new HashMap<ApplicationId, TimelineCollector>());
// REST server for this collector manager
private HttpServer2 timelineRestServer;
private String timelineRestServerBindAddress;
private CollectorNodemanagerProtocol nmCollectorService;
private InetSocketAddress nmCollectorServiceAddress;
static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
static TimelineCollectorManager getInstance() {
return INSTANCE;
}
@VisibleForTesting
protected TimelineCollectorManager() {
super(TimelineCollectorManager.class.getName());
}
@Override
public void serviceInit(Configuration conf) throws Exception {
this.nmCollectorServiceAddress = conf.getSocketAddr(
YarnConfiguration.NM_BIND_HOST,
YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
}
@Override
protected void serviceStart() throws Exception {
nmCollectorService = getNMCollectorService();
startWebApp();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
if (timelineRestServer != null) {
timelineRestServer.stop();
}
super.serviceStop();
protected TimelineCollectorManager(String name) {
super(name);
}
/**
* Put the collector into the collection if an collector mapped by id does
* not exist.
*
* @throws YarnRuntimeException if there was any exception in initializing and
* starting the app level service
* @throws YarnRuntimeException if there was any exception in initializing
* and starting the app level service
* @return the collector associated with id after the potential put.
*/
public TimelineCollector putIfAbsent(ApplicationId appId,
TimelineCollector collector) {
String id = appId.toString();
TimelineCollector collectorInTable;
boolean collectorIsNew = false;
TimelineCollector collectorInTable = null;
synchronized (collectors) {
collectorInTable = collectors.get(id);
collectorInTable = collectors.get(appId);
if (collectorInTable == null) {
try {
// initialize, start, and add it to the collection so it can be
// cleaned up when the parent shuts down
collector.init(getConfig());
collector.start();
collectors.put(id, collector);
LOG.info("the collector for " + id + " was added");
collectors.put(appId, collector);
LOG.info("the collector for " + appId + " was added");
collectorInTable = collector;
collectorIsNew = true;
postPut(appId, collectorInTable);
} catch (Exception e) {
throw new YarnRuntimeException(e);
}
} else {
String msg = "the collector for " + id + " already exists!";
LOG.error(msg);
}
}
// Report to NM if a new collector is added.
if (collectorIsNew) {
try {
updateTimelineCollectorContext(appId, collector);
reportNewCollectorToNM(appId);
} catch (Exception e) {
// throw exception here as it cannot be used if failed communicate with NM
LOG.error("Failed to communicate with NM Collector Service for " + appId);
throw new YarnRuntimeException(e);
LOG.info("the collector for " + appId + " already exists!");
}
}
return collectorInTable;
}
protected void postPut(ApplicationId appId, TimelineCollector collector) {
}
/**
* Removes the collector for the specified id. The collector is also stopped
* as a result. If the collector does not exist, no change is made.
*
* @return whether it was removed successfully
*/
public boolean remove(String id) {
synchronized (collectors) {
TimelineCollector collector = collectors.remove(id);
public boolean remove(ApplicationId appId) {
TimelineCollector collector = collectors.remove(appId);
if (collector == null) {
String msg = "the collector for " + id + " does not exist!";
LOG.error(msg);
return false;
LOG.error("the collector for " + appId + " does not exist!");
} else {
postRemove(appId, collector);
// stop the service to do clean up
collector.stop();
LOG.info("the collector service for " + id + " was removed");
return true;
LOG.info("the collector service for " + appId + " was removed");
}
return collector != null;
}
protected void postRemove(ApplicationId appId, TimelineCollector collector) {
}
/**
@ -192,113 +116,16 @@ public class TimelineCollectorManager extends CompositeService {
*
* @return the collector or null if it does not exist
*/
public TimelineCollector get(String id) {
return collectors.get(id);
public TimelineCollector get(ApplicationId appId) {
return collectors.get(appId);
}
/**
* Returns whether the collector for the specified id exists in this
* collection.
*/
public boolean containsKey(String id) {
return collectors.containsKey(id);
public boolean containsTimelineCollector(ApplicationId appId) {
return collectors.containsKey(appId);
}
/**
* Launch the REST web server for this collector manager
*/
private void startWebApp() {
Configuration conf = getConfig();
String bindAddress = conf.get(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_BIND_HOST) + ":0";
try {
Configuration confForInfoServer = new Configuration(conf);
confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
HttpServer2.Builder builder = new HttpServer2.Builder()
.setName("timeline")
.setConf(conf)
.addEndpoint(URI.create(
(YarnConfiguration.useHttps(conf) ? "https://" : "http://") +
bindAddress));
timelineRestServer = builder.build();
// TODO: replace this by an authentication filter in future.
HashMap<String, String> options = new HashMap<>();
String username = conf.get(HADOOP_HTTP_STATIC_USER,
DEFAULT_HADOOP_HTTP_STATIC_USER);
options.put(HADOOP_HTTP_STATIC_USER, username);
HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
"static_user_filter_timeline",
StaticUserWebFilter.StaticUserFilter.class.getName(),
options, new String[] {"/*"});
timelineRestServer.addJerseyResourcePackage(
TimelineCollectorWebService.class.getPackage().getName() + ";"
+ GenericExceptionHandler.class.getPackage().getName() + ";"
+ YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
"/*");
timelineRestServer.setAttribute(COLLECTOR_MANAGER_ATTR_KEY, this);
timelineRestServer.start();
} catch (Exception e) {
String msg = "The per-node collector webapp failed to start.";
LOG.error(msg, e);
throw new YarnRuntimeException(msg, e);
}
//TODO: We need to think of the case of multiple interfaces
this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress(
timelineRestServer.getConnectorAddress(0));
LOG.info("Instantiated the per-node collector webapp at " +
timelineRestServerBindAddress);
}
private void reportNewCollectorToNM(ApplicationId appId)
throws YarnException, IOException {
ReportNewCollectorInfoRequest request =
ReportNewCollectorInfoRequest.newInstance(appId,
this.timelineRestServerBindAddress);
LOG.info("Report a new collector for application: " + appId +
" to the NM Collector Service.");
nmCollectorService.reportNewCollectorInfo(request);
}
private void updateTimelineCollectorContext(
ApplicationId appId, TimelineCollector collector)
throws YarnException, IOException {
GetTimelineCollectorContextRequest request =
GetTimelineCollectorContextRequest.newInstance(appId);
LOG.info("Get timeline collector context for " + appId);
GetTimelineCollectorContextResponse response =
nmCollectorService.getTimelineCollectorContext(request);
String userId = response.getUserId();
if (userId != null && !userId.isEmpty()) {
collector.getTimelineEntityContext().setUserId(userId);
}
String flowName = response.getFlowName();
if (flowName != null && !flowName.isEmpty()) {
collector.getTimelineEntityContext().setFlowName(flowName);
}
String flowVersion = response.getFlowVersion();
if (flowVersion != null && !flowVersion.isEmpty()) {
collector.getTimelineEntityContext().setFlowVersion(flowVersion);
}
long flowRunId = response.getFlowRunId();
if (flowRunId != 0L) {
collector.getTimelineEntityContext().setFlowRunId(flowRunId);
}
}
@VisibleForTesting
protected CollectorNodemanagerProtocol getNMCollectorService() {
Configuration conf = getConfig();
final YarnRPC rpc = YarnRPC.create(conf);
// TODO Security settings.
return (CollectorNodemanagerProtocol) rpc.getProxy(
CollectorNodemanagerProtocol.class,
nmCollectorServiceAddress, conf);
}
@VisibleForTesting
public String getRestServerBindAddress() {
return timelineRestServerBindAddress;
}
}

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
@ -129,11 +130,14 @@ public class TimelineCollectorWebService {
boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
try {
appId = parseApplicationId(appId);
if (appId == null) {
ApplicationId appID = parseApplicationId(appId);
if (appID == null) {
return Response.status(Response.Status.BAD_REQUEST).build();
}
TimelineCollector collector = getCollector(req, appId);
NodeTimelineCollectorManager collectorManager =
(NodeTimelineCollectorManager) context.getAttribute(
NodeTimelineCollectorManager.COLLECTOR_MANAGER_ATTR_KEY);
TimelineCollector collector = collectorManager.get(appID);
if (collector == null) {
LOG.error("Application: "+ appId + " is not found");
throw new NotFoundException(); // different exception?
@ -147,10 +151,10 @@ public class TimelineCollectorWebService {
}
}
private String parseApplicationId(String appId) {
private ApplicationId parseApplicationId(String appId) {
try {
if (appId != null) {
return ConverterUtils.toApplicationId(appId.trim()).toString();
return ConverterUtils.toApplicationId(appId.trim());
} else {
return null;
}
@ -159,15 +163,6 @@ public class TimelineCollectorWebService {
}
}
private TimelineCollector
getCollector(HttpServletRequest req, String appIdToParse) {
String appIdString = parseApplicationId(appIdToParse);
final TimelineCollectorManager collectorManager =
(TimelineCollectorManager) context.getAttribute(
TimelineCollectorManager.COLLECTOR_MANAGER_ATTR_KEY);
return collectorManager.get(appIdString);
}
private void init(HttpServletResponse response) {
response.setContentType(null);
}

View File

@ -49,8 +49,8 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestTimelineCollectorManager {
private TimelineCollectorManager collectorManager;
public class TestNMTimelineCollectorManager {
private NodeTimelineCollectorManager collectorManager;
@Before
public void setup() throws Exception {
@ -103,7 +103,7 @@ public class TestTimelineCollectorManager {
// check the keys
for (int i = 0; i < NUM_APPS; i++) {
final ApplicationId appId = ApplicationId.newInstance(0L, i);
assertTrue(collectorManager.containsKey(appId.toString()));
assertTrue(collectorManager.containsTimelineCollector(appId));
}
}
@ -119,7 +119,7 @@ public class TestTimelineCollectorManager {
new AppLevelTimelineCollector(appId);
boolean successPut =
(collectorManager.putIfAbsent(appId, collector) == collector);
return successPut && collectorManager.remove(appId.toString());
return successPut && collectorManager.remove(appId);
}
};
tasks.add(task);
@ -136,13 +136,13 @@ public class TestTimelineCollectorManager {
// check the keys
for (int i = 0; i < NUM_APPS; i++) {
final ApplicationId appId = ApplicationId.newInstance(0L, i);
assertFalse(collectorManager.containsKey(appId.toString()));
assertFalse(collectorManager.containsTimelineCollector(appId));
}
}
private TimelineCollectorManager createCollectorManager() {
final TimelineCollectorManager collectorManager =
spy(new TimelineCollectorManager());
private NodeTimelineCollectorManager createCollectorManager() {
final NodeTimelineCollectorManager collectorManager =
spy(new NodeTimelineCollectorManager());
doReturn(new Configuration()).when(collectorManager).getConfig();
CollectorNodemanagerProtocol nmCollectorService =
mock(CollectorNodemanagerProtocol.class);

View File

@ -67,8 +67,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
public void testAddApplication() throws Exception {
auxService = createCollectorAndAddApplication();
// auxService should have a single app
assertTrue(auxService.hasApplication(
appAttemptId.getApplicationId().toString()));
assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
auxService.close();
}
@ -82,16 +81,14 @@ public class TestPerNodeTimelineCollectorsAuxService {
when(context.getContainerId()).thenReturn(containerId);
auxService.initializeContainer(context);
// auxService should not have that app
assertFalse(auxService.hasApplication(
appAttemptId.getApplicationId().toString()));
assertFalse(auxService.hasApplication(appAttemptId.getApplicationId()));
}
@Test
public void testRemoveApplication() throws Exception {
auxService = createCollectorAndAddApplication();
// auxService should have a single app
String appIdStr = appAttemptId.getApplicationId().toString();
assertTrue(auxService.hasApplication(appIdStr));
assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
ContainerId containerId = getAMContainerId();
ContainerTerminationContext context =
@ -99,7 +96,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
when(context.getContainerId()).thenReturn(containerId);
auxService.stopContainer(context);
// auxService should not have that app
assertFalse(auxService.hasApplication(appIdStr));
assertFalse(auxService.hasApplication(appAttemptId.getApplicationId()));
auxService.close();
}
@ -107,8 +104,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
public void testRemoveApplicationNonAMContainer() throws Exception {
auxService = createCollectorAndAddApplication();
// auxService should have a single app
String appIdStr = appAttemptId.getApplicationId().toString();
assertTrue(auxService.hasApplication(appIdStr));
assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
ContainerId containerId = getContainerId(2L); // not an AM
ContainerTerminationContext context =
@ -116,7 +112,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
when(context.getContainerId()).thenReturn(containerId);
auxService.stopContainer(context);
// auxService should still have that app
assertTrue(auxService.hasApplication(appIdStr));
assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
auxService.close();
}
@ -147,7 +143,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
}
private PerNodeTimelineCollectorsAuxService createCollector() {
TimelineCollectorManager collectorManager = createCollectorManager();
NodeTimelineCollectorManager collectorManager = createCollectorManager();
PerNodeTimelineCollectorsAuxService auxService =
spy(new PerNodeTimelineCollectorsAuxService(collectorManager));
auxService.init(new YarnConfiguration());
@ -155,9 +151,9 @@ public class TestPerNodeTimelineCollectorsAuxService {
return auxService;
}
private TimelineCollectorManager createCollectorManager() {
TimelineCollectorManager collectorManager =
spy(new TimelineCollectorManager());
private NodeTimelineCollectorManager createCollectorManager() {
NodeTimelineCollectorManager collectorManager =
spy(new NodeTimelineCollectorManager());
doReturn(new Configuration()).when(collectorManager).getConfig();
CollectorNodemanagerProtocol nmCollectorService =
mock(CollectorNodemanagerProtocol.class);