YARN-1151. Ability to configure auxiliary services from HDFS-based JAR files. (Xuan Gong via wangda)
Change-Id: Ied37ff11e507fc86847753ba79486652c8fadfe9
This commit is contained in:
parent
077eda66ad
commit
c962371430
|
@ -2086,6 +2086,9 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final String NM_AUX_SERVICES_CLASSPATH =
|
public static final String NM_AUX_SERVICES_CLASSPATH =
|
||||||
NM_AUX_SERVICES + ".%s.classpath";
|
NM_AUX_SERVICES + ".%s.classpath";
|
||||||
|
|
||||||
|
public static final String NM_AUX_SERVICE_REMOTE_CLASSPATH =
|
||||||
|
NM_AUX_SERVICES + ".%s.remote-classpath";
|
||||||
|
|
||||||
public static final String NM_AUX_SERVICES_SYSTEM_CLASSES =
|
public static final String NM_AUX_SERVICES_SYSTEM_CLASSES =
|
||||||
NM_AUX_SERVICES + ".%s.system-classes";
|
NM_AUX_SERVICES + ".%s.system-classes";
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -29,45 +31,70 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.service.Service;
|
import org.apache.hadoop.service.Service;
|
||||||
import org.apache.hadoop.service.ServiceStateChangeListener;
|
import org.apache.hadoop.service.ServiceStateChangeListener;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
||||||
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
||||||
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
|
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
|
||||||
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
|
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
|
||||||
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
|
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
|
||||||
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
|
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
|
||||||
|
import org.apache.hadoop.yarn.util.FSDownload;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
public class AuxServices extends AbstractService
|
public class AuxServices extends AbstractService
|
||||||
implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> {
|
implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> {
|
||||||
|
|
||||||
|
public static final String NM_AUX_SERVICE_DIR = "nmAuxService";
|
||||||
|
public static final FsPermission NM_AUX_SERVICE_DIR_PERM =
|
||||||
|
new FsPermission((short) 0700);
|
||||||
|
|
||||||
static final String STATE_STORE_ROOT_NAME = "nm-aux-services";
|
static final String STATE_STORE_ROOT_NAME = "nm-aux-services";
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(AuxServices.class);
|
LoggerFactory.getLogger(AuxServices.class);
|
||||||
|
private static final String DEL_SUFFIX = "_DEL_";
|
||||||
|
|
||||||
protected final Map<String,AuxiliaryService> serviceMap;
|
protected final Map<String,AuxiliaryService> serviceMap;
|
||||||
protected final Map<String,ByteBuffer> serviceMetaData;
|
protected final Map<String,ByteBuffer> serviceMetaData;
|
||||||
private final AuxiliaryLocalPathHandler auxiliaryLocalPathHandler;
|
private final AuxiliaryLocalPathHandler auxiliaryLocalPathHandler;
|
||||||
|
private final LocalDirsHandlerService dirsHandler;
|
||||||
|
private final DeletionService delService;
|
||||||
|
private final UserGroupInformation userUGI;
|
||||||
|
|
||||||
private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$");
|
private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$");
|
||||||
|
|
||||||
public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler) {
|
public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler,
|
||||||
|
Context nmContext, DeletionService deletionService) {
|
||||||
super(AuxServices.class.getName());
|
super(AuxServices.class.getName());
|
||||||
serviceMap =
|
serviceMap =
|
||||||
Collections.synchronizedMap(new HashMap<String,AuxiliaryService>());
|
Collections.synchronizedMap(new HashMap<String,AuxiliaryService>());
|
||||||
serviceMetaData =
|
serviceMetaData =
|
||||||
Collections.synchronizedMap(new HashMap<String,ByteBuffer>());
|
Collections.synchronizedMap(new HashMap<String,ByteBuffer>());
|
||||||
this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler;
|
this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler;
|
||||||
|
this.dirsHandler = nmContext.getLocalDirsHandler();
|
||||||
|
this.delService = deletionService;
|
||||||
|
this.userUGI = getRemoteUgi();
|
||||||
// Obtain services from configuration in init()
|
// Obtain services from configuration in init()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -125,15 +152,103 @@ public class AuxServices extends AbstractService
|
||||||
String classKey = String.format(
|
String classKey = String.format(
|
||||||
YarnConfiguration.NM_AUX_SERVICE_FMT, sName);
|
YarnConfiguration.NM_AUX_SERVICE_FMT, sName);
|
||||||
String className = conf.get(classKey);
|
String className = conf.get(classKey);
|
||||||
final String appClassPath = conf.get(String.format(
|
final String appLocalClassPath = conf.get(String.format(
|
||||||
YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, sName));
|
YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, sName));
|
||||||
|
final String appRemoteClassPath = conf.get(String.format(
|
||||||
|
YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, sName));
|
||||||
AuxiliaryService s = null;
|
AuxiliaryService s = null;
|
||||||
boolean useCustomerClassLoader = appClassPath != null
|
boolean useCustomerClassLoader = ((appLocalClassPath != null
|
||||||
&& !appClassPath.isEmpty() && className != null
|
&& !appLocalClassPath.isEmpty()) ||
|
||||||
&& !className.isEmpty();
|
(appRemoteClassPath != null && !appRemoteClassPath.isEmpty()))
|
||||||
|
&& className != null && !className.isEmpty();
|
||||||
if (useCustomerClassLoader) {
|
if (useCustomerClassLoader) {
|
||||||
|
// load AuxiliaryService from local class path
|
||||||
|
if (appRemoteClassPath == null || appRemoteClassPath.isEmpty()) {
|
||||||
s = AuxiliaryServiceWithCustomClassLoader.getInstance(
|
s = AuxiliaryServiceWithCustomClassLoader.getInstance(
|
||||||
conf, className, appClassPath);
|
conf, className, appLocalClassPath);
|
||||||
|
} else {
|
||||||
|
// load AuxiliaryService from remote class path
|
||||||
|
if (appLocalClassPath != null && !appLocalClassPath.isEmpty()) {
|
||||||
|
throw new YarnRuntimeException("The aux serivce:" + sName
|
||||||
|
+ " has configured local classpath:" + appLocalClassPath
|
||||||
|
+ " and remote classpath:" + appRemoteClassPath
|
||||||
|
+ ". Only one of them should be configured.");
|
||||||
|
}
|
||||||
|
FileContext localLFS = getLocalFileContext(conf);
|
||||||
|
// create NM aux-service dir in NM localdir if it does not exist.
|
||||||
|
Path nmAuxDir = dirsHandler.getLocalPathForWrite("."
|
||||||
|
+ Path.SEPARATOR + NM_AUX_SERVICE_DIR);
|
||||||
|
if (!localLFS.util().exists(nmAuxDir)) {
|
||||||
|
try {
|
||||||
|
localLFS.mkdir(nmAuxDir, NM_AUX_SERVICE_DIR_PERM, true);
|
||||||
|
} catch (IOException ex) {
|
||||||
|
throw new YarnRuntimeException("Fail to create dir:"
|
||||||
|
+ nmAuxDir.toString(), ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Path src = new Path(appRemoteClassPath);
|
||||||
|
FileContext remoteLFS = getRemoteFileContext(src.toUri(), conf);
|
||||||
|
FileStatus scFileStatus = remoteLFS.getFileStatus(src);
|
||||||
|
if (!scFileStatus.getOwner().equals(
|
||||||
|
this.userUGI.getShortUserName())) {
|
||||||
|
throw new YarnRuntimeException("The remote jarfile owner:"
|
||||||
|
+ scFileStatus.getOwner() + " is not the same as the NM user:"
|
||||||
|
+ this.userUGI.getShortUserName() + ".");
|
||||||
|
}
|
||||||
|
if ((scFileStatus.getPermission().toShort() & 0022) != 0) {
|
||||||
|
throw new YarnRuntimeException("The remote jarfile should not "
|
||||||
|
+ "be writable by group or others. "
|
||||||
|
+ "The current Permission is "
|
||||||
|
+ scFileStatus.getPermission().toShort());
|
||||||
|
}
|
||||||
|
Path dest = null;
|
||||||
|
Path downloadDest = new Path(nmAuxDir,
|
||||||
|
className + "_" + scFileStatus.getModificationTime());
|
||||||
|
// check whether we need to re-download the jar
|
||||||
|
// from remote directory
|
||||||
|
Path targetDirPath = new Path(downloadDest,
|
||||||
|
scFileStatus.getPath().getName());
|
||||||
|
FileStatus[] allSubDirs = localLFS.util().listStatus(nmAuxDir);
|
||||||
|
boolean reDownload = true;
|
||||||
|
for (FileStatus sub : allSubDirs) {
|
||||||
|
if (sub.getPath().getName().equals(downloadDest.getName())) {
|
||||||
|
reDownload = false;
|
||||||
|
dest = new Path(targetDirPath + Path.SEPARATOR + "*");
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
if (sub.getPath().getName().contains(className) &&
|
||||||
|
!sub.getPath().getName().endsWith(DEL_SUFFIX)) {
|
||||||
|
Path delPath = new Path(sub.getPath().getParent(),
|
||||||
|
sub.getPath().getName() + DEL_SUFFIX);
|
||||||
|
localLFS.rename(sub.getPath(), delPath);
|
||||||
|
LOG.info("delete old aux service jar dir:"
|
||||||
|
+ delPath.toString());
|
||||||
|
FileDeletionTask deletionTask = new FileDeletionTask(
|
||||||
|
this.delService, null, delPath, null);
|
||||||
|
this.delService.delete(deletionTask);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (reDownload) {
|
||||||
|
LocalResource scRsrc = LocalResource.newInstance(
|
||||||
|
URL.fromURI(src.toUri()),
|
||||||
|
LocalResourceType.ARCHIVE, LocalResourceVisibility.PRIVATE,
|
||||||
|
scFileStatus.getLen(), scFileStatus.getModificationTime());
|
||||||
|
FSDownload download = new FSDownload(localLFS, null, conf,
|
||||||
|
downloadDest, scRsrc, null);
|
||||||
|
try {
|
||||||
|
Path downloaded = download.call();
|
||||||
|
dest = new Path(downloaded + Path.SEPARATOR + "*");
|
||||||
|
} catch (Exception ex) {
|
||||||
|
throw new YarnRuntimeException(
|
||||||
|
"Exception happend while downloading files "
|
||||||
|
+ "for aux-service:" + sName + " and remote-file-path:"
|
||||||
|
+ src + ".\n" + ex.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s = AuxiliaryServiceWithCustomClassLoader.getInstance(
|
||||||
|
conf, className, dest.toString());
|
||||||
|
}
|
||||||
LOG.info("The aux service:" + sName
|
LOG.info("The aux service:" + sName
|
||||||
+ " are using the custom classloader");
|
+ " are using the custom classloader");
|
||||||
} else {
|
} else {
|
||||||
|
@ -289,4 +404,33 @@ public class AuxServices extends AbstractService
|
||||||
: "The auxService name is " + service.getName())
|
: "The auxService name is " + service.getName())
|
||||||
+ " and it got an error at event: " + eventType, th);
|
+ " and it got an error at event: " + eventType, th);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
FileContext getLocalFileContext(Configuration conf) {
|
||||||
|
try {
|
||||||
|
return FileContext.getLocalFSFileContext(conf);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new YarnRuntimeException("Failed to access local fs");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
FileContext getRemoteFileContext(final URI path, Configuration conf) {
|
||||||
|
try {
|
||||||
|
return FileContext.getFileContext(path, conf);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new YarnRuntimeException("Failed to access remote fs");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private UserGroupInformation getRemoteUgi() {
|
||||||
|
UserGroupInformation remoteUgi;
|
||||||
|
try {
|
||||||
|
remoteUgi = UserGroupInformation.getCurrentUser();
|
||||||
|
} catch (IOException e) {
|
||||||
|
String msg = "Cannot obtain the user-name. Got exception: "
|
||||||
|
+ StringUtils.stringifyException(e);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new YarnRuntimeException(msg);
|
||||||
|
}
|
||||||
|
return remoteUgi;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -253,7 +253,8 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
AuxiliaryLocalPathHandler auxiliaryLocalPathHandler =
|
AuxiliaryLocalPathHandler auxiliaryLocalPathHandler =
|
||||||
new AuxiliaryLocalPathHandlerImpl(dirsHandler);
|
new AuxiliaryLocalPathHandlerImpl(dirsHandler);
|
||||||
// Start configurable services
|
// Start configurable services
|
||||||
auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler);
|
auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler,
|
||||||
|
this.context, this.deletionService);
|
||||||
auxiliaryServices.registerServiceListener(this);
|
auxiliaryServices.registerServiceListener(this);
|
||||||
addService(auxiliaryServices);
|
addService(auxiliaryServices);
|
||||||
|
|
||||||
|
|
|
@ -25,8 +25,12 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import org.mockito.Mockito;
|
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
import static org.mockito.Matchers.anyString;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -37,6 +41,10 @@ import java.io.IOException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.net.URLClassLoader;
|
import java.net.URLClassLoader;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.nio.file.attribute.FileTime;
|
||||||
|
import java.nio.file.attribute.PosixFilePermission;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -46,6 +54,7 @@ import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -61,6 +70,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
||||||
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
||||||
|
@ -69,8 +79,11 @@ import org.apache.hadoop.yarn.server.api.AuxiliaryService;
|
||||||
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
|
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
|
||||||
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
|
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -82,7 +95,10 @@ public class TestAuxServices {
|
||||||
System.getProperty("java.io.tmpdir")),
|
System.getProperty("java.io.tmpdir")),
|
||||||
TestAuxServices.class.getName());
|
TestAuxServices.class.getName());
|
||||||
private final static AuxiliaryLocalPathHandler MOCK_AUX_PATH_HANDLER =
|
private final static AuxiliaryLocalPathHandler MOCK_AUX_PATH_HANDLER =
|
||||||
Mockito.mock(AuxiliaryLocalPathHandler.class);
|
mock(AuxiliaryLocalPathHandler.class);
|
||||||
|
private final static Context MOCK_CONTEXT = mock(Context.class);
|
||||||
|
private final static DeletionService MOCK_DEL_SERVICE = mock(
|
||||||
|
DeletionService.class);
|
||||||
|
|
||||||
static class LightService extends AuxiliaryService implements Service
|
static class LightService extends AuxiliaryService implements Service
|
||||||
{
|
{
|
||||||
|
@ -188,6 +204,126 @@ public class TestAuxServices {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("resource")
|
||||||
|
@Test
|
||||||
|
public void testRemoteAuxServiceClassPath() throws Exception {
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
|
||||||
|
new String[] {"ServiceC"});
|
||||||
|
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
|
||||||
|
"ServiceC"), ServiceC.class, Service.class);
|
||||||
|
|
||||||
|
Context mockContext2 = mock(Context.class);
|
||||||
|
LocalDirsHandlerService mockDirsHandler = mock(
|
||||||
|
LocalDirsHandlerService.class);
|
||||||
|
String root = "target/LocalDir";
|
||||||
|
Path rootAuxServiceDirPath = new Path(root, "nmAuxService");
|
||||||
|
when(mockDirsHandler.getLocalPathForWrite(anyString())).thenReturn(
|
||||||
|
rootAuxServiceDirPath);
|
||||||
|
when(mockContext2.getLocalDirsHandler()).thenReturn(mockDirsHandler);
|
||||||
|
|
||||||
|
File rootDir = GenericTestUtils.getTestDir(getClass()
|
||||||
|
.getSimpleName());
|
||||||
|
if (!rootDir.exists()) {
|
||||||
|
rootDir.mkdirs();
|
||||||
|
}
|
||||||
|
AuxServices aux = null;
|
||||||
|
File testJar = null;
|
||||||
|
try {
|
||||||
|
// the remote jar file should not be be writable by group or others.
|
||||||
|
try {
|
||||||
|
testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir,
|
||||||
|
"test-runjar.jar", 2048, ServiceC.class.getName());
|
||||||
|
// Give group a write permission.
|
||||||
|
// We should not load the auxservice from remote jar file.
|
||||||
|
Set<PosixFilePermission> perms = new HashSet<PosixFilePermission>();
|
||||||
|
perms.add(PosixFilePermission.OWNER_READ);
|
||||||
|
perms.add(PosixFilePermission.OWNER_WRITE);
|
||||||
|
perms.add(PosixFilePermission.GROUP_WRITE);
|
||||||
|
Files.setPosixFilePermissions(Paths.get(testJar.getAbsolutePath()),
|
||||||
|
perms);
|
||||||
|
conf.set(String.format(
|
||||||
|
YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, "ServiceC"),
|
||||||
|
testJar.getAbsolutePath());
|
||||||
|
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
|
||||||
|
mockContext2, MOCK_DEL_SERVICE);
|
||||||
|
aux.init(conf);
|
||||||
|
Assert.fail("The permission of the jar is wrong."
|
||||||
|
+ "Should throw out exception.");
|
||||||
|
} catch (YarnRuntimeException ex) {
|
||||||
|
Assert.assertTrue(ex.getMessage(), ex.getMessage().contains(
|
||||||
|
"The remote jarfile should not be writable by group or others"));
|
||||||
|
}
|
||||||
|
|
||||||
|
Files.delete(Paths.get(testJar.getAbsolutePath()));
|
||||||
|
|
||||||
|
testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir,
|
||||||
|
"test-runjar.jar", 2048, ServiceC.class.getName());
|
||||||
|
conf.set(String.format(
|
||||||
|
YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, "ServiceC"),
|
||||||
|
testJar.getAbsolutePath());
|
||||||
|
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
|
||||||
|
mockContext2, MOCK_DEL_SERVICE);
|
||||||
|
aux.init(conf);
|
||||||
|
aux.start();
|
||||||
|
Map<String, ByteBuffer> meta = aux.getMetaData();
|
||||||
|
String auxName = "";
|
||||||
|
Assert.assertTrue(meta.size() == 1);
|
||||||
|
for(Entry<String, ByteBuffer> i : meta.entrySet()) {
|
||||||
|
auxName = i.getKey();
|
||||||
|
}
|
||||||
|
Assert.assertEquals("ServiceC", auxName);
|
||||||
|
aux.serviceStop();
|
||||||
|
FileStatus[] status = fs.listStatus(rootAuxServiceDirPath);
|
||||||
|
Assert.assertTrue(status.length == 1);
|
||||||
|
|
||||||
|
// initialize the same auxservice again, and make sure that we did not
|
||||||
|
// re-download the jar from remote directory.
|
||||||
|
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
|
||||||
|
mockContext2, MOCK_DEL_SERVICE);
|
||||||
|
aux.init(conf);
|
||||||
|
aux.start();
|
||||||
|
meta = aux.getMetaData();
|
||||||
|
Assert.assertTrue(meta.size() == 1);
|
||||||
|
for(Entry<String, ByteBuffer> i : meta.entrySet()) {
|
||||||
|
auxName = i.getKey();
|
||||||
|
}
|
||||||
|
Assert.assertEquals("ServiceC", auxName);
|
||||||
|
verify(MOCK_DEL_SERVICE, times(0)).delete(any(FileDeletionTask.class));
|
||||||
|
status = fs.listStatus(rootAuxServiceDirPath);
|
||||||
|
Assert.assertTrue(status.length == 1);
|
||||||
|
aux.serviceStop();
|
||||||
|
|
||||||
|
// change the last modification time for remote jar,
|
||||||
|
// we will re-download the jar and clean the old jar
|
||||||
|
long time = System.currentTimeMillis() + 3600*1000;
|
||||||
|
FileTime fileTime = FileTime.fromMillis(time);
|
||||||
|
Files.setLastModifiedTime(Paths.get(testJar.getAbsolutePath()),
|
||||||
|
fileTime);
|
||||||
|
conf.set(
|
||||||
|
String.format(YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH,
|
||||||
|
"ServiceC"),
|
||||||
|
testJar.getAbsolutePath());
|
||||||
|
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
|
||||||
|
mockContext2, MOCK_DEL_SERVICE);
|
||||||
|
aux.init(conf);
|
||||||
|
aux.start();
|
||||||
|
verify(MOCK_DEL_SERVICE, times(1)).delete(any(FileDeletionTask.class));
|
||||||
|
status = fs.listStatus(rootAuxServiceDirPath);
|
||||||
|
Assert.assertTrue(status.length == 2);
|
||||||
|
aux.serviceStop();
|
||||||
|
} finally {
|
||||||
|
if (testJar != null) {
|
||||||
|
testJar.delete();
|
||||||
|
rootDir.delete();
|
||||||
|
}
|
||||||
|
if (fs.exists(new Path(root))) {
|
||||||
|
fs.delete(new Path(root), true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// To verify whether we could load class from customized class path.
|
// To verify whether we could load class from customized class path.
|
||||||
// We would use ServiceC in this test. Also create a separate jar file
|
// We would use ServiceC in this test. Also create a separate jar file
|
||||||
// including ServiceC class, and add this jar to customized directory.
|
// including ServiceC class, and add this jar to customized directory.
|
||||||
|
@ -202,7 +338,8 @@ public class TestAuxServices {
|
||||||
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
|
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
|
||||||
"ServiceC"), ServiceC.class, Service.class);
|
"ServiceC"), ServiceC.class, Service.class);
|
||||||
@SuppressWarnings("resource")
|
@SuppressWarnings("resource")
|
||||||
AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
|
AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
|
||||||
|
MOCK_CONTEXT, MOCK_DEL_SERVICE);
|
||||||
aux.init(conf);
|
aux.init(conf);
|
||||||
aux.start();
|
aux.start();
|
||||||
Map<String, ByteBuffer> meta = aux.getMetaData();
|
Map<String, ByteBuffer> meta = aux.getMetaData();
|
||||||
|
@ -244,7 +381,8 @@ public class TestAuxServices {
|
||||||
conf.set(String.format(
|
conf.set(String.format(
|
||||||
YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES,
|
YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES,
|
||||||
"ServiceC"), systemClasses);
|
"ServiceC"), systemClasses);
|
||||||
aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
|
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
|
||||||
|
MOCK_CONTEXT, MOCK_DEL_SERVICE);
|
||||||
aux.init(conf);
|
aux.init(conf);
|
||||||
aux.start();
|
aux.start();
|
||||||
meta = aux.getMetaData();
|
meta = aux.getMetaData();
|
||||||
|
@ -282,7 +420,8 @@ public class TestAuxServices {
|
||||||
ServiceB.class, Service.class);
|
ServiceB.class, Service.class);
|
||||||
conf.setInt("A.expected.init", 1);
|
conf.setInt("A.expected.init", 1);
|
||||||
conf.setInt("B.expected.stop", 1);
|
conf.setInt("B.expected.stop", 1);
|
||||||
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
|
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
|
||||||
|
MOCK_CONTEXT, MOCK_DEL_SERVICE);
|
||||||
aux.init(conf);
|
aux.init(conf);
|
||||||
aux.start();
|
aux.start();
|
||||||
|
|
||||||
|
@ -346,7 +485,8 @@ public class TestAuxServices {
|
||||||
ServiceA.class, Service.class);
|
ServiceA.class, Service.class);
|
||||||
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
|
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
|
||||||
ServiceB.class, Service.class);
|
ServiceB.class, Service.class);
|
||||||
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
|
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
|
||||||
|
MOCK_CONTEXT, MOCK_DEL_SERVICE);
|
||||||
aux.init(conf);
|
aux.init(conf);
|
||||||
|
|
||||||
int latch = 1;
|
int latch = 1;
|
||||||
|
@ -379,7 +519,8 @@ public class TestAuxServices {
|
||||||
ServiceA.class, Service.class);
|
ServiceA.class, Service.class);
|
||||||
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
|
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
|
||||||
ServiceB.class, Service.class);
|
ServiceB.class, Service.class);
|
||||||
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
|
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
|
||||||
|
MOCK_CONTEXT, MOCK_DEL_SERVICE);
|
||||||
aux.init(conf);
|
aux.init(conf);
|
||||||
|
|
||||||
int latch = 1;
|
int latch = 1;
|
||||||
|
@ -416,7 +557,8 @@ public class TestAuxServices {
|
||||||
ServiceA.class, Service.class);
|
ServiceA.class, Service.class);
|
||||||
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
|
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
|
||||||
ServiceB.class, Service.class);
|
ServiceB.class, Service.class);
|
||||||
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
|
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
|
||||||
|
MOCK_CONTEXT, MOCK_DEL_SERVICE);
|
||||||
aux.init(conf);
|
aux.init(conf);
|
||||||
aux.start();
|
aux.start();
|
||||||
|
|
||||||
|
@ -429,7 +571,8 @@ public class TestAuxServices {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testValidAuxServiceName() {
|
public void testValidAuxServiceName() {
|
||||||
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
|
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
|
||||||
|
MOCK_CONTEXT, MOCK_DEL_SERVICE);
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"Asrv1", "Bsrv_2"});
|
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"Asrv1", "Bsrv_2"});
|
||||||
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv1"),
|
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv1"),
|
||||||
|
@ -443,7 +586,8 @@ public class TestAuxServices {
|
||||||
}
|
}
|
||||||
|
|
||||||
//Test bad auxService Name
|
//Test bad auxService Name
|
||||||
final AuxServices aux1 = new AuxServices(MOCK_AUX_PATH_HANDLER);
|
final AuxServices aux1 = new AuxServices(MOCK_AUX_PATH_HANDLER,
|
||||||
|
MOCK_CONTEXT, MOCK_DEL_SERVICE);
|
||||||
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"1Asrv1"});
|
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"1Asrv1"});
|
||||||
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "1Asrv1"),
|
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "1Asrv1"),
|
||||||
ServiceA.class, Service.class);
|
ServiceA.class, Service.class);
|
||||||
|
@ -469,7 +613,8 @@ public class TestAuxServices {
|
||||||
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
|
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
|
||||||
RecoverableServiceB.class, Service.class);
|
RecoverableServiceB.class, Service.class);
|
||||||
try {
|
try {
|
||||||
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
|
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
|
||||||
|
MOCK_CONTEXT, MOCK_DEL_SERVICE);
|
||||||
aux.init(conf);
|
aux.init(conf);
|
||||||
Assert.assertEquals(2, aux.getServices().size());
|
Assert.assertEquals(2, aux.getServices().size());
|
||||||
File auxStorageDir = new File(TEST_DIR,
|
File auxStorageDir = new File(TEST_DIR,
|
||||||
|
|
Loading…
Reference in New Issue