diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 8b06c51bf0a..615df6758e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -78,7 +78,7 @@ import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer;
+import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -159,7 +159,7 @@ public class TestDistributedShell {
// enable aux-service based timeline aggregators
conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
- + ".class", PerNodeAggregatorServer.class.getName());
+ + ".class", PerNodeTimelineAggregatorsAuxService.class.getName());
} else {
Assert.fail("Wrong timeline version number: " + timelineVersion);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
index 27f72722292..4a69c9c31bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
@@ -52,11 +52,6 @@
org.apache.hadoop
hadoop-yarn-api
-
- org.apache.hadoop
- hadoop-yarn-server-timelineservice
- ${project.version}
-
javax.xml.bind
jaxb-api
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
index ff4013378a4..bb444db0a86 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
@@ -30,9 +30,6 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
-import org.apache.hadoop.yarn.server.timelineservice.aggregator.AppLevelServiceManager;
-import org.apache.hadoop.yarn.server.timelineservice.aggregator.AppLevelServiceManagerProvider;
-import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorWebService;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index a5159a2d587..32ee5d8a6c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -6,7 +6,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer;
+import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -14,13 +14,13 @@ import org.junit.Test;
import static org.junit.Assert.fail;
public class TestTimelineServiceClientIntegration {
- private static PerNodeAggregatorServer server;
+ private static PerNodeTimelineAggregatorsAuxService auxService;
@BeforeClass
public static void setupClass() throws Exception {
try {
- server = PerNodeAggregatorServer.launchServer(new String[0]);
- server.addApplication(ApplicationId.newInstance(0, 1));
+ auxService = PerNodeTimelineAggregatorsAuxService.launchServer(new String[0]);
+ auxService.addApplication(ApplicationId.newInstance(0, 1));
} catch (ExitUtil.ExitException e) {
fail();
}
@@ -28,8 +28,8 @@ public class TestTimelineServiceClientIntegration {
@AfterClass
public static void tearDownClass() throws Exception {
- if (server != null) {
- server.stop();
+ if (auxService != null) {
+ auxService.stop();
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java
deleted file mode 100644
index 05d321fa7aa..00000000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-
-/**
- * Class that manages adding and removing app level aggregator services and
- * their lifecycle. It provides thread safety access to the app level services.
- *
- * It is a singleton, and instances should be obtained via
- * {@link #getInstance()}.
- */
-@Private
-@Unstable
-public class AppLevelServiceManager extends CompositeService {
- private static final Log LOG =
- LogFactory.getLog(AppLevelServiceManager.class);
- private static final AppLevelServiceManager INSTANCE =
- new AppLevelServiceManager();
-
- // access to this map is synchronized with the map itself
- private final Map services =
- Collections.synchronizedMap(
- new HashMap());
-
- static AppLevelServiceManager getInstance() {
- return INSTANCE;
- }
-
- AppLevelServiceManager() {
- super(AppLevelServiceManager.class.getName());
- }
-
- /**
- * Creates and adds an app level aggregator service for the specified
- * application id. The service is also initialized and started. If the service
- * already exists, no new service is created.
- *
- * @throws YarnRuntimeException if there was any exception in initializing and
- * starting the app level service
- * @return whether it was added successfully
- */
- public boolean addService(String appId) {
- synchronized (services) {
- AppLevelAggregatorService service = services.get(appId);
- if (service == null) {
- try {
- service = new AppLevelAggregatorService(appId);
- // initialize, start, and add it to the parent service so it can be
- // cleaned up when the parent shuts down
- service.init(getConfig());
- service.start();
- services.put(appId, service);
- LOG.info("the application aggregator service for " + appId +
- " was added");
- return true;
- } catch (Exception e) {
- throw new YarnRuntimeException(e);
- }
- } else {
- String msg = "the application aggregator service for " + appId +
- " already exists!";
- LOG.error(msg);
- return false;
- }
- }
- }
-
- /**
- * Removes the app level aggregator service for the specified application id.
- * The service is also stopped as a result. If the service does not exist, no
- * change is made.
- *
- * @return whether it was removed successfully
- */
- public boolean removeService(String appId) {
- synchronized (services) {
- AppLevelAggregatorService service = services.remove(appId);
- if (service == null) {
- String msg = "the application aggregator service for " + appId +
- " does not exist!";
- LOG.error(msg);
- return false;
- } else {
- // stop the service to do clean up
- service.stop();
- LOG.info("the application aggregator service for " + appId +
- " was removed");
- return true;
- }
- }
- }
-
- /**
- * Returns the app level aggregator service for the specified application id.
- *
- * @return the app level aggregator service or null if it does not exist
- */
- public AppLevelAggregatorService getService(String appId) {
- return services.get(appId);
- }
-
- /**
- * Returns whether the app level aggregator service for the specified
- * application id exists.
- */
- public boolean hasService(String appId) {
- return services.containsKey(appId);
- }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java
deleted file mode 100644
index 8768575f29b..00000000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import com.google.inject.Provider;
-
-/**
- * A guice provider that provides a global singleton instance of
- * AppLevelServiceManager.
- */
-public class AppLevelServiceManagerProvider
- implements Provider {
- @Override
- public AppLevelServiceManager get() {
- return AppLevelServiceManager.getInstance();
- }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
similarity index 89%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
index bf72fb99e7d..95ec9f8e40a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
@@ -30,12 +30,12 @@ import org.apache.hadoop.conf.Configuration;
*/
@Private
@Unstable
-public class AppLevelAggregatorService extends BaseAggregatorService {
+public class AppLevelTimelineAggregator extends TimelineAggregator {
private final String applicationId;
// TODO define key metadata such as flow metadata, user, and queue
- public AppLevelAggregatorService(String applicationId) {
- super(AppLevelAggregatorService.class.getName() + " - " + applicationId);
+ public AppLevelTimelineAggregator(String applicationId) {
+ super(AppLevelTimelineAggregator.class.getName() + " - " + applicationId);
this.applicationId = applicationId;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
similarity index 53%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
index deb21c7681f..cdc4e351b0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
@@ -18,16 +18,13 @@
package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-import java.net.URI;
import java.nio.ByteBuffer;
-import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
@@ -35,144 +32,91 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
-import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
-import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
-import org.apache.hadoop.http.HttpServer2;
import com.google.common.annotations.VisibleForTesting;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
-
/**
- * The top-level server for the per-node timeline aggregator service. Currently
+ * The top-level server for the per-node timeline aggregator collection. Currently
* it is defined as an auxiliary service to accommodate running within another
* daemon (e.g. node manager).
*/
@Private
@Unstable
-public class PerNodeAggregatorServer extends AuxiliaryService {
+public class PerNodeTimelineAggregatorsAuxService extends AuxiliaryService {
private static final Log LOG =
- LogFactory.getLog(PerNodeAggregatorServer.class);
+ LogFactory.getLog(PerNodeTimelineAggregatorsAuxService.class);
private static final int SHUTDOWN_HOOK_PRIORITY = 30;
- static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection";
- private final AppLevelServiceManager serviceManager;
- private HttpServer2 timelineRestServer;
+ private final TimelineAggregatorsCollection aggregatorCollection;
- public PerNodeAggregatorServer() {
+ public PerNodeTimelineAggregatorsAuxService() {
// use the same singleton
- this(AppLevelServiceManager.getInstance());
+ this(TimelineAggregatorsCollection.getInstance());
}
- @VisibleForTesting
- PerNodeAggregatorServer(AppLevelServiceManager serviceManager) {
+ @VisibleForTesting PerNodeTimelineAggregatorsAuxService(
+ TimelineAggregatorsCollection aggregatorCollection) {
super("timeline_aggregator");
- this.serviceManager = serviceManager;
+ this.aggregatorCollection = aggregatorCollection;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
- serviceManager.init(conf);
+ aggregatorCollection.init(conf);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
+ aggregatorCollection.start();
super.serviceStart();
- serviceManager.start();
- startWebApp();
}
@Override
protected void serviceStop() throws Exception {
- if (timelineRestServer != null) {
- timelineRestServer.stop();
- }
- // stop the service manager
- serviceManager.stop();
+ aggregatorCollection.stop();
super.serviceStop();
}
- private void startWebApp() {
- Configuration conf = getConfig();
- // use the same ports as the old ATS for now; we could create new properties
- // for the new timeline service if needed
- String bindAddress = WebAppUtils.getWebAppBindURL(conf,
- YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
- WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
- LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress);
- try {
- Configuration confForInfoServer = new Configuration(conf);
- confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
- HttpServer2.Builder builder = new HttpServer2.Builder()
- .setName("timeline")
- .setConf(conf)
- .addEndpoint(URI.create("http://" + bindAddress));
- timelineRestServer = builder.build();
- // TODO: replace this by an authentication filter in future.
- HashMap options = new HashMap<>();
- String username = conf.get(HADOOP_HTTP_STATIC_USER,
- DEFAULT_HADOOP_HTTP_STATIC_USER);
- options.put(HADOOP_HTTP_STATIC_USER, username);
- HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
- "static_user_filter_timeline",
- StaticUserWebFilter.StaticUserFilter.class.getName(),
- options, new String[] {"/*"});
-
- timelineRestServer.addJerseyResourcePackage(
- PerNodeAggregatorWebService.class.getPackage().getName() + ";"
- + GenericExceptionHandler.class.getPackage().getName() + ";"
- + YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
- "/*");
- timelineRestServer.setAttribute(AGGREGATOR_COLLECTION_ATTR_KEY,
- AppLevelServiceManager.getInstance());
- timelineRestServer.start();
- } catch (Exception e) {
- String msg = "The per-node aggregator webapp failed to start.";
- LOG.error(msg, e);
- throw new YarnRuntimeException(msg, e);
- }
- }
-
// these methods can be used as the basis for future service methods if the
// per-node aggregator runs separate from the node manager
/**
- * Creates and adds an app level aggregator service for the specified
- * application id. The service is also initialized and started. If the service
- * already exists, no new service is created.
+ * Creates and adds an app level aggregator for the specified application id.
+ * The aggregator is also initialized and started. If the service already
+ * exists, no new service is created.
*
* @return whether it was added successfully
*/
public boolean addApplication(ApplicationId appId) {
String appIdString = appId.toString();
- return serviceManager.addService(appIdString);
+ AppLevelTimelineAggregator aggregator =
+ new AppLevelTimelineAggregator(appIdString);
+ return (aggregatorCollection.putIfAbsent(appIdString, aggregator)
+ == aggregator);
}
/**
- * Removes the app level aggregator service for the specified application id.
- * The service is also stopped as a result. If the service does not exist, no
+ * Removes the app level aggregator for the specified application id. The
+ * aggregator is also stopped as a result. If the aggregator does not exist, no
* change is made.
*
* @return whether it was removed successfully
*/
public boolean removeApplication(ApplicationId appId) {
String appIdString = appId.toString();
- return serviceManager.removeService(appIdString);
+ return aggregatorCollection.remove(appIdString);
}
/**
- * Creates and adds an app level aggregator service for the specified
- * application id. The service is also initialized and started. If the service
- * already exists, no new service is created.
+ * Creates and adds an app level aggregator for the specified application id.
+ * The aggregator is also initialized and started. If the aggregator already
+ * exists, no new aggregator is created.
*/
@Override
public void initializeContainer(ContainerInitializationContext context) {
@@ -186,8 +130,8 @@ public class PerNodeAggregatorServer extends AuxiliaryService {
}
/**
- * Removes the app level aggregator service for the specified application id.
- * The service is also stopped as a result. If the service does not exist, no
+ * Removes the app level aggregator for the specified application id. The
+ * aggregator is also stopped as a result. If the aggregator does not exist, no
* change is made.
*/
@Override
@@ -211,7 +155,7 @@ public class PerNodeAggregatorServer extends AuxiliaryService {
@VisibleForTesting
boolean hasApplication(String appId) {
- return serviceManager.hasService(appId);
+ return aggregatorCollection.containsKey(appId);
}
@Override
@@ -230,35 +174,35 @@ public class PerNodeAggregatorServer extends AuxiliaryService {
}
@VisibleForTesting
- public static PerNodeAggregatorServer launchServer(String[] args) {
+ public static PerNodeTimelineAggregatorsAuxService launchServer(String[] args) {
Thread
.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
- StringUtils.startupShutdownMessage(PerNodeAggregatorServer.class, args,
+ StringUtils.startupShutdownMessage(PerNodeTimelineAggregatorsAuxService.class, args,
LOG);
- PerNodeAggregatorServer server = null;
+ PerNodeTimelineAggregatorsAuxService auxService = null;
try {
- server = new PerNodeAggregatorServer();
- ShutdownHookManager.get().addShutdownHook(new ShutdownHook(server),
+ auxService = new PerNodeTimelineAggregatorsAuxService();
+ ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService),
SHUTDOWN_HOOK_PRIORITY);
YarnConfiguration conf = new YarnConfiguration();
- server.init(conf);
- server.start();
+ auxService.init(conf);
+ auxService.start();
} catch (Throwable t) {
LOG.fatal("Error starting PerNodeAggregatorServer", t);
ExitUtil.terminate(-1, "Error starting PerNodeAggregatorServer");
}
- return server;
+ return auxService;
}
private static class ShutdownHook implements Runnable {
- private final PerNodeAggregatorServer server;
+ private final PerNodeTimelineAggregatorsAuxService auxService;
- public ShutdownHook(PerNodeAggregatorServer server) {
- this.server = server;
+ public ShutdownHook(PerNodeTimelineAggregatorsAuxService auxService) {
+ this.auxService = auxService;
}
public void run() {
- server.stop();
+ auxService.stop();
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
similarity index 93%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
index e3621398c27..42277124a3c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
@@ -31,15 +31,15 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
* Service that handles writes to the timeline service and writes them to the
* backing storage.
*
- * Classes that extend this can add their own lifecycle management or
+ * Classes that extend this can putIfAbsent their own lifecycle management or
* customization of request handling.
*/
@Private
@Unstable
-public class BaseAggregatorService extends CompositeService {
- private static final Log LOG = LogFactory.getLog(BaseAggregatorService.class);
+public abstract class TimelineAggregator extends CompositeService {
+ private static final Log LOG = LogFactory.getLog(TimelineAggregator.class);
- public BaseAggregatorService(String name) {
+ public TimelineAggregator(String name) {
super(name);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
similarity index 92%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
index ffe099e8836..7d42f94eb29 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
@@ -53,9 +53,9 @@ import com.google.inject.Singleton;
@Unstable
@Singleton
@Path("/ws/v2/timeline")
-public class PerNodeAggregatorWebService {
+public class TimelineAggregatorWebService {
private static final Log LOG =
- LogFactory.getLog(PerNodeAggregatorWebService.class);
+ LogFactory.getLog(TimelineAggregatorWebService.class);
private @Context ServletContext context;
@@ -128,7 +128,7 @@ public class PerNodeAggregatorWebService {
if (appId == null) {
return Response.status(Response.Status.BAD_REQUEST).build();
}
- AppLevelAggregatorService service = getAggregatorService(req, appId);
+ TimelineAggregator service = getAggregatorService(req, appId);
if (service == null) {
LOG.error("Application not found");
throw new NotFoundException(); // different exception?
@@ -156,13 +156,13 @@ public class PerNodeAggregatorWebService {
}
}
- private AppLevelAggregatorService
+ private TimelineAggregator
getAggregatorService(HttpServletRequest req, String appIdToParse) {
String appIdString = parseApplicationId(appIdToParse);
- final AppLevelServiceManager serviceManager =
- (AppLevelServiceManager) context.getAttribute(
- PerNodeAggregatorServer.AGGREGATOR_COLLECTION_ATTR_KEY);
- return serviceManager.getService(appIdString);
+ final TimelineAggregatorsCollection aggregatorCollection =
+ (TimelineAggregatorsCollection) context.getAttribute(
+ TimelineAggregatorsCollection.AGGREGATOR_COLLECTION_ATTR_KEY);
+ return aggregatorCollection.get(appIdString);
}
private void init(HttpServletResponse response) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
new file mode 100644
index 00000000000..73b6d524c81
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.http.lib.StaticUserWebFilter;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
+
+/**
+ * Class that manages adding and removing aggregators and their lifecycle. It
+ * provides thread safety access to the aggregators inside.
+ *
+ * It is a singleton, and instances should be obtained via
+ * {@link #getInstance()}.
+ */
+@Private
+@Unstable
+public class TimelineAggregatorsCollection extends CompositeService {
+ private static final Log LOG =
+ LogFactory.getLog(TimelineAggregatorsCollection.class);
+ private static final TimelineAggregatorsCollection INSTANCE =
+ new TimelineAggregatorsCollection();
+
+ // access to this map is synchronized with the map itself
+ private final Map aggregators =
+ Collections.synchronizedMap(
+ new HashMap());
+
+ // REST server for this aggregator collection
+ private HttpServer2 timelineRestServer;
+
+ static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection";
+
+ static TimelineAggregatorsCollection getInstance() {
+ return INSTANCE;
+ }
+
+ TimelineAggregatorsCollection() {
+ super(TimelineAggregatorsCollection.class.getName());
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ startWebApp();
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (timelineRestServer != null) {
+ timelineRestServer.stop();
+ }
+ super.serviceStop();
+ }
+
+ /**
+ * Put the aggregator into the collection if an aggregator mapped by id does
+ * not exist.
+ *
+ * @throws YarnRuntimeException if there was any exception in initializing and
+ * starting the app level service
+ * @return the aggregator associated with id after the potential put.
+ */
+ public TimelineAggregator putIfAbsent(String id, TimelineAggregator aggregator) {
+ synchronized (aggregators) {
+ TimelineAggregator aggregatorInTable = aggregators.get(id);
+ if (aggregatorInTable == null) {
+ try {
+ // initialize, start, and add it to the collection so it can be
+ // cleaned up when the parent shuts down
+ aggregator.init(getConfig());
+ aggregator.start();
+ aggregators.put(id, aggregator);
+ LOG.info("the aggregator for " + id + " was added");
+ return aggregator;
+ } catch (Exception e) {
+ throw new YarnRuntimeException(e);
+ }
+ } else {
+ String msg = "the aggregator for " + id + " already exists!";
+ LOG.error(msg);
+ return aggregatorInTable;
+ }
+ }
+ }
+
+ /**
+ * Removes the aggregator for the specified id. The aggregator is also stopped
+ * as a result. If the aggregator does not exist, no change is made.
+ *
+ * @return whether it was removed successfully
+ */
+ public boolean remove(String id) {
+ synchronized (aggregators) {
+ TimelineAggregator aggregator = aggregators.remove(id);
+ if (aggregator == null) {
+ String msg = "the aggregator for " + id + " does not exist!";
+ LOG.error(msg);
+ return false;
+ } else {
+ // stop the service to do clean up
+ aggregator.stop();
+ LOG.info("the aggregator service for " + id + " was removed");
+ return true;
+ }
+ }
+ }
+
+ /**
+ * Returns the aggregator for the specified id.
+ *
+ * @return the aggregator or null if it does not exist
+ */
+ public TimelineAggregator get(String id) {
+ return aggregators.get(id);
+ }
+
+ /**
+ * Returns whether the aggregator for the specified id exists in this
+ * collection.
+ */
+ public boolean containsKey(String id) {
+ return aggregators.containsKey(id);
+ }
+
+ /**
+ * Launch the REST web server for this aggregator collection
+ */
+ private void startWebApp() {
+ Configuration conf = getConfig();
+ // use the same ports as the old ATS for now; we could create new properties
+ // for the new timeline service if needed
+ String bindAddress = WebAppUtils.getWebAppBindURL(conf,
+ YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
+ WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
+ LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress);
+ try {
+ Configuration confForInfoServer = new Configuration(conf);
+ confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
+ HttpServer2.Builder builder = new HttpServer2.Builder()
+ .setName("timeline")
+ .setConf(conf)
+ .addEndpoint(URI.create("http://" + bindAddress));
+ timelineRestServer = builder.build();
+ // TODO: replace this by an authentication filter in future.
+ HashMap options = new HashMap<>();
+ String username = conf.get(HADOOP_HTTP_STATIC_USER,
+ DEFAULT_HADOOP_HTTP_STATIC_USER);
+ options.put(HADOOP_HTTP_STATIC_USER, username);
+ HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
+ "static_user_filter_timeline",
+ StaticUserWebFilter.StaticUserFilter.class.getName(),
+ options, new String[] {"/*"});
+
+ timelineRestServer.addJerseyResourcePackage(
+ TimelineAggregatorWebService.class.getPackage().getName() + ";"
+ + GenericExceptionHandler.class.getPackage().getName() + ";"
+ + YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
+ "/*");
+ timelineRestServer.setAttribute(AGGREGATOR_COLLECTION_ATTR_KEY,
+ TimelineAggregatorsCollection.getInstance());
+ timelineRestServer.start();
+ } catch (Exception e) {
+ String msg = "The per-node aggregator webapp failed to start.";
+ LOG.error(msg, e);
+ throw new YarnRuntimeException(msg, e);
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java
similarity index 95%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java
index c0af8c53c18..8f95814a035 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java
@@ -19,5 +19,5 @@
package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-public class TestAppLevelAggregatorService {
+public class TestAppLevelTimelineAggregator {
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java
similarity index 64%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java
index 902047d1151..1c89ead58f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java
@@ -36,10 +36,10 @@ import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.junit.Test;
-public class TestPerNodeAggregatorServer {
+public class TestPerNodeTimelineAggregatorsAuxService {
private ApplicationAttemptId appAttemptId;
- public TestPerNodeAggregatorServer() {
+ public TestPerNodeTimelineAggregatorsAuxService() {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
@@ -47,96 +47,97 @@ public class TestPerNodeAggregatorServer {
@Test
public void testAddApplication() throws Exception {
- PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication();
- // aggregator should have a single app
- assertTrue(aggregator.hasApplication(
+ PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication();
+ // auxService should have a single app
+ assertTrue(auxService.hasApplication(
appAttemptId.getApplicationId().toString()));
- aggregator.close();
+ auxService.close();
}
@Test
public void testAddApplicationNonAMContainer() throws Exception {
- PerNodeAggregatorServer aggregator = createAggregator();
+ PerNodeTimelineAggregatorsAuxService auxService = createAggregator();
ContainerId containerId = getContainerId(2L); // not an AM
ContainerInitializationContext context =
mock(ContainerInitializationContext.class);
when(context.getContainerId()).thenReturn(containerId);
- aggregator.initializeContainer(context);
- // aggregator should not have that app
- assertFalse(aggregator.hasApplication(
+ auxService.initializeContainer(context);
+ // auxService should not have that app
+ assertFalse(auxService.hasApplication(
appAttemptId.getApplicationId().toString()));
}
@Test
public void testRemoveApplication() throws Exception {
- PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication();
- // aggregator should have a single app
+ PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication();
+ // auxService should have a single app
String appIdStr = appAttemptId.getApplicationId().toString();
- assertTrue(aggregator.hasApplication(appIdStr));
+ assertTrue(auxService.hasApplication(appIdStr));
ContainerId containerId = getAMContainerId();
ContainerTerminationContext context =
mock(ContainerTerminationContext.class);
when(context.getContainerId()).thenReturn(containerId);
- aggregator.stopContainer(context);
- // aggregator should not have that app
- assertFalse(aggregator.hasApplication(appIdStr));
- aggregator.close();
+ auxService.stopContainer(context);
+ // auxService should not have that app
+ assertFalse(auxService.hasApplication(appIdStr));
+ auxService.close();
}
@Test
public void testRemoveApplicationNonAMContainer() throws Exception {
- PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication();
- // aggregator should have a single app
+ PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication();
+ // auxService should have a single app
String appIdStr = appAttemptId.getApplicationId().toString();
- assertTrue(aggregator.hasApplication(appIdStr));
+ assertTrue(auxService.hasApplication(appIdStr));
ContainerId containerId = getContainerId(2L); // not an AM
ContainerTerminationContext context =
mock(ContainerTerminationContext.class);
when(context.getContainerId()).thenReturn(containerId);
- aggregator.stopContainer(context);
- // aggregator should still have that app
- assertTrue(aggregator.hasApplication(appIdStr));
- aggregator.close();
+ auxService.stopContainer(context);
+ // auxService should still have that app
+ assertTrue(auxService.hasApplication(appIdStr));
+ auxService.close();
}
@Test(timeout = 60000)
public void testLaunch() throws Exception {
ExitUtil.disableSystemExit();
- PerNodeAggregatorServer server = null;
+ PerNodeTimelineAggregatorsAuxService auxService = null;
try {
- server =
- PerNodeAggregatorServer.launchServer(new String[0]);
+ auxService =
+ PerNodeTimelineAggregatorsAuxService.launchServer(new String[0]);
} catch (ExitUtil.ExitException e) {
assertEquals(0, e.status);
ExitUtil.resetFirstExitException();
fail();
} finally {
- if (server != null) {
- server.stop();
+ if (auxService != null) {
+ auxService.stop();
}
}
}
- private PerNodeAggregatorServer createAggregatorAndAddApplication() {
- PerNodeAggregatorServer aggregator = createAggregator();
+ private PerNodeTimelineAggregatorsAuxService createAggregatorAndAddApplication() {
+ PerNodeTimelineAggregatorsAuxService auxService = createAggregator();
// create an AM container
ContainerId containerId = getAMContainerId();
ContainerInitializationContext context =
mock(ContainerInitializationContext.class);
when(context.getContainerId()).thenReturn(containerId);
- aggregator.initializeContainer(context);
- return aggregator;
+ auxService.initializeContainer(context);
+ return auxService;
}
- private PerNodeAggregatorServer createAggregator() {
- AppLevelServiceManager serviceManager = spy(new AppLevelServiceManager());
- doReturn(new Configuration()).when(serviceManager).getConfig();
- PerNodeAggregatorServer aggregator =
- spy(new PerNodeAggregatorServer(serviceManager));
- return aggregator;
+ private PerNodeTimelineAggregatorsAuxService createAggregator() {
+ TimelineAggregatorsCollection
+ aggregatorsCollection = spy(new TimelineAggregatorsCollection());
+ doReturn(new Configuration()).when(aggregatorsCollection).getConfig();
+ PerNodeTimelineAggregatorsAuxService auxService =
+ spy(new PerNodeTimelineAggregatorsAuxService(aggregatorsCollection));
+ return auxService;
}
private ContainerId getAMContainerId() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java
similarity index 95%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java
index 55953cd7c69..821e455f97b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java
@@ -18,6 +18,6 @@
package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-public class TestBaseAggregatorService {
+public class TestTimelineAggregator {
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java
similarity index 71%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java
index 3f981c7a8e4..cec1d71eae9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java
@@ -30,16 +30,17 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
-public class TestAppLevelServiceManager {
+public class TestTimelineAggregatorsCollection {
@Test(timeout=60000)
public void testMultithreadedAdd() throws Exception {
- final AppLevelServiceManager serviceManager =
- spy(new AppLevelServiceManager());
- doReturn(new Configuration()).when(serviceManager).getConfig();
+ final TimelineAggregatorsCollection aggregatorCollection =
+ spy(new TimelineAggregatorsCollection());
+ doReturn(new Configuration()).when(aggregatorCollection).getConfig();
final int NUM_APPS = 5;
List> tasks = new ArrayList>();
@@ -47,7 +48,9 @@ public class TestAppLevelServiceManager {
final String appId = String.valueOf(i);
Callable task = new Callable() {
public Boolean call() {
- return serviceManager.addService(appId);
+ AppLevelTimelineAggregator aggregator =
+ new AppLevelTimelineAggregator(appId);
+ return (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator);
}
};
tasks.add(task);
@@ -63,15 +66,15 @@ public class TestAppLevelServiceManager {
}
// check the keys
for (int i = 0; i < NUM_APPS; i++) {
- assertTrue(serviceManager.hasService(String.valueOf(i)));
+ assertTrue(aggregatorCollection.containsKey(String.valueOf(i)));
}
}
@Test
public void testMultithreadedAddAndRemove() throws Exception {
- final AppLevelServiceManager serviceManager =
- spy(new AppLevelServiceManager());
- doReturn(new Configuration()).when(serviceManager).getConfig();
+ final TimelineAggregatorsCollection aggregatorCollection =
+ spy(new TimelineAggregatorsCollection());
+ doReturn(new Configuration()).when(aggregatorCollection).getConfig();
final int NUM_APPS = 5;
List> tasks = new ArrayList>();
@@ -79,8 +82,11 @@ public class TestAppLevelServiceManager {
final String appId = String.valueOf(i);
Callable task = new Callable() {
public Boolean call() {
- return serviceManager.addService(appId) &&
- serviceManager.removeService(appId);
+ AppLevelTimelineAggregator aggregator =
+ new AppLevelTimelineAggregator(appId);
+ boolean successPut =
+ (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator);
+ return successPut && aggregatorCollection.remove(appId);
}
};
tasks.add(task);
@@ -96,7 +102,7 @@ public class TestAppLevelServiceManager {
}
// check the keys
for (int i = 0; i < NUM_APPS; i++) {
- assertFalse(serviceManager.hasService(String.valueOf(i)));
+ assertFalse(aggregatorCollection.containsKey(String.valueOf(i)));
}
}
}