YARN-1151. Ability to configure auxiliary services from HDFS-based JAR files. (Xuan Gong via wangda)

Change-Id: Ied37ff11e507fc86847753ba79486652c8fadfe9
This commit is contained in:
Wangda Tan 2018-04-06 21:25:57 -07:00
parent d4e63ccca0
commit 00ebec89f1
4 changed files with 313 additions and 20 deletions

View File

@ -2106,6 +2106,9 @@ public class YarnConfiguration extends Configuration {
public static final String NM_AUX_SERVICES_CLASSPATH =
NM_AUX_SERVICES + ".%s.classpath";
public static final String NM_AUX_SERVICE_REMOTE_CLASSPATH =
NM_AUX_SERVICES + ".%s.remote-classpath";
public static final String NM_AUX_SERVICES_SYSTEM_CLASSES =
NM_AUX_SERVICES + ".%s.system-classes";

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
@ -29,45 +31,70 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceStateChangeListener;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.util.FSDownload;
import com.google.common.base.Preconditions;
public class AuxServices extends AbstractService
implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> {
public static final String NM_AUX_SERVICE_DIR = "nmAuxService";
public static final FsPermission NM_AUX_SERVICE_DIR_PERM =
new FsPermission((short) 0700);
static final String STATE_STORE_ROOT_NAME = "nm-aux-services";
private static final Logger LOG =
LoggerFactory.getLogger(AuxServices.class);
private static final String DEL_SUFFIX = "_DEL_";
protected final Map<String,AuxiliaryService> serviceMap;
protected final Map<String,ByteBuffer> serviceMetaData;
private final AuxiliaryLocalPathHandler auxiliaryLocalPathHandler;
private final LocalDirsHandlerService dirsHandler;
private final DeletionService delService;
private final UserGroupInformation userUGI;
private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$");
public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler) {
public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler,
Context nmContext, DeletionService deletionService) {
super(AuxServices.class.getName());
serviceMap =
Collections.synchronizedMap(new HashMap<String,AuxiliaryService>());
serviceMetaData =
Collections.synchronizedMap(new HashMap<String,ByteBuffer>());
this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler;
this.dirsHandler = nmContext.getLocalDirsHandler();
this.delService = deletionService;
this.userUGI = getRemoteUgi();
// Obtain services from configuration in init()
}
@ -125,15 +152,103 @@ public class AuxServices extends AbstractService
String classKey = String.format(
YarnConfiguration.NM_AUX_SERVICE_FMT, sName);
String className = conf.get(classKey);
final String appClassPath = conf.get(String.format(
final String appLocalClassPath = conf.get(String.format(
YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, sName));
final String appRemoteClassPath = conf.get(String.format(
YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, sName));
AuxiliaryService s = null;
boolean useCustomerClassLoader = appClassPath != null
&& !appClassPath.isEmpty() && className != null
&& !className.isEmpty();
boolean useCustomerClassLoader = ((appLocalClassPath != null
&& !appLocalClassPath.isEmpty()) ||
(appRemoteClassPath != null && !appRemoteClassPath.isEmpty()))
&& className != null && !className.isEmpty();
if (useCustomerClassLoader) {
s = AuxiliaryServiceWithCustomClassLoader.getInstance(
conf, className, appClassPath);
// load AuxiliaryService from local class path
if (appRemoteClassPath == null || appRemoteClassPath.isEmpty()) {
s = AuxiliaryServiceWithCustomClassLoader.getInstance(
conf, className, appLocalClassPath);
} else {
// load AuxiliaryService from remote class path
if (appLocalClassPath != null && !appLocalClassPath.isEmpty()) {
throw new YarnRuntimeException("The aux serivce:" + sName
+ " has configured local classpath:" + appLocalClassPath
+ " and remote classpath:" + appRemoteClassPath
+ ". Only one of them should be configured.");
}
FileContext localLFS = getLocalFileContext(conf);
// create NM aux-service dir in NM localdir if it does not exist.
Path nmAuxDir = dirsHandler.getLocalPathForWrite("."
+ Path.SEPARATOR + NM_AUX_SERVICE_DIR);
if (!localLFS.util().exists(nmAuxDir)) {
try {
localLFS.mkdir(nmAuxDir, NM_AUX_SERVICE_DIR_PERM, true);
} catch (IOException ex) {
throw new YarnRuntimeException("Fail to create dir:"
+ nmAuxDir.toString(), ex);
}
}
Path src = new Path(appRemoteClassPath);
FileContext remoteLFS = getRemoteFileContext(src.toUri(), conf);
FileStatus scFileStatus = remoteLFS.getFileStatus(src);
if (!scFileStatus.getOwner().equals(
this.userUGI.getShortUserName())) {
throw new YarnRuntimeException("The remote jarfile owner:"
+ scFileStatus.getOwner() + " is not the same as the NM user:"
+ this.userUGI.getShortUserName() + ".");
}
if ((scFileStatus.getPermission().toShort() & 0022) != 0) {
throw new YarnRuntimeException("The remote jarfile should not "
+ "be writable by group or others. "
+ "The current Permission is "
+ scFileStatus.getPermission().toShort());
}
Path dest = null;
Path downloadDest = new Path(nmAuxDir,
className + "_" + scFileStatus.getModificationTime());
// check whether we need to re-download the jar
// from remote directory
Path targetDirPath = new Path(downloadDest,
scFileStatus.getPath().getName());
FileStatus[] allSubDirs = localLFS.util().listStatus(nmAuxDir);
boolean reDownload = true;
for (FileStatus sub : allSubDirs) {
if (sub.getPath().getName().equals(downloadDest.getName())) {
reDownload = false;
dest = new Path(targetDirPath + Path.SEPARATOR + "*");
break;
} else {
if (sub.getPath().getName().contains(className) &&
!sub.getPath().getName().endsWith(DEL_SUFFIX)) {
Path delPath = new Path(sub.getPath().getParent(),
sub.getPath().getName() + DEL_SUFFIX);
localLFS.rename(sub.getPath(), delPath);
LOG.info("delete old aux service jar dir:"
+ delPath.toString());
FileDeletionTask deletionTask = new FileDeletionTask(
this.delService, null, delPath, null);
this.delService.delete(deletionTask);
}
}
}
if (reDownload) {
LocalResource scRsrc = LocalResource.newInstance(
URL.fromURI(src.toUri()),
LocalResourceType.ARCHIVE, LocalResourceVisibility.PRIVATE,
scFileStatus.getLen(), scFileStatus.getModificationTime());
FSDownload download = new FSDownload(localLFS, null, conf,
downloadDest, scRsrc, null);
try {
Path downloaded = download.call();
dest = new Path(downloaded + Path.SEPARATOR + "*");
} catch (Exception ex) {
throw new YarnRuntimeException(
"Exception happend while downloading files "
+ "for aux-service:" + sName + " and remote-file-path:"
+ src + ".\n" + ex.getMessage());
}
}
s = AuxiliaryServiceWithCustomClassLoader.getInstance(
conf, className, dest.toString());
}
LOG.info("The aux service:" + sName
+ " are using the custom classloader");
} else {
@ -289,4 +404,33 @@ public class AuxServices extends AbstractService
: "The auxService name is " + service.getName())
+ " and it got an error at event: " + eventType, th);
}
FileContext getLocalFileContext(Configuration conf) {
try {
return FileContext.getLocalFSFileContext(conf);
} catch (IOException e) {
throw new YarnRuntimeException("Failed to access local fs");
}
}
FileContext getRemoteFileContext(final URI path, Configuration conf) {
try {
return FileContext.getFileContext(path, conf);
} catch (IOException e) {
throw new YarnRuntimeException("Failed to access remote fs");
}
}
private UserGroupInformation getRemoteUgi() {
UserGroupInformation remoteUgi;
try {
remoteUgi = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
String msg = "Cannot obtain the user-name. Got exception: "
+ StringUtils.stringifyException(e);
LOG.warn(msg);
throw new YarnRuntimeException(msg);
}
return remoteUgi;
}
}

