YARN-3210. Refactored timeline aggregator according to new code organization proposed in YARN-3166. Contributed by Li Lu.

(cherry picked from commit d3ff7f06cbc66d3a23c2551e7d4c752689f46afe)
This commit is contained in:
Zhijie Shen 2015-03-03 11:21:03 -08:00 committed by Sangjin Lee
parent d45ff878c4
commit bf54d32750
15 changed files with 326 additions and 349 deletions

View File

@ -78,7 +78,7 @@
import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher; import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer; import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -159,7 +159,7 @@ private void setupInternal(int numNodeManager, float timelineVersion)
// enable aux-service based timeline aggregators // enable aux-service based timeline aggregators
conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME); conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
+ ".class", PerNodeAggregatorServer.class.getName()); + ".class", PerNodeTimelineAggregatorsAuxService.class.getName());
} else { } else {
Assert.fail("Wrong timeline version number: " + timelineVersion); Assert.fail("Wrong timeline version number: " + timelineVersion);
} }

View File

@ -52,11 +52,6 @@
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId> <artifactId>hadoop-yarn-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>javax.xml.bind</groupId> <groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId> <artifactId>jaxb-api</artifactId>

View File

@ -30,9 +30,6 @@
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView; import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.timelineservice.aggregator.AppLevelServiceManager;
import org.apache.hadoop.yarn.server.timelineservice.aggregator.AppLevelServiceManagerProvider;
import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorWebService;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps; import org.apache.hadoop.yarn.webapp.WebApps;

View File

