diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 0beec620de0..c0241c9c56a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1824,6 +1824,14 @@ public class YarnConfiguration extends Configuration {
public static final String TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "group-id-plugin-classes";
+ public static final String TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSPATH =
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX
+ + "group-id-plugin-classpath";
+
+ public static final String TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_SYSTEM_CLASSES
+ = TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX
+ + "group-id-plugin-system-classes";
+
public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_STORE =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-store";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index eabb679b7a7..cb0ac073699 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2084,6 +2084,15 @@
+
+ yarn.timeline-service.entity-group-fs-store.group-id-plugin-classpath
+
+
+ Classpath for all plugins defined in
+ yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes.
+
+
+
yarn.timeline-service.entity-group-fs-store.summary-store
Summary storage for ATS v1.5
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
index 958b54e623b..7e6c1541616 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.util.ApplicationClassLoader;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -48,7 +49,6 @@ import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.map.MappingJsonFactory;
import org.codehaus.jackson.map.ObjectMapper;
@@ -58,7 +58,12 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
+import java.net.MalformedURLException;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -217,25 +222,47 @@ public class EntityGroupFSTimelineStore extends CompositeService
throws RuntimeException {
Collection pluginNames = conf.getTrimmedStringCollection(
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES);
+
+ String pluginClasspath = conf.getTrimmed(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSPATH);
+ String[] systemClasses = conf.getTrimmedStrings(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_SYSTEM_CLASSES);
+
List pluginList
= new LinkedList();
Exception caught = null;
+ ClassLoader customClassLoader = null;
+ if (pluginClasspath != null && pluginClasspath.length() > 0) {
+ try {
+ customClassLoader = createPluginClassLoader(pluginClasspath,
+ systemClasses);
+ } catch (IOException ioe) {
+ LOG.warn("Error loading classloader", ioe);
+ }
+ }
for (final String name : pluginNames) {
LOG.debug("Trying to load plugin class {}", name);
TimelineEntityGroupPlugin cacheIdPlugin = null;
+
try {
- Class> clazz = conf.getClassByName(name);
- cacheIdPlugin =
- (TimelineEntityGroupPlugin) ReflectionUtils.newInstance(
- clazz, conf);
+ if (customClassLoader != null) {
+ LOG.debug("Load plugin {} with classpath: {}", name, pluginClasspath);
+ Class> clazz = Class.forName(name, true, customClassLoader);
+ Class extends TimelineEntityGroupPlugin> sClass = clazz.asSubclass(
+ TimelineEntityGroupPlugin.class);
+ cacheIdPlugin = ReflectionUtils.newInstance(sClass, conf);
+ } else {
+ LOG.debug("Load plugin class with system classpath");
+ Class> clazz = conf.getClassByName(name);
+ cacheIdPlugin =
+ (TimelineEntityGroupPlugin) ReflectionUtils.newInstance(
+ clazz, conf);
+ }
} catch (Exception e) {
LOG.warn("Error loading plugin " + name, e);
- caught = e;
+ throw new RuntimeException("No class defined for " + name, e);
}
- if (cacheIdPlugin == null) {
- throw new RuntimeException("No class defined for " + name, caught);
- }
LOG.info("Load plugin class {}", cacheIdPlugin.getClass().getName());
pluginList.add(cacheIdPlugin);
}
@@ -489,6 +516,29 @@ public class EntityGroupFSTimelineStore extends CompositeService
return appId;
}
+ private static ClassLoader createPluginClassLoader(
+ final String appClasspath, final String[] systemClasses)
+ throws IOException {
+ try {
+ return AccessController.doPrivileged(
+ new PrivilegedExceptionAction() {
+ @Override
+ public ClassLoader run() throws MalformedURLException {
+ return new ApplicationClassLoader(appClasspath,
+ EntityGroupFSTimelineStore.class.getClassLoader(),
+ Arrays.asList(systemClasses));
+ }
+ }
+ );
+ } catch (PrivilegedActionException e) {
+ Throwable t = e.getCause();
+ if (t instanceof MalformedURLException) {
+ throw (MalformedURLException) t;
+ }
+ throw new IOException(e);
+ }
+ }
+
private Path getActiveAppPath(ApplicationId appId) {
return new Path(activeRootPath, appId.toString());
}
@@ -531,6 +581,15 @@ public class EntityGroupFSTimelineStore extends CompositeService
return getAppState(appId, yarnClient);
}
+ /**
+ * Get all plugins for tests.
+ * @return all plugins
+ */
+ @VisibleForTesting
+ List getPlugins() {
+ return cacheIdPlugins;
+ }
+
/**
* Ask the RM for the state of the application.
* This method has to be synchronized to control traffic to RM
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
index 1c12f36192b..95d09870513 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
@@ -29,6 +29,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableStat;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.ApplicationClassLoader;
+import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -43,7 +46,10 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import java.io.File;
import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
@@ -97,6 +103,8 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
@Rule
public TestName currTestName = new TestName();
+ private File rootDir;
+ private File testJar;
@BeforeClass
public static void setupClass() throws Exception {
@@ -144,6 +152,25 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
store = new EntityGroupFSTimelineStoreForTest();
if (currTestName.getMethodName().contains("Plugin")) {
+ rootDir = GenericTestUtils.getTestDir(getClass()
+ .getSimpleName());
+ if (!rootDir.exists()) {
+ rootDir.mkdirs();
+ }
+ testJar = null;
+ testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir,
+ "test-runjar.jar", 2048,
+ EntityGroupPlugInForTest.class.getName());
+ config.set(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSPATH,
+ testJar.getAbsolutePath());
+ // add "-org.apache.hadoop." as system classes
+ String systemClasses = "-org.apache.hadoop." + "," +
+ ApplicationClassLoader.SYSTEM_CLASSES_DEFAULT;
+ config.set(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_SYSTEM_CLASSES,
+ systemClasses);
+
config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
EntityGroupPlugInForTest.class.getName());
}
@@ -158,6 +185,10 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
for (ApplicationId appId : sampleAppIds) {
fs.delete(getTestRootPath(appId.toString()), true);
}
+ if (testJar != null) {
+ testJar.delete();
+ rootDir.delete();
+ }
}
@AfterClass
@@ -302,6 +333,21 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
assertEquals(EntityGroupPlugInForTest.class.getName(),
store.getConfig().get(
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
+ List currPlugins = store.getPlugins();
+ for (TimelineEntityGroupPlugin plugin : currPlugins) {
+ ClassLoader pluginClassLoader = plugin.getClass().getClassLoader();
+ assertTrue("Should set up ApplicationClassLoader",
+ pluginClassLoader instanceof ApplicationClassLoader);
+ URL[] paths = ((URLClassLoader) pluginClassLoader).getURLs();
+ boolean foundJAR = false;
+ for (URL path : paths) {
+ if (path.toString().contains(testJar.getAbsolutePath())) {
+ foundJAR = true;
+ }
+ }
+ assertTrue("Not found path " + testJar.getAbsolutePath()
+ + " for plugin " + plugin.getClass().getName(), foundJAR);
+ }
// Load data and cache item, prepare timeline store by making a cache item
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(mainTestAppId, mainTestAppDirPath,