View File

@ -253,7 +253,8 @@ public class ContainerManagerImpl extends CompositeService implements
AuxiliaryLocalPathHandler auxiliaryLocalPathHandler =
new AuxiliaryLocalPathHandlerImpl(dirsHandler);
// Start configurable services
auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler);
auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler,
this.context, this.deletionService);
auxiliaryServices.registerServiceListener(this);
addService(auxiliaryServices);

View File

@ -25,8 +25,12 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.mockito.Mockito;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -37,6 +41,10 @@ import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import java.nio.file.attribute.PosixFilePermission;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -46,6 +54,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@ -61,6 +70,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
@ -69,8 +79,11 @@ import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.junit.Assert;
import org.junit.Test;
@ -82,7 +95,10 @@ public class TestAuxServices {
System.getProperty("java.io.tmpdir")),
TestAuxServices.class.getName());
private final static AuxiliaryLocalPathHandler MOCK_AUX_PATH_HANDLER =
Mockito.mock(AuxiliaryLocalPathHandler.class);
mock(AuxiliaryLocalPathHandler.class);
private final static Context MOCK_CONTEXT = mock(Context.class);
private final static DeletionService MOCK_DEL_SERVICE = mock(
DeletionService.class);
static class LightService extends AuxiliaryService implements Service
{
@ -188,6 +204,126 @@ public class TestAuxServices {
}
}
@SuppressWarnings("resource")
@Test
public void testRemoteAuxServiceClassPath() throws Exception {
Configuration conf = new YarnConfiguration();
FileSystem fs = FileSystem.get(conf);
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
new String[] {"ServiceC"});
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
"ServiceC"), ServiceC.class, Service.class);
Context mockContext2 = mock(Context.class);
LocalDirsHandlerService mockDirsHandler = mock(
LocalDirsHandlerService.class);
String root = "target/LocalDir";
Path rootAuxServiceDirPath = new Path(root, "nmAuxService");
when(mockDirsHandler.getLocalPathForWrite(anyString())).thenReturn(
rootAuxServiceDirPath);
when(mockContext2.getLocalDirsHandler()).thenReturn(mockDirsHandler);
File rootDir = GenericTestUtils.getTestDir(getClass()
.getSimpleName());
if (!rootDir.exists()) {
rootDir.mkdirs();
}
AuxServices aux = null;
File testJar = null;
try {
// the remote jar file should not be be writable by group or others.
try {
testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir,
"test-runjar.jar", 2048, ServiceC.class.getName());
// Give group a write permission.
// We should not load the auxservice from remote jar file.
Set<PosixFilePermission> perms = new HashSet<PosixFilePermission>();
perms.add(PosixFilePermission.OWNER_READ);
perms.add(PosixFilePermission.OWNER_WRITE);
perms.add(PosixFilePermission.GROUP_WRITE);
Files.setPosixFilePermissions(Paths.get(testJar.getAbsolutePath()),
perms);
conf.set(String.format(
YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, "ServiceC"),
testJar.getAbsolutePath());
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
mockContext2, MOCK_DEL_SERVICE);
aux.init(conf);
Assert.fail("The permission of the jar is wrong."
+ "Should throw out exception.");
} catch (YarnRuntimeException ex) {
Assert.assertTrue(ex.getMessage(), ex.getMessage().contains(
"The remote jarfile should not be writable by group or others"));
}
Files.delete(Paths.get(testJar.getAbsolutePath()));
testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir,
"test-runjar.jar", 2048, ServiceC.class.getName());
conf.set(String.format(
YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, "ServiceC"),
testJar.getAbsolutePath());
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
mockContext2, MOCK_DEL_SERVICE);
aux.init(conf);
aux.start();
Map<String, ByteBuffer> meta = aux.getMetaData();
String auxName = "";
Assert.assertTrue(meta.size() == 1);
for(Entry<String, ByteBuffer> i : meta.entrySet()) {
auxName = i.getKey();
}
Assert.assertEquals("ServiceC", auxName);
aux.serviceStop();
FileStatus[] status = fs.listStatus(rootAuxServiceDirPath);
Assert.assertTrue(status.length == 1);
// initialize the same auxservice again, and make sure that we did not
// re-download the jar from remote directory.
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
mockContext2, MOCK_DEL_SERVICE);
aux.init(conf);
aux.start();
meta = aux.getMetaData();
Assert.assertTrue(meta.size() == 1);
for(Entry<String, ByteBuffer> i : meta.entrySet()) {
auxName = i.getKey();
}
Assert.assertEquals("ServiceC", auxName);
verify(MOCK_DEL_SERVICE, times(0)).delete(any(FileDeletionTask.class));
status = fs.listStatus(rootAuxServiceDirPath);
Assert.assertTrue(status.length == 1);
aux.serviceStop();
// change the last modification time for remote jar,
// we will re-download the jar and clean the old jar
long time = System.currentTimeMillis() + 3600*1000;
FileTime fileTime = FileTime.fromMillis(time);
Files.setLastModifiedTime(Paths.get(testJar.getAbsolutePath()),
fileTime);
conf.set(
String.format(YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH,
"ServiceC"),
testJar.getAbsolutePath());
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
mockContext2, MOCK_DEL_SERVICE);
aux.init(conf);
aux.start();
verify(MOCK_DEL_SERVICE, times(1)).delete(any(FileDeletionTask.class));
status = fs.listStatus(rootAuxServiceDirPath);
Assert.assertTrue(status.length == 2);
aux.serviceStop();
} finally {
if (testJar != null) {
testJar.delete();
rootDir.delete();
}
if (fs.exists(new Path(root))) {
fs.delete(new Path(root), true);
}
}
}
// To verify whether we could load class from customized class path.
// We would use ServiceC in this test. Also create a separate jar file
// including ServiceC class, and add this jar to customized directory.
@ -202,7 +338,8 @@ public class TestAuxServices {
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
"ServiceC"), ServiceC.class, Service.class);
@SuppressWarnings("resource")
AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
aux.start();
Map<String, ByteBuffer> meta = aux.getMetaData();
@ -244,7 +381,8 @@ public class TestAuxServices {
conf.set(String.format(
YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES,
"ServiceC"), systemClasses);
aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
aux.start();
meta = aux.getMetaData();
@ -282,7 +420,8 @@ public class TestAuxServices {
ServiceB.class, Service.class);
conf.setInt("A.expected.init", 1);
conf.setInt("B.expected.stop", 1);
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
aux.start();
@ -346,7 +485,8 @@ public class TestAuxServices {
ServiceA.class, Service.class);
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
ServiceB.class, Service.class);
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
int latch = 1;
@ -379,7 +519,8 @@ public class TestAuxServices {
ServiceA.class, Service.class);
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
ServiceB.class, Service.class);
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
int latch = 1;
@ -416,7 +557,8 @@ public class TestAuxServices {
ServiceA.class, Service.class);
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
ServiceB.class, Service.class);
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
aux.start();
@ -429,7 +571,8 @@ public class TestAuxServices {
@Test
public void testValidAuxServiceName() {
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
Configuration conf = new Configuration();
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"Asrv1", "Bsrv_2"});
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv1"),
@ -443,7 +586,8 @@ public class TestAuxServices {
}
//Test bad auxService Name
final AuxServices aux1 = new AuxServices(MOCK_AUX_PATH_HANDLER);
final AuxServices aux1 = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"1Asrv1"});
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "1Asrv1"),
ServiceA.class, Service.class);
@ -469,7 +613,8 @@ public class TestAuxServices {
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
RecoverableServiceB.class, Service.class);
try {
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
Assert.assertEquals(2, aux.getServices().size());
File auxStorageDir = new File(TEST_DIR,