YARN-4577. Enable aux services to have their own custom classpath/jar file (Xuan Gong via sale)
This commit is contained in:
parent
3f2816ab5a
commit
0bbe01f8d5
|
@ -14,7 +14,7 @@
|
|||
package org.apache.hadoop.util;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
|
@ -173,4 +173,28 @@ public class JarFinder {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static File makeClassLoaderTestJar(Class<?> target, File rootDir,
|
||||
String jarName, int buffSize, String... clsNames) throws IOException {
|
||||
File jarFile = new File(rootDir, jarName);
|
||||
JarOutputStream jstream =
|
||||
new JarOutputStream(new FileOutputStream(jarFile));
|
||||
for (String clsName: clsNames) {
|
||||
String name = clsName.replace('.', '/') + ".class";
|
||||
InputStream entryInputStream = target.getResourceAsStream(
|
||||
"/" + name);
|
||||
ZipEntry entry = new ZipEntry(name);
|
||||
jstream.putNextEntry(entry);
|
||||
BufferedInputStream bufInputStream = new BufferedInputStream(
|
||||
entryInputStream, buffSize);
|
||||
int count;
|
||||
byte[] data = new byte[buffSize];
|
||||
while ((count = bufInputStream.read(data, 0, buffSize)) != -1) {
|
||||
jstream.write(data, 0, count);
|
||||
}
|
||||
jstream.closeEntry();
|
||||
}
|
||||
jstream.close();
|
||||
return jarFile;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,11 +23,9 @@ import static org.junit.Assert.assertTrue;
|
|||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.jar.JarOutputStream;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.zip.ZipEntry;
|
||||
|
@ -156,7 +154,8 @@ public class TestRunJar {
|
|||
when(runJar.getSystemClasses()).thenReturn(systemClasses);
|
||||
|
||||
// create the test jar
|
||||
File testJar = makeClassLoaderTestJar(mainCls, thirdCls);
|
||||
File testJar = JarFinder.makeClassLoaderTestJar(this.getClass(),
|
||||
TEST_ROOT_DIR, TEST_JAR_2_NAME, BUFF_SIZE, mainCls, thirdCls);
|
||||
// form the args
|
||||
String[] args = new String[3];
|
||||
args[0] = testJar.getAbsolutePath();
|
||||
|
@ -166,28 +165,4 @@ public class TestRunJar {
|
|||
runJar.run(args);
|
||||
// it should not throw an exception
|
||||
}
|
||||
|
||||
private File makeClassLoaderTestJar(String... clsNames) throws IOException {
|
||||
File jarFile = new File(TEST_ROOT_DIR, TEST_JAR_2_NAME);
|
||||
JarOutputStream jstream =
|
||||
new JarOutputStream(new FileOutputStream(jarFile));
|
||||
for (String clsName: clsNames) {
|
||||
String name = clsName.replace('.', '/') + ".class";
|
||||
InputStream entryInputStream = this.getClass().getResourceAsStream(
|
||||
"/" + name);
|
||||
ZipEntry entry = new ZipEntry(name);
|
||||
jstream.putNextEntry(entry);
|
||||
BufferedInputStream bufInputStream = new BufferedInputStream(
|
||||
entryInputStream, BUFF_SIZE);
|
||||
int count;
|
||||
byte[] data = new byte[BUFF_SIZE];
|
||||
while ((count = bufInputStream.read(data, 0, BUFF_SIZE)) != -1) {
|
||||
jstream.write(data, 0, count);
|
||||
}
|
||||
jstream.closeEntry();
|
||||
}
|
||||
jstream.close();
|
||||
|
||||
return jarFile;
|
||||
}
|
||||
}
|
|
@ -1451,10 +1451,16 @@ public class YarnConfiguration extends Configuration {
|
|||
NM_PREFIX + "principal";
|
||||
|
||||
public static final String NM_AUX_SERVICES =
|
||||
NM_PREFIX + "aux-services";
|
||||
NM_PREFIX + "aux-services";
|
||||
|
||||
public static final String NM_AUX_SERVICE_FMT =
|
||||
NM_PREFIX + "aux-services.%s.class";
|
||||
NM_PREFIX + "aux-services.%s.class";
|
||||
|
||||
public static final String NM_AUX_SERVICES_CLASSPATH =
|
||||
NM_AUX_SERVICES + ".%s.classpath";
|
||||
|
||||
public static final String NM_AUX_SERVICES_SYSTEM_CLASSES =
|
||||
NM_AUX_SERVICES + ".%s.system-classes";
|
||||
|
||||
public static final String NM_USER_HOME_DIR =
|
||||
NM_PREFIX + "user-home-dir";
|
||||
|
|
|
@ -118,21 +118,40 @@ public class AuxServices extends AbstractService
|
|||
YarnConfiguration.NM_AUX_SERVICES +" is invalid." +
|
||||
"The valid service name should only contain a-zA-Z0-9_ " +
|
||||
"and can not start with numbers");
|
||||
Class<? extends AuxiliaryService> sClass = conf.getClass(
|
||||
String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, sName), null,
|
||||
AuxiliaryService.class);
|
||||
if (null == sClass) {
|
||||
throw new RuntimeException("No class defined for " + sName);
|
||||
String classKey = String.format(
|
||||
YarnConfiguration.NM_AUX_SERVICE_FMT, sName);
|
||||
String className = conf.get(classKey);
|
||||
final String appClassPath = conf.get(String.format(
|
||||
YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, sName));
|
||||
AuxiliaryService s = null;
|
||||
boolean useCustomerClassLoader = appClassPath != null
|
||||
&& !appClassPath.isEmpty() && className != null
|
||||
&& !className.isEmpty();
|
||||
if (useCustomerClassLoader) {
|
||||
s = AuxiliaryServiceWithCustomClassLoader.getInstance(
|
||||
conf, className, appClassPath);
|
||||
LOG.info("The aux service:" + sName
|
||||
+ " are using the custom classloader");
|
||||
} else {
|
||||
Class<? extends AuxiliaryService> sClass = conf.getClass(
|
||||
classKey, null, AuxiliaryService.class);
|
||||
|
||||
if (sClass == null) {
|
||||
throw new RuntimeException("No class defined for " + sName);
|
||||
}
|
||||
s = ReflectionUtils.newInstance(sClass, conf);
|
||||
}
|
||||
if (s == null) {
|
||||
throw new RuntimeException("No object created for " + sName);
|
||||
}
|
||||
AuxiliaryService s = ReflectionUtils.newInstance(sClass, conf);
|
||||
// TODO better use s.getName()?
|
||||
if(!sName.equals(s.getName())) {
|
||||
LOG.warn("The Auxilurary Service named '"+sName+"' in the "
|
||||
+"configuration is for "+sClass+" which has "
|
||||
+"a name of '"+s.getName()+"'. Because these are "
|
||||
+"not the same tools trying to send ServiceData and read "
|
||||
+"Service Meta Data may have issues unless the refer to "
|
||||
+"the name in the config.");
|
||||
+"configuration is for "+s.getClass()+" which has "
|
||||
+"a name of '"+s.getName()+"'. Because these are "
|
||||
+"not the same tools trying to send ServiceData and read "
|
||||
+"Service Meta Data may have issues unless the refer to "
|
||||
+"the name in the config.");
|
||||
}
|
||||
addService(sName, s);
|
||||
if (recoveryEnabled) {
|
||||
|
|
|
@ -0,0 +1,201 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.MalformedURLException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedActionException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.util.ApplicationClassLoader;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
||||
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
||||
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
|
||||
|
||||
final class AuxiliaryServiceWithCustomClassLoader extends AuxiliaryService {
|
||||
|
||||
private final AuxiliaryService wrapped;
|
||||
private final ClassLoader customClassLoader;
|
||||
|
||||
private AuxiliaryServiceWithCustomClassLoader(String name,
|
||||
AuxiliaryService wrapped, ClassLoader customClassLoader) {
|
||||
super(name);
|
||||
this.wrapped = wrapped;
|
||||
this.customClassLoader = customClassLoader;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
// We pass a shared configuration as part of serviceInit call.
|
||||
// To avoid the scenario that we could get a ClassNotFoundException
|
||||
// when we use customClassLoader to load the class, we create a copy
|
||||
// of the configuration.
|
||||
Configuration config = new Configuration(conf);
|
||||
// reset the service configuration
|
||||
setConfig(config);
|
||||
config.setClassLoader(customClassLoader);
|
||||
ClassLoader original = Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(customClassLoader);
|
||||
try {
|
||||
wrapped.init(config);
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(original);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
ClassLoader original = Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(customClassLoader);
|
||||
try {
|
||||
wrapped.start();
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(original);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
ClassLoader original = Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(customClassLoader);
|
||||
try {
|
||||
wrapped.stop();
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(original);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initializeApplication(
|
||||
ApplicationInitializationContext initAppContext) {
|
||||
ClassLoader original = Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(customClassLoader);
|
||||
try {
|
||||
wrapped.initializeApplication(initAppContext);
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(original);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopApplication(ApplicationTerminationContext stopAppContext) {
|
||||
ClassLoader original = Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(customClassLoader);
|
||||
try {
|
||||
wrapped.stopApplication(stopAppContext);
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(original);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer getMetaData() {
|
||||
ClassLoader original = Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(customClassLoader);
|
||||
try {
|
||||
return wrapped.getMetaData();
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(original);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initializeContainer(ContainerInitializationContext
|
||||
initContainerContext) {
|
||||
ClassLoader original = Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(customClassLoader);
|
||||
try {
|
||||
wrapped.initializeContainer(initContainerContext);
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(original);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopContainer(ContainerTerminationContext stopContainerContext) {
|
||||
ClassLoader original = Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(customClassLoader);
|
||||
try {
|
||||
wrapped.stopContainer(stopContainerContext);
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(original);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRecoveryPath(Path recoveryPath) {
|
||||
ClassLoader original = Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(customClassLoader);
|
||||
try {
|
||||
wrapped.setRecoveryPath(recoveryPath);
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(original);
|
||||
}
|
||||
}
|
||||
|
||||
public static AuxiliaryServiceWithCustomClassLoader getInstance(
|
||||
Configuration conf, String className, String appClassPath)
|
||||
throws IOException, ClassNotFoundException {
|
||||
String[] systemClasses = conf.getTrimmedStrings(String.format(
|
||||
YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES,
|
||||
className));
|
||||
ClassLoader customClassLoader = createAuxServiceClassLoader(
|
||||
appClassPath, systemClasses);
|
||||
Class<?> clazz = Class.forName(className, true,
|
||||
customClassLoader);
|
||||
Class<? extends AuxiliaryService> sClass = clazz.asSubclass(
|
||||
AuxiliaryService.class);
|
||||
AuxiliaryService wrapped = ReflectionUtils.newInstance(sClass, conf);
|
||||
return new AuxiliaryServiceWithCustomClassLoader(
|
||||
className + " with custom class loader", wrapped,
|
||||
customClassLoader);
|
||||
}
|
||||
|
||||
private static ClassLoader createAuxServiceClassLoader(
|
||||
final String appClasspath, final String[] systemClasses)
|
||||
throws IOException {
|
||||
try {
|
||||
return AccessController.doPrivileged(
|
||||
new PrivilegedExceptionAction<ClassLoader>() {
|
||||
@Override
|
||||
public ClassLoader run() throws MalformedURLException {
|
||||
return new ApplicationClassLoader(appClasspath,
|
||||
AuxServices.class.getClassLoader(),
|
||||
Arrays.asList(systemClasses));
|
||||
}
|
||||
}
|
||||
);
|
||||
} catch (PrivilegedActionException e) {
|
||||
Throwable t = e.getCause();
|
||||
if (t instanceof MalformedURLException) {
|
||||
throw (MalformedURLException) t;
|
||||
}
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -27,13 +27,21 @@ import static org.junit.Assert.assertTrue;
|
|||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.collect.Sets;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -42,6 +50,10 @@ import org.apache.hadoop.fs.FileUtil;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.ApplicationClassLoader;
|
||||
import org.apache.hadoop.util.JarFinder;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -150,6 +162,111 @@ public class TestAuxServices {
|
|||
}
|
||||
}
|
||||
|
||||
// Override getMetaData() method to return current
|
||||
// class path. This class would be used for
|
||||
// testCustomizedAuxServiceClassPath.
|
||||
static class ServiceC extends LightService {
|
||||
public ServiceC() {
|
||||
super("C", 'C', 66, ByteBuffer.wrap("C".getBytes()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer getMetaData() {
|
||||
ClassLoader loader = Thread.currentThread().getContextClassLoader();
|
||||
URL[] urls = ((URLClassLoader)loader).getURLs();
|
||||
List<String> urlString = new ArrayList<String>();
|
||||
for (URL url : urls) {
|
||||
urlString.add(url.toString());
|
||||
}
|
||||
String joinedString = StringUtils.join(",", urlString);
|
||||
return ByteBuffer.wrap(joinedString.getBytes());
|
||||
}
|
||||
}
|
||||
|
||||
// To verify whether we could load class from customized class path.
|
||||
// We would use ServiceC in this test. Also create a separate jar file
|
||||
// including ServiceC class, and add this jar to customized directory.
|
||||
// By setting some proper configurations, we should load ServiceC class
|
||||
// from customized class path.
|
||||
@Test (timeout = 15000)
|
||||
public void testCustomizedAuxServiceClassPath() throws Exception {
|
||||
// verify that we can load AuxService Class from default Class path
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
|
||||
new String[] {"ServiceC"});
|
||||
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
|
||||
"ServiceC"), ServiceC.class, Service.class);
|
||||
@SuppressWarnings("resource")
|
||||
AuxServices aux = new AuxServices();
|
||||
aux.init(conf);
|
||||
aux.start();
|
||||
Map<String, ByteBuffer> meta = aux.getMetaData();
|
||||
String auxName = "";
|
||||
Set<String> defaultAuxClassPath = null;
|
||||
Assert.assertTrue(meta.size() == 1);
|
||||
for(Entry<String, ByteBuffer> i : meta.entrySet()) {
|
||||
auxName = i.getKey();
|
||||
String auxClassPath = Charsets.UTF_8.decode(i.getValue()).toString();
|
||||
defaultAuxClassPath = new HashSet<String>(Arrays.asList(StringUtils
|
||||
.getTrimmedStrings(auxClassPath)));
|
||||
}
|
||||
Assert.assertTrue(auxName.equals("ServiceC"));
|
||||
aux.serviceStop();
|
||||
|
||||
// create a new jar file, and configure it as customized class path
|
||||
// for this AuxService, and make sure that we could load the class
|
||||
// from this configured customized class path
|
||||
File rootDir = GenericTestUtils.getTestDir(getClass()
|
||||
.getSimpleName());
|
||||
if (!rootDir.exists()) {
|
||||
rootDir.mkdirs();
|
||||
}
|
||||
File testJar = null;
|
||||
try {
|
||||
testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir,
|
||||
"test-runjar.jar", 2048, ServiceC.class.getName());
|
||||
conf = new YarnConfiguration();
|
||||
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
|
||||
new String[] {"ServiceC"});
|
||||
conf.set(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "ServiceC"),
|
||||
ServiceC.class.getName());
|
||||
conf.set(String.format(
|
||||
YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, "ServiceC"),
|
||||
testJar.getAbsolutePath());
|
||||
// remove "-org.apache.hadoop." from system classes
|
||||
String systemClasses = "-org.apache.hadoop." + "," +
|
||||
ApplicationClassLoader.SYSTEM_CLASSES_DEFAULT;
|
||||
conf.set(String.format(
|
||||
YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES,
|
||||
"ServiceC"), systemClasses);
|
||||
aux = new AuxServices();
|
||||
aux.init(conf);
|
||||
aux.start();
|
||||
meta = aux.getMetaData();
|
||||
Assert.assertTrue(meta.size() == 1);
|
||||
Set<String> customizedAuxClassPath = null;
|
||||
for(Entry<String, ByteBuffer> i : meta.entrySet()) {
|
||||
Assert.assertTrue(auxName.equals(i.getKey()));
|
||||
String classPath = Charsets.UTF_8.decode(i.getValue()).toString();
|
||||
customizedAuxClassPath = new HashSet<String>(Arrays.asList(StringUtils
|
||||
.getTrimmedStrings(classPath)));
|
||||
Assert.assertTrue(classPath.contains(testJar.getName()));
|
||||
}
|
||||
aux.stop();
|
||||
|
||||
// Verify that we do not have any overlap between customized class path
|
||||
// and the default class path.
|
||||
Set<String> mutalClassPath = Sets.intersection(defaultAuxClassPath,
|
||||
customizedAuxClassPath);
|
||||
Assert.assertTrue(mutalClassPath.isEmpty());
|
||||
} finally {
|
||||
if (testJar != null) {
|
||||
testJar.delete();
|
||||
rootDir.delete();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAuxEventDispatch() {
|
||||
Configuration conf = new Configuration();
|
||||
|
|
Loading…
Reference in New Issue