YARN-5233. Support for specifying a path for ATS plugin jars. Contributed by Li Lu

This commit is contained in:
Jian He 2016-07-06 16:44:46 -07:00
parent 4c9e1aeb94
commit 8a9d293dd6
4 changed files with 131 additions and 9 deletions

View File

@ -1824,6 +1824,14 @@ public class YarnConfiguration extends Configuration {
public static final String TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "group-id-plugin-classes";
public static final String TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSPATH =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX
+ "group-id-plugin-classpath";
public static final String TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_SYSTEM_CLASSES
= TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX
+ "group-id-plugin-system-classes";
public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_STORE =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-store";

View File

@ -2084,6 +2084,15 @@
</description>
</property>
<property>
<name>yarn.timeline-service.entity-group-fs-store.group-id-plugin-classpath</name>
<value></value>
<description>
Classpath for all plugins defined in
yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes.
</description>
</property>
<property>
<name>yarn.timeline-service.entity-group-fs-store.summary-store</name>
<description>Summary storage for ATS v1.5</description>

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.util.ApplicationClassLoader;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -48,7 +49,6 @@ import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.map.MappingJsonFactory;
import org.codehaus.jackson.map.ObjectMapper;
@ -58,7 +58,12 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.MalformedURLException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@ -217,25 +222,47 @@ public class EntityGroupFSTimelineStore extends CompositeService
throws RuntimeException {
Collection<String> pluginNames = conf.getTrimmedStringCollection(
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES);
String pluginClasspath = conf.getTrimmed(
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSPATH);
String[] systemClasses = conf.getTrimmedStrings(
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_SYSTEM_CLASSES);
List<TimelineEntityGroupPlugin> pluginList
= new LinkedList<TimelineEntityGroupPlugin>();
Exception caught = null;
ClassLoader customClassLoader = null;
if (pluginClasspath != null && pluginClasspath.length() > 0) {
try {
customClassLoader = createPluginClassLoader(pluginClasspath,
systemClasses);
} catch (IOException ioe) {
LOG.warn("Error loading classloader", ioe);
}
}
for (final String name : pluginNames) {
LOG.debug("Trying to load plugin class {}", name);
TimelineEntityGroupPlugin cacheIdPlugin = null;
try {
Class<?> clazz = conf.getClassByName(name);
cacheIdPlugin =
(TimelineEntityGroupPlugin) ReflectionUtils.newInstance(
clazz, conf);
if (customClassLoader != null) {
LOG.debug("Load plugin {} with classpath: {}", name, pluginClasspath);
Class<?> clazz = Class.forName(name, true, customClassLoader);
Class<? extends TimelineEntityGroupPlugin> sClass = clazz.asSubclass(
TimelineEntityGroupPlugin.class);
cacheIdPlugin = ReflectionUtils.newInstance(sClass, conf);
} else {
LOG.debug("Load plugin class with system classpath");
Class<?> clazz = conf.getClassByName(name);
cacheIdPlugin =
(TimelineEntityGroupPlugin) ReflectionUtils.newInstance(
clazz, conf);
}
} catch (Exception e) {
LOG.warn("Error loading plugin " + name, e);
caught = e;
throw new RuntimeException("No class defined for " + name, e);
}
if (cacheIdPlugin == null) {
throw new RuntimeException("No class defined for " + name, caught);
}
LOG.info("Load plugin class {}", cacheIdPlugin.getClass().getName());
pluginList.add(cacheIdPlugin);
}
@ -489,6 +516,29 @@ public class EntityGroupFSTimelineStore extends CompositeService
return appId;
}
private static ClassLoader createPluginClassLoader(
final String appClasspath, final String[] systemClasses)
throws IOException {
try {
return AccessController.doPrivileged(
new PrivilegedExceptionAction<ClassLoader>() {
@Override
public ClassLoader run() throws MalformedURLException {
return new ApplicationClassLoader(appClasspath,
EntityGroupFSTimelineStore.class.getClassLoader(),
Arrays.asList(systemClasses));
}
}
);
} catch (PrivilegedActionException e) {
Throwable t = e.getCause();
if (t instanceof MalformedURLException) {
throw (MalformedURLException) t;
}
throw new IOException(e);
}
}
private Path getActiveAppPath(ApplicationId appId) {
return new Path(activeRootPath, appId.toString());
}
@ -531,6 +581,15 @@ public class EntityGroupFSTimelineStore extends CompositeService
return getAppState(appId, yarnClient);
}
/**
* Get all plugins for tests.
* @return all plugins
*/
@VisibleForTesting
List<TimelineEntityGroupPlugin> getPlugins() {
return cacheIdPlugins;
}
/**
* Ask the RM for the state of the application.
* This method has to be synchronized to control traffic to RM

View File

@ -29,6 +29,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableStat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ApplicationClassLoader;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@ -43,7 +46,10 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
@ -97,6 +103,8 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
@Rule
public TestName currTestName = new TestName();
private File rootDir;
private File testJar;
@BeforeClass
public static void setupClass() throws Exception {
@ -144,6 +152,25 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
store = new EntityGroupFSTimelineStoreForTest();
if (currTestName.getMethodName().contains("Plugin")) {
rootDir = GenericTestUtils.getTestDir(getClass()
.getSimpleName());
if (!rootDir.exists()) {
rootDir.mkdirs();
}
testJar = null;
testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir,
"test-runjar.jar", 2048,
EntityGroupPlugInForTest.class.getName());
config.set(
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSPATH,
testJar.getAbsolutePath());
// add "-org.apache.hadoop." as system classes
String systemClasses = "-org.apache.hadoop." + "," +
ApplicationClassLoader.SYSTEM_CLASSES_DEFAULT;
config.set(
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_SYSTEM_CLASSES,
systemClasses);
config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
EntityGroupPlugInForTest.class.getName());
}
@ -158,6 +185,10 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
for (ApplicationId appId : sampleAppIds) {
fs.delete(getTestRootPath(appId.toString()), true);
}
if (testJar != null) {
testJar.delete();
rootDir.delete();
}
}
@AfterClass
@ -302,6 +333,21 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
assertEquals(EntityGroupPlugInForTest.class.getName(),
store.getConfig().get(
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
List<TimelineEntityGroupPlugin> currPlugins = store.getPlugins();
for (TimelineEntityGroupPlugin plugin : currPlugins) {
ClassLoader pluginClassLoader = plugin.getClass().getClassLoader();
assertTrue("Should set up ApplicationClassLoader",
pluginClassLoader instanceof ApplicationClassLoader);
URL[] paths = ((URLClassLoader) pluginClassLoader).getURLs();
boolean foundJAR = false;
for (URL path : paths) {
if (path.toString().contains(testJar.getAbsolutePath())) {
foundJAR = true;
}
}
assertTrue("Not found path " + testJar.getAbsolutePath()
+ " for plugin " + plugin.getClass().getName(), foundJAR);
}
// Load data and cache item, prepare timeline store by making a cache item
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(mainTestAppId, mainTestAppDirPath,