@ -6,7 +6,7 @@
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer; import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -14,13 +14,13 @@
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
public class TestTimelineServiceClientIntegration { public class TestTimelineServiceClientIntegration {
private static PerNodeAggregatorServer server; private static PerNodeTimelineAggregatorsAuxService auxService;
@BeforeClass @BeforeClass
public static void setupClass() throws Exception { public static void setupClass() throws Exception {
try { try {
server = PerNodeAggregatorServer.launchServer(new String[0]); auxService = PerNodeTimelineAggregatorsAuxService.launchServer(new String[0]);
server.addApplication(ApplicationId.newInstance(0, 1)); auxService.addApplication(ApplicationId.newInstance(0, 1));
} catch (ExitUtil.ExitException e) { } catch (ExitUtil.ExitException e) {
fail(); fail();
} }
@ -28,8 +28,8 @@ public static void setupClass() throws Exception {
@AfterClass @AfterClass
public static void tearDownClass() throws Exception { public static void tearDownClass() throws Exception {
if (server != null) { if (auxService != null) {
server.stop(); auxService.stop();
} }
} }

View File

@ -1,136 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.aggregator;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
/**
* Class that manages adding and removing app level aggregator services and
* their lifecycle. It provides thread safety access to the app level services.
*
* It is a singleton, and instances should be obtained via
* {@link #getInstance()}.
*/
@Private
@Unstable
public class AppLevelServiceManager extends CompositeService {
private static final Log LOG =
LogFactory.getLog(AppLevelServiceManager.class);
private static final AppLevelServiceManager INSTANCE =
new AppLevelServiceManager();
// access to this map is synchronized with the map itself
private final Map<String,AppLevelAggregatorService> services =
Collections.synchronizedMap(
new HashMap<String,AppLevelAggregatorService>());
static AppLevelServiceManager getInstance() {
return INSTANCE;
}
AppLevelServiceManager() {
super(AppLevelServiceManager.class.getName());
}
/**
* Creates and adds an app level aggregator service for the specified
* application id. The service is also initialized and started. If the service
* already exists, no new service is created.
*
* @throws YarnRuntimeException if there was any exception in initializing and
* starting the app level service
* @return whether it was added successfully
*/
public boolean addService(String appId) {
synchronized (services) {
AppLevelAggregatorService service = services.get(appId);
if (service == null) {
try {
service = new AppLevelAggregatorService(appId);
// initialize, start, and add it to the parent service so it can be
// cleaned up when the parent shuts down
service.init(getConfig());
service.start();
services.put(appId, service);
LOG.info("the application aggregator service for " + appId +
" was added");
return true;
} catch (Exception e) {
throw new YarnRuntimeException(e);
}
} else {
String msg = "the application aggregator service for " + appId +
" already exists!";
LOG.error(msg);
return false;
}
}
}
/**
* Removes the app level aggregator service for the specified application id.
* The service is also stopped as a result. If the service does not exist, no
* change is made.
*
* @return whether it was removed successfully
*/
public boolean removeService(String appId) {
synchronized (services) {
AppLevelAggregatorService service = services.remove(appId);
if (service == null) {
String msg = "the application aggregator service for " + appId +
" does not exist!";
LOG.error(msg);
return false;
} else {
// stop the service to do clean up
service.stop();
LOG.info("the application aggregator service for " + appId +
" was removed");
return true;
}
}
}
/**
* Returns the app level aggregator service for the specified application id.
*
* @return the app level aggregator service or null if it does not exist
*/
public AppLevelAggregatorService getService(String appId) {
return services.get(appId);
}
/**
* Returns whether the app level aggregator service for the specified
* application id exists.
*/
public boolean hasService(String appId) {
return services.containsKey(appId);
}
}

View File

@ -1,33 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.aggregator;
import com.google.inject.Provider;
/**
* A guice provider that provides a global singleton instance of
* AppLevelServiceManager.
*/
public class AppLevelServiceManagerProvider
implements Provider<AppLevelServiceManager> {
@Override
public AppLevelServiceManager get() {
return AppLevelServiceManager.getInstance();
}
}

View File

@ -30,12 +30,12 @@
*/ */
@Private @Private
@Unstable @Unstable
public class AppLevelAggregatorService extends BaseAggregatorService { public class AppLevelTimelineAggregator extends TimelineAggregator {
private final String applicationId; private final String applicationId;
// TODO define key metadata such as flow metadata, user, and queue // TODO define key metadata such as flow metadata, user, and queue
public AppLevelAggregatorService(String applicationId) { public AppLevelTimelineAggregator(String applicationId) {
super(AppLevelAggregatorService.class.getName() + " - " + applicationId); super(AppLevelTimelineAggregator.class.getName() + " - " + applicationId);
this.applicationId = applicationId; this.applicationId = applicationId;
} }

View File

@ -18,16 +18,13 @@
package org.apache.hadoop.yarn.server.timelineservice.aggregator; package org.apache.hadoop.yarn.server.timelineservice.aggregator;
import java.net.URI;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.HashMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -35,144 +32,91 @@
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerContext; import org.apache.hadoop.yarn.server.api.ContainerContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.hadoop.http.HttpServer2;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
/** /**
* The top-level server for the per-node timeline aggregator service. Currently * The top-level server for the per-node timeline aggregator collection. Currently
* it is defined as an auxiliary service to accommodate running within another * it is defined as an auxiliary service to accommodate running within another
* daemon (e.g. node manager). * daemon (e.g. node manager).
*/ */
@Private @Private
@Unstable @Unstable
public class PerNodeAggregatorServer extends AuxiliaryService { public class PerNodeTimelineAggregatorsAuxService extends AuxiliaryService {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(PerNodeAggregatorServer.class); LogFactory.getLog(PerNodeTimelineAggregatorsAuxService.class);
private static final int SHUTDOWN_HOOK_PRIORITY = 30; private static final int SHUTDOWN_HOOK_PRIORITY = 30;
static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection";
private final AppLevelServiceManager serviceManager; private final TimelineAggregatorsCollection aggregatorCollection;
private HttpServer2 timelineRestServer;
public PerNodeAggregatorServer() { public PerNodeTimelineAggregatorsAuxService() {
// use the same singleton // use the same singleton
this(AppLevelServiceManager.getInstance()); this(TimelineAggregatorsCollection.getInstance());
} }
@VisibleForTesting @VisibleForTesting PerNodeTimelineAggregatorsAuxService(
PerNodeAggregatorServer(AppLevelServiceManager serviceManager) { TimelineAggregatorsCollection aggregatorCollection) {
super("timeline_aggregator"); super("timeline_aggregator");
this.serviceManager = serviceManager; this.aggregatorCollection = aggregatorCollection;
} }
@Override @Override
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
serviceManager.init(conf); aggregatorCollection.init(conf);
super.serviceInit(conf); super.serviceInit(conf);
} }
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
aggregatorCollection.start();
super.serviceStart(); super.serviceStart();
serviceManager.start();
startWebApp();
} }
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
if (timelineRestServer != null) { aggregatorCollection.stop();
timelineRestServer.stop();
}
// stop the service manager
serviceManager.stop();
super.serviceStop(); super.serviceStop();
} }
private void startWebApp() {
Configuration conf = getConfig();
// use the same ports as the old ATS for now; we could create new properties
// for the new timeline service if needed
String bindAddress = WebAppUtils.getWebAppBindURL(conf,
YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress);
try {
Configuration confForInfoServer = new Configuration(conf);
confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
HttpServer2.Builder builder = new HttpServer2.Builder()
.setName("timeline")
.setConf(conf)
.addEndpoint(URI.create("http://" + bindAddress));
timelineRestServer = builder.build();
// TODO: replace this by an authentication filter in future.
HashMap<String, String> options = new HashMap<>();
String username = conf.get(HADOOP_HTTP_STATIC_USER,
DEFAULT_HADOOP_HTTP_STATIC_USER);
options.put(HADOOP_HTTP_STATIC_USER, username);
HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
"static_user_filter_timeline",
StaticUserWebFilter.StaticUserFilter.class.getName(),
options, new String[] {"/*"});
timelineRestServer.addJerseyResourcePackage(
PerNodeAggregatorWebService.class.getPackage().getName() + ";"
+ GenericExceptionHandler.class.getPackage().getName() + ";"
+ YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
"/*");
timelineRestServer.setAttribute(AGGREGATOR_COLLECTION_ATTR_KEY,
AppLevelServiceManager.getInstance());
timelineRestServer.start();
} catch (Exception e) {
String msg = "The per-node aggregator webapp failed to start.";
LOG.error(msg, e);
throw new YarnRuntimeException(msg, e);
}
}
// these methods can be used as the basis for future service methods if the // these methods can be used as the basis for future service methods if the
// per-node aggregator runs separate from the node manager // per-node aggregator runs separate from the node manager
/** /**
* Creates and adds an app level aggregator service for the specified * Creates and adds an app level aggregator for the specified application id.
* application id. The service is also initialized and started. If the service * The aggregator is also initialized and started. If the service already
* already exists, no new service is created. * exists, no new service is created.
* *
* @return whether it was added successfully * @return whether it was added successfully
*/ */
public boolean addApplication(ApplicationId appId) { public boolean addApplication(ApplicationId appId) {
String appIdString = appId.toString(); String appIdString = appId.toString();
return serviceManager.addService(appIdString); AppLevelTimelineAggregator aggregator =
new AppLevelTimelineAggregator(appIdString);
return (aggregatorCollection.putIfAbsent(appIdString, aggregator)
== aggregator);
} }
/** /**
* Removes the app level aggregator service for the specified application id. * Removes the app level aggregator for the specified application id. The
* The service is also stopped as a result. If the service does not exist, no * aggregator is also stopped as a result. If the aggregator does not exist, no
* change is made. * change is made.
* *
* @return whether it was removed successfully * @return whether it was removed successfully
*/ */
public boolean removeApplication(ApplicationId appId) { public boolean removeApplication(ApplicationId appId) {
String appIdString = appId.toString(); String appIdString = appId.toString();
return serviceManager.removeService(appIdString); return aggregatorCollection.remove(appIdString);
} }
/** /**
* Creates and adds an app level aggregator service for the specified * Creates and adds an app level aggregator for the specified application id.
* application id. The service is also initialized and started. If the service * The aggregator is also initialized and started. If the aggregator already
* already exists, no new service is created. * exists, no new aggregator is created.
*/ */
@Override @Override
public void initializeContainer(ContainerInitializationContext context) { public void initializeContainer(ContainerInitializationContext context) {
@ -186,8 +130,8 @@ public void initializeContainer(ContainerInitializationContext context) {
} }
/** /**
* Removes the app level aggregator service for the specified application id. * Removes the app level aggregator for the specified application id. The
* The service is also stopped as a result. If the service does not exist, no * aggregator is also stopped as a result. If the aggregator does not exist, no
* change is made. * change is made.
*/ */
@Override @Override
@ -211,7 +155,7 @@ private boolean isApplicationMaster(ContainerContext context) {
@VisibleForTesting @VisibleForTesting
boolean hasApplication(String appId) { boolean hasApplication(String appId) {
return serviceManager.hasService(appId); return aggregatorCollection.containsKey(appId);
} }
@Override @Override
@ -230,35 +174,35 @@ public ByteBuffer getMetaData() {
} }
@VisibleForTesting @VisibleForTesting
public static PerNodeAggregatorServer launchServer(String[] args) { public static PerNodeTimelineAggregatorsAuxService launchServer(String[] args) {
Thread Thread
.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(PerNodeAggregatorServer.class, args, StringUtils.startupShutdownMessage(PerNodeTimelineAggregatorsAuxService.class, args,
LOG); LOG);
PerNodeAggregatorServer server = null; PerNodeTimelineAggregatorsAuxService auxService = null;
try { try {
server = new PerNodeAggregatorServer(); auxService = new PerNodeTimelineAggregatorsAuxService();
ShutdownHookManager.get().addShutdownHook(new ShutdownHook(server), ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService),
SHUTDOWN_HOOK_PRIORITY); SHUTDOWN_HOOK_PRIORITY);
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
server.init(conf); auxService.init(conf);
server.start(); auxService.start();
} catch (Throwable t) { } catch (Throwable t) {
LOG.fatal("Error starting PerNodeAggregatorServer", t); LOG.fatal("Error starting PerNodeAggregatorServer", t);
ExitUtil.terminate(-1, "Error starting PerNodeAggregatorServer"); ExitUtil.terminate(-1, "Error starting PerNodeAggregatorServer");
} }
return server; return auxService;
} }
private static class ShutdownHook implements Runnable { private static class ShutdownHook implements Runnable {
private final PerNodeAggregatorServer server; private final PerNodeTimelineAggregatorsAuxService auxService;
public ShutdownHook(PerNodeAggregatorServer server) { public ShutdownHook(PerNodeTimelineAggregatorsAuxService auxService) {
this.server = server; this.auxService = auxService;
} }
public void run() { public void run() {
server.stop(); auxService.stop();
} }
} }

View File

@ -31,15 +31,15 @@
* Service that handles writes to the timeline service and writes them to the * Service that handles writes to the timeline service and writes them to the
* backing storage. * backing storage.
* *
* Classes that extend this can add their own lifecycle management or * Classes that extend this can putIfAbsent their own lifecycle management or
* customization of request handling. * customization of request handling.
*/ */
@Private @Private
@Unstable @Unstable
public class BaseAggregatorService extends CompositeService { public abstract class TimelineAggregator extends CompositeService {
private static final Log LOG = LogFactory.getLog(BaseAggregatorService.class); private static final Log LOG = LogFactory.getLog(TimelineAggregator.class);
public BaseAggregatorService(String name) { public TimelineAggregator(String name) {
super(name); super(name);
} }

View File

@ -53,9 +53,9 @@
@Unstable @Unstable
@Singleton @Singleton
@Path("/ws/v2/timeline") @Path("/ws/v2/timeline")
public class PerNodeAggregatorWebService { public class TimelineAggregatorWebService {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(PerNodeAggregatorWebService.class); LogFactory.getLog(TimelineAggregatorWebService.class);
private @Context ServletContext context; private @Context ServletContext context;
@ -128,7 +128,7 @@ public Response putEntities(
if (appId == null) { if (appId == null) {
return Response.status(Response.Status.BAD_REQUEST).build(); return Response.status(Response.Status.BAD_REQUEST).build();
} }
AppLevelAggregatorService service = getAggregatorService(req, appId); TimelineAggregator service = getAggregatorService(req, appId);
if (service == null) { if (service == null) {
LOG.error("Application not found"); LOG.error("Application not found");
throw new NotFoundException(); // different exception? throw new NotFoundException(); // different exception?
@ -156,13 +156,13 @@ private String parseApplicationId(String appId) {
} }
} }
private AppLevelAggregatorService private TimelineAggregator
getAggregatorService(HttpServletRequest req, String appIdToParse) { getAggregatorService(HttpServletRequest req, String appIdToParse) {
String appIdString = parseApplicationId(appIdToParse); String appIdString = parseApplicationId(appIdToParse);
final AppLevelServiceManager serviceManager = final TimelineAggregatorsCollection aggregatorCollection =
(AppLevelServiceManager) context.getAttribute( (TimelineAggregatorsCollection) context.getAttribute(
PerNodeAggregatorServer.AGGREGATOR_COLLECTION_ATTR_KEY); TimelineAggregatorsCollection.AGGREGATOR_COLLECTION_ATTR_KEY);
return serviceManager.getService(appIdString); return aggregatorCollection.get(appIdString);
} }
private void init(HttpServletResponse response) { private void init(HttpServletResponse response) {

View File

@ -0,0 +1,203 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.aggregator;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
/**
* Class that manages adding and removing aggregators and their lifecycle. It
* provides thread safety access to the aggregators inside.
*
* It is a singleton, and instances should be obtained via
* {@link #getInstance()}.
*/
@Private
@Unstable
public class TimelineAggregatorsCollection extends CompositeService {
private static final Log LOG =
LogFactory.getLog(TimelineAggregatorsCollection.class);
private static final TimelineAggregatorsCollection INSTANCE =
new TimelineAggregatorsCollection();
// access to this map is synchronized with the map itself
private final Map<String, TimelineAggregator> aggregators =
Collections.synchronizedMap(
new HashMap<String, TimelineAggregator>());
// REST server for this aggregator collection
private HttpServer2 timelineRestServer;
static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection";
static TimelineAggregatorsCollection getInstance() {
return INSTANCE;
}
TimelineAggregatorsCollection() {
super(TimelineAggregatorsCollection.class.getName());
}
@Override
protected void serviceStart() throws Exception {
startWebApp();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
if (timelineRestServer != null) {
timelineRestServer.stop();
}
super.serviceStop();
}
/**
* Put the aggregator into the collection if an aggregator mapped by id does
* not exist.
*
* @throws YarnRuntimeException if there was any exception in initializing and
* starting the app level service
* @return the aggregator associated with id after the potential put.
*/
public TimelineAggregator putIfAbsent(String id, TimelineAggregator aggregator) {
synchronized (aggregators) {
TimelineAggregator aggregatorInTable = aggregators.get(id);
if (aggregatorInTable == null) {
try {
// initialize, start, and add it to the collection so it can be
// cleaned up when the parent shuts down
aggregator.init(getConfig());
aggregator.start();
aggregators.put(id, aggregator);
LOG.info("the aggregator for " + id + " was added");
return aggregator;
} catch (Exception e) {
throw new YarnRuntimeException(e);
}
} else {
String msg = "the aggregator for " + id + " already exists!";
LOG.error(msg);
return aggregatorInTable;
}
}
}
/**
* Removes the aggregator for the specified id. The aggregator is also stopped
* as a result. If the aggregator does not exist, no change is made.
*
* @return whether it was removed successfully
*/
public boolean remove(String id) {
synchronized (aggregators) {
TimelineAggregator aggregator = aggregators.remove(id);
if (aggregator == null) {
String msg = "the aggregator for " + id + " does not exist!";
LOG.error(msg);
return false;
} else {
// stop the service to do clean up
aggregator.stop();
LOG.info("the aggregator service for " + id + " was removed");
return true;
}
}
}
/**
* Returns the aggregator for the specified id.
*
* @return the aggregator or null if it does not exist
*/
public TimelineAggregator get(String id) {
return aggregators.get(id);
}
/**
* Returns whether the aggregator for the specified id exists in this
* collection.
*/
public boolean containsKey(String id) {
return aggregators.containsKey(id);
}
/**
* Launch the REST web server for this aggregator collection
*/
private void startWebApp() {
Configuration conf = getConfig();
// use the same ports as the old ATS for now; we could create new properties
// for the new timeline service if needed
String bindAddress = WebAppUtils.getWebAppBindURL(conf,
YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress);
try {
Configuration confForInfoServer = new Configuration(conf);
confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
HttpServer2.Builder builder = new HttpServer2.Builder()
.setName("timeline")
.setConf(conf)
.addEndpoint(URI.create("http://" + bindAddress));
timelineRestServer = builder.build();
// TODO: replace this by an authentication filter in future.
HashMap<String, String> options = new HashMap<>();
String username = conf.get(HADOOP_HTTP_STATIC_USER,
DEFAULT_HADOOP_HTTP_STATIC_USER);
options.put(HADOOP_HTTP_STATIC_USER, username);
HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
"static_user_filter_timeline",
StaticUserWebFilter.StaticUserFilter.class.getName(),
options, new String[] {"/*"});
timelineRestServer.addJerseyResourcePackage(
TimelineAggregatorWebService.class.getPackage().getName() + ";"
+ GenericExceptionHandler.class.getPackage().getName() + ";"
+ YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
"/*");
timelineRestServer.setAttribute(AGGREGATOR_COLLECTION_ATTR_KEY,
TimelineAggregatorsCollection.getInstance());
timelineRestServer.start();
} catch (Exception e) {
String msg = "The per-node aggregator webapp failed to start.";
LOG.error(msg, e);
throw new YarnRuntimeException(msg, e);
}
}
}

View File

@ -19,5 +19,5 @@
package org.apache.hadoop.yarn.server.timelineservice.aggregator; package org.apache.hadoop.yarn.server.timelineservice.aggregator;
public class TestAppLevelAggregatorService { public class TestAppLevelTimelineAggregator {
} }

View File

@ -36,10 +36,10 @@
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.junit.Test; import org.junit.Test;
public class TestPerNodeAggregatorServer { public class TestPerNodeTimelineAggregatorsAuxService {
private ApplicationAttemptId appAttemptId; private ApplicationAttemptId appAttemptId;
public TestPerNodeAggregatorServer() { public TestPerNodeTimelineAggregatorsAuxService() {
ApplicationId appId = ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationId.newInstance(System.currentTimeMillis(), 1);
appAttemptId = ApplicationAttemptId.newInstance(appId, 1); appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
@ -47,96 +47,97 @@ public TestPerNodeAggregatorServer() {
@Test @Test
public void testAddApplication() throws Exception { public void testAddApplication() throws Exception {
PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication(); PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication();
// aggregator should have a single app // auxService should have a single app
assertTrue(aggregator.hasApplication( assertTrue(auxService.hasApplication(
appAttemptId.getApplicationId().toString())); appAttemptId.getApplicationId().toString()));
aggregator.close(); auxService.close();
} }
@Test @Test
public void testAddApplicationNonAMContainer() throws Exception { public void testAddApplicationNonAMContainer() throws Exception {
PerNodeAggregatorServer aggregator = createAggregator(); PerNodeTimelineAggregatorsAuxService auxService = createAggregator();
ContainerId containerId = getContainerId(2L); // not an AM ContainerId containerId = getContainerId(2L); // not an AM
ContainerInitializationContext context = ContainerInitializationContext context =
mock(ContainerInitializationContext.class); mock(ContainerInitializationContext.class);
when(context.getContainerId()).thenReturn(containerId); when(context.getContainerId()).thenReturn(containerId);
aggregator.initializeContainer(context); auxService.initializeContainer(context);
// aggregator should not have that app // auxService should not have that app
assertFalse(aggregator.hasApplication( assertFalse(auxService.hasApplication(
appAttemptId.getApplicationId().toString())); appAttemptId.getApplicationId().toString()));
} }
@Test @Test
public void testRemoveApplication() throws Exception { public void testRemoveApplication() throws Exception {
PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication(); PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication();
// aggregator should have a single app // auxService should have a single app
String appIdStr = appAttemptId.getApplicationId().toString(); String appIdStr = appAttemptId.getApplicationId().toString();
assertTrue(aggregator.hasApplication(appIdStr)); assertTrue(auxService.hasApplication(appIdStr));
ContainerId containerId = getAMContainerId(); ContainerId containerId = getAMContainerId();
ContainerTerminationContext context = ContainerTerminationContext context =
mock(ContainerTerminationContext.class); mock(ContainerTerminationContext.class);
when(context.getContainerId()).thenReturn(containerId); when(context.getContainerId()).thenReturn(containerId);
aggregator.stopContainer(context); auxService.stopContainer(context);
// aggregator should not have that app // auxService should not have that app
assertFalse(aggregator.hasApplication(appIdStr)); assertFalse(auxService.hasApplication(appIdStr));
aggregator.close(); auxService.close();
} }
@Test @Test
public void testRemoveApplicationNonAMContainer() throws Exception { public void testRemoveApplicationNonAMContainer() throws Exception {
PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication(); PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication();
// aggregator should have a single app // auxService should have a single app
String appIdStr = appAttemptId.getApplicationId().toString(); String appIdStr = appAttemptId.getApplicationId().toString();
assertTrue(aggregator.hasApplication(appIdStr)); assertTrue(auxService.hasApplication(appIdStr));
ContainerId containerId = getContainerId(2L); // not an AM ContainerId containerId = getContainerId(2L); // not an AM
ContainerTerminationContext context = ContainerTerminationContext context =
mock(ContainerTerminationContext.class); mock(ContainerTerminationContext.class);
when(context.getContainerId()).thenReturn(containerId); when(context.getContainerId()).thenReturn(containerId);
aggregator.stopContainer(context); auxService.stopContainer(context);
// aggregator should still have that app // auxService should still have that app
assertTrue(aggregator.hasApplication(appIdStr)); assertTrue(auxService.hasApplication(appIdStr));
aggregator.close(); auxService.close();
} }
@Test(timeout = 60000) @Test(timeout = 60000)
public void testLaunch() throws Exception { public void testLaunch() throws Exception {
ExitUtil.disableSystemExit(); ExitUtil.disableSystemExit();
PerNodeAggregatorServer server = null; PerNodeTimelineAggregatorsAuxService auxService = null;
try { try {
server = auxService =
PerNodeAggregatorServer.launchServer(new String[0]); PerNodeTimelineAggregatorsAuxService.launchServer(new String[0]);
} catch (ExitUtil.ExitException e) { } catch (ExitUtil.ExitException e) {
assertEquals(0, e.status); assertEquals(0, e.status);
ExitUtil.resetFirstExitException(); ExitUtil.resetFirstExitException();
fail(); fail();
} finally { } finally {
if (server != null) { if (auxService != null) {
server.stop(); auxService.stop();
} }
} }
} }
private PerNodeAggregatorServer createAggregatorAndAddApplication() { private PerNodeTimelineAggregatorsAuxService createAggregatorAndAddApplication() {
PerNodeAggregatorServer aggregator = createAggregator(); PerNodeTimelineAggregatorsAuxService auxService = createAggregator();
// create an AM container // create an AM container
ContainerId containerId = getAMContainerId(); ContainerId containerId = getAMContainerId();
ContainerInitializationContext context = ContainerInitializationContext context =
mock(ContainerInitializationContext.class); mock(ContainerInitializationContext.class);
when(context.getContainerId()).thenReturn(containerId); when(context.getContainerId()).thenReturn(containerId);
aggregator.initializeContainer(context); auxService.initializeContainer(context);
return aggregator; return auxService;
} }
private PerNodeAggregatorServer createAggregator() { private PerNodeTimelineAggregatorsAuxService createAggregator() {
AppLevelServiceManager serviceManager = spy(new AppLevelServiceManager()); TimelineAggregatorsCollection
doReturn(new Configuration()).when(serviceManager).getConfig(); aggregatorsCollection = spy(new TimelineAggregatorsCollection());
PerNodeAggregatorServer aggregator = doReturn(new Configuration()).when(aggregatorsCollection).getConfig();
spy(new PerNodeAggregatorServer(serviceManager)); PerNodeTimelineAggregatorsAuxService auxService =
return aggregator; spy(new PerNodeTimelineAggregatorsAuxService(aggregatorsCollection));
return auxService;
} }
private ContainerId getAMContainerId() { private ContainerId getAMContainerId() {

View File

@ -30,16 +30,17 @@
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.junit.Test; import org.junit.Test;
public class TestAppLevelServiceManager { public class TestTimelineAggregatorsCollection {
@Test(timeout=60000) @Test(timeout=60000)
public void testMultithreadedAdd() throws Exception { public void testMultithreadedAdd() throws Exception {
final AppLevelServiceManager serviceManager = final TimelineAggregatorsCollection aggregatorCollection =
spy(new AppLevelServiceManager()); spy(new TimelineAggregatorsCollection());
doReturn(new Configuration()).when(serviceManager).getConfig(); doReturn(new Configuration()).when(aggregatorCollection).getConfig();
final int NUM_APPS = 5; final int NUM_APPS = 5;
List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>(); List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
@ -47,7 +48,9 @@ public void testMultithreadedAdd() throws Exception {
final String appId = String.valueOf(i); final String appId = String.valueOf(i);
Callable<Boolean> task = new Callable<Boolean>() { Callable<Boolean> task = new Callable<Boolean>() {
public Boolean call() { public Boolean call() {
return serviceManager.addService(appId); AppLevelTimelineAggregator aggregator =
new AppLevelTimelineAggregator(appId);
return (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator);
} }
}; };
tasks.add(task); tasks.add(task);
@ -63,15 +66,15 @@ public Boolean call() {
} }
// check the keys // check the keys
for (int i = 0; i < NUM_APPS; i++) { for (int i = 0; i < NUM_APPS; i++) {
assertTrue(serviceManager.hasService(String.valueOf(i))); assertTrue(aggregatorCollection.containsKey(String.valueOf(i)));
} }
} }
@Test @Test
public void testMultithreadedAddAndRemove() throws Exception { public void testMultithreadedAddAndRemove() throws Exception {
final AppLevelServiceManager serviceManager = final TimelineAggregatorsCollection aggregatorCollection =
spy(new AppLevelServiceManager()); spy(new TimelineAggregatorsCollection());
doReturn(new Configuration()).when(serviceManager).getConfig(); doReturn(new Configuration()).when(aggregatorCollection).getConfig();
final int NUM_APPS = 5; final int NUM_APPS = 5;
List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>(); List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
@ -79,8 +82,11 @@ public void testMultithreadedAddAndRemove() throws Exception {
final String appId = String.valueOf(i); final String appId = String.valueOf(i);
Callable<Boolean> task = new Callable<Boolean>() { Callable<Boolean> task = new Callable<Boolean>() {
public Boolean call() { public Boolean call() {
return serviceManager.addService(appId) && AppLevelTimelineAggregator aggregator =
serviceManager.removeService(appId); new AppLevelTimelineAggregator(appId);
boolean successPut =
(aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator);
return successPut && aggregatorCollection.remove(appId);
} }
}; };
tasks.add(task); tasks.add(task);
@ -96,7 +102,7 @@ public Boolean call() {
} }
// check the keys // check the keys
for (int i = 0; i < NUM_APPS; i++) { for (int i = 0; i < NUM_APPS; i++) {
assertFalse(serviceManager.hasService(String.valueOf(i))); assertFalse(aggregatorCollection.containsKey(String.valueOf(i)));
} }
} }
} }