YARN-8079. Support static and archive unmodified local resources in service AM. Contributed by Suma Shivaprasad
This commit is contained in:
parent
502914ca32
commit
6795f8072f
|
@ -475,6 +475,8 @@ definitions:
|
|||
- YAML
|
||||
- TEMPLATE
|
||||
- HADOOP_XML
|
||||
- STATIC
|
||||
- ARCHIVE
|
||||
dest_file:
|
||||
type: string
|
||||
description: The path that this configuration file should be created as. If it is an absolute path, it will be mounted into the DOCKER container. Absolute paths are only allowed for DOCKER containers. If it is a relative path, only the file name should be provided, and the file will be created in the container local working directory under a folder named conf.
|
||||
|
|
|
@ -55,7 +55,8 @@ public class ConfigFile implements Serializable {
|
|||
@XmlEnum
|
||||
public enum TypeEnum {
|
||||
XML("XML"), PROPERTIES("PROPERTIES"), JSON("JSON"), YAML("YAML"), TEMPLATE(
|
||||
"TEMPLATE"), HADOOP_XML("HADOOP_XML");
|
||||
"TEMPLATE"), HADOOP_XML("HADOOP_XML"), STATIC("STATIC"), ARCHIVE(
|
||||
"ARCHIVE");
|
||||
|
||||
private String value;
|
||||
|
||||
|
|
|
@ -84,6 +84,7 @@ public interface YarnServiceConstants {
|
|||
String HADOOP_USER_NAME = "HADOOP_USER_NAME";
|
||||
|
||||
String APP_CONF_DIR = "conf";
|
||||
String APP_RESOURCES_DIR = "resources";
|
||||
|
||||
String APP_LIB_DIR = "lib";
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.yarn.service.provider;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.service.api.records.Artifact;
|
||||
|
@ -86,8 +87,9 @@ public abstract class AbstractClientProvider {
|
|||
if (file.getType() == null) {
|
||||
throw new IllegalArgumentException("File type is empty");
|
||||
}
|
||||
ConfigFile.TypeEnum fileType = file.getType();
|
||||
|
||||
if (file.getType().equals(ConfigFile.TypeEnum.TEMPLATE)) {
|
||||
if (fileType.equals(ConfigFile.TypeEnum.TEMPLATE)) {
|
||||
if (StringUtils.isEmpty(file.getSrcFile()) &&
|
||||
!file.getProperties().containsKey(CONTENT)) {
|
||||
throw new IllegalArgumentException(MessageFormat.format("For {0} " +
|
||||
|
@ -96,6 +98,25 @@ public abstract class AbstractClientProvider {
|
|||
"the 'properties' field of ConfigFile. ",
|
||||
ConfigFile.TypeEnum.TEMPLATE, CONTENT));
|
||||
}
|
||||
} else if (fileType.equals(ConfigFile.TypeEnum.STATIC) || fileType.equals(
|
||||
ConfigFile.TypeEnum.ARCHIVE)) {
|
||||
if (!file.getProperties().isEmpty()) {
|
||||
throw new IllegalArgumentException(String
|
||||
.format("For %s format, should not specify any 'properties.'",
|
||||
fileType));
|
||||
}
|
||||
|
||||
String srcFile = file.getSrcFile();
|
||||
if (srcFile == null || srcFile.isEmpty()) {
|
||||
throw new IllegalArgumentException(String.format(
|
||||
"For %s format, should make sure that srcFile is specified",
|
||||
fileType));
|
||||
}
|
||||
FileStatus fileStatus = fs.getFileStatus(new Path(srcFile));
|
||||
if (fileStatus != null && fileStatus.isDirectory()) {
|
||||
throw new IllegalArgumentException("srcFile=" + srcFile +
|
||||
" is a directory, which is not supported.");
|
||||
}
|
||||
}
|
||||
if (!StringUtils.isEmpty(file.getSrcFile())) {
|
||||
Path p = new Path(file.getSrcFile());
|
||||
|
|
|
@ -97,6 +97,10 @@ public abstract class AbstractProviderService implements ProviderService,
|
|||
ProviderUtils.createConfigFileAndAddLocalResource(launcher, fileSystem,
|
||||
compLaunchContext, tokensForSubstitution, instance, context);
|
||||
|
||||
// handles static files (like normal file / archive file) for localization.
|
||||
ProviderUtils.handleStaticFilesForLocalization(launcher, fileSystem,
|
||||
compLaunchContext);
|
||||
|
||||
// substitute launch command
|
||||
String launchCommand = compLaunchContext.getLaunchCommand();
|
||||
// docker container may have empty commands
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.yarn.service.provider;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
|
@ -198,6 +199,10 @@ public class ProviderUtils implements YarnServiceConstants {
|
|||
|
||||
for (ConfigFile originalFile : compLaunchContext.getConfiguration()
|
||||
.getFiles()) {
|
||||
|
||||
if (isStaticFile(originalFile)) {
|
||||
continue;
|
||||
}
|
||||
ConfigFile configFile = originalFile.copy();
|
||||
String fileName = new Path(configFile.getDestFile()).getName();
|
||||
|
||||
|
@ -207,7 +212,14 @@ public class ProviderUtils implements YarnServiceConstants {
|
|||
.replaceAll(Pattern.quote(token.getKey()), token.getValue()));
|
||||
}
|
||||
|
||||
/* When source file is not specified, write new configs
|
||||
* to compInstanceDir/fileName
|
||||
* When source file is specified, it reads and performs variable
|
||||
* substitution and merges in new configs, and writes a new file to
|
||||
* compInstanceDir/fileName.
|
||||
*/
|
||||
Path remoteFile = new Path(compInstanceDir, fileName);
|
||||
|
||||
if (!fs.getFileSystem().exists(remoteFile)) {
|
||||
log.info("Saving config file on hdfs for component " + instance
|
||||
.getCompInstanceName() + ": " + configFile);
|
||||
|
@ -239,22 +251,79 @@ public class ProviderUtils implements YarnServiceConstants {
|
|||
// Add resource for localization
|
||||
LocalResource configResource =
|
||||
fs.createAmResource(remoteFile, LocalResourceType.FILE);
|
||||
File destFile = new File(configFile.getDestFile());
|
||||
Path destFile = new Path(configFile.getDestFile());
|
||||
String symlink = APP_CONF_DIR + "/" + fileName;
|
||||
if (destFile.isAbsolute()) {
|
||||
launcher.addLocalResource(symlink, configResource,
|
||||
configFile.getDestFile());
|
||||
log.info("Add config file for localization: " + symlink + " -> "
|
||||
+ configResource.getResource().getFile() + ", dest mount path: "
|
||||
+ configFile.getDestFile());
|
||||
} else {
|
||||
launcher.addLocalResource(symlink, configResource);
|
||||
log.info("Add config file for localization: " + symlink + " -> "
|
||||
+ configResource.getResource().getFile());
|
||||
}
|
||||
addLocalResource(launcher, symlink, configResource, destFile);
|
||||
}
|
||||
}
|
||||
|
||||
public static synchronized void handleStaticFilesForLocalization(
|
||||
AbstractLauncher launcher, SliderFileSystem fs, ContainerLaunchService
|
||||
.ComponentLaunchContext componentLaunchCtx)
|
||||
throws IOException {
|
||||
for (ConfigFile staticFile :
|
||||
componentLaunchCtx.getConfiguration().getFiles()) {
|
||||
// Only handle static file here.
|
||||
if (!isStaticFile(staticFile)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (staticFile.getSrcFile() == null) {
|
||||
// This should not happen, AbstractClientProvider should have checked
|
||||
// this.
|
||||
throw new IOException("srcFile is null, please double check.");
|
||||
}
|
||||
Path sourceFile = new Path(staticFile.getSrcFile());
|
||||
|
||||
// Output properties to sourceFile if not existed
|
||||
if (!fs.getFileSystem().exists(sourceFile)) {
|
||||
throw new IOException(
|
||||
"srcFile=" + sourceFile + " doesn't exist, please double check.");
|
||||
}
|
||||
|
||||
FileStatus fileStatus = fs.getFileSystem().getFileStatus(sourceFile);
|
||||
if (fileStatus.isDirectory()) {
|
||||
throw new IOException("srcFile=" + sourceFile +
|
||||
" is a directory, which is not supported.");
|
||||
}
|
||||
|
||||
// Add resource for localization
|
||||
LocalResource localResource = fs.createAmResource(sourceFile,
|
||||
(staticFile.getType() == ConfigFile.TypeEnum.ARCHIVE ?
|
||||
LocalResourceType.ARCHIVE :
|
||||
LocalResourceType.FILE));
|
||||
Path destFile = new Path(sourceFile.getName());
|
||||
if (staticFile.getDestFile() != null && !staticFile.getDestFile()
|
||||
.isEmpty()) {
|
||||
destFile = new Path(staticFile.getDestFile());
|
||||
}
|
||||
|
||||
String symlink = APP_RESOURCES_DIR + "/" + destFile.getName();
|
||||
addLocalResource(launcher, symlink, localResource, destFile);
|
||||
}
|
||||
}
|
||||
|
||||
private static void addLocalResource(AbstractLauncher launcher,
|
||||
String symlink, LocalResource localResource, Path destFile) {
|
||||
if (destFile.isAbsolute()) {
|
||||
launcher.addLocalResource(symlink, localResource, destFile.toString());
|
||||
log.info("Added file for localization: "+ symlink +" -> " +
|
||||
localResource.getResource().getFile() + ", dest mount path: " +
|
||||
destFile);
|
||||
} else{
|
||||
launcher.addLocalResource(symlink, localResource);
|
||||
log.info("Added file for localization: " + symlink+ " -> " +
|
||||
localResource.getResource().getFile());
|
||||
}
|
||||
}
|
||||
|
||||
// Static file is files uploaded by users before launch the service. Which
|
||||
// should be localized to container local disk without any changes.
|
||||
private static boolean isStaticFile(ConfigFile file) {
|
||||
return file.getType().equals(ConfigFile.TypeEnum.ARCHIVE) || file.getType()
|
||||
.equals(ConfigFile.TypeEnum.STATIC);
|
||||
}
|
||||
|
||||
private static void resolvePropsInConfigFileAndSaveOnHdfs(SliderFileSystem fs,
|
||||
Map<String, String> tokensForSubstitution, ComponentInstance instance,
|
||||
ConfigFile configFile, String fileName, Path remoteFile)
|
||||
|
|
|
@ -0,0 +1,164 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.service.provider;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
|
||||
import org.apache.hadoop.yarn.service.api.records.Configuration;
|
||||
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
|
||||
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
|
||||
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Test functionality of ProviderUtils.
|
||||
*/
|
||||
public class TestProviderUtils {
|
||||
@Test
|
||||
public void testStaticFileLocalization() throws IOException {
|
||||
// A bunch of mocks ...
|
||||
ContainerLaunchService.ComponentLaunchContext compLaunchCtx =
|
||||
mock(ContainerLaunchService.ComponentLaunchContext.class);
|
||||
AbstractLauncher launcher = mock(AbstractLauncher.class);
|
||||
SliderFileSystem sfs = mock(SliderFileSystem.class);
|
||||
FileSystem fs = mock(FileSystem.class);
|
||||
when(fs.getFileStatus(any(Path.class))).thenAnswer(
|
||||
invocationOnMock -> new FileStatus(1L, false, 1, 1L, 1L,
|
||||
(Path) invocationOnMock.getArguments()[0]));
|
||||
when(fs.exists(any(Path.class))).thenReturn(true);
|
||||
when(sfs.getFileSystem()).thenReturn(fs);
|
||||
Configuration conf = mock(Configuration.class);
|
||||
List<ConfigFile> configFileList = new ArrayList<>();
|
||||
when(conf.getFiles()).thenReturn(configFileList);
|
||||
when(compLaunchCtx.getConfiguration()).thenReturn(conf);
|
||||
when(sfs.createAmResource(any(Path.class), any(LocalResourceType.class)))
|
||||
.thenAnswer(invocationOnMock -> new LocalResource() {
|
||||
@Override
|
||||
public URL getResource() {
|
||||
return URL.fromPath(((Path) invocationOnMock.getArguments()[0]));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResource(URL resource) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSize() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setSize(long size) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTimestamp() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimestamp(long timestamp) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public LocalResourceType getType() {
|
||||
return (LocalResourceType) invocationOnMock.getArguments()[1];
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setType(LocalResourceType type) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public LocalResourceVisibility getVisibility() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setVisibility(LocalResourceVisibility visibility) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPattern() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPattern(String pattern) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getShouldBeUploadedToSharedCache() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setShouldBeUploadedToSharedCache(
|
||||
boolean shouldBeUploadedToSharedCache) {
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
// Initialize list of files.
|
||||
//archive
|
||||
configFileList.add(new ConfigFile().srcFile("hdfs://default/sourceFile1")
|
||||
.destFile("destFile1").type(ConfigFile.TypeEnum.ARCHIVE));
|
||||
|
||||
//static file
|
||||
configFileList.add(new ConfigFile().srcFile("hdfs://default/sourceFile2")
|
||||
.destFile("folder/destFile_2").type(ConfigFile.TypeEnum.STATIC));
|
||||
|
||||
//This will be ignored since type is JSON
|
||||
configFileList.add(new ConfigFile().srcFile("hdfs://default/sourceFile3")
|
||||
.destFile("destFile3").type(ConfigFile.TypeEnum.JSON));
|
||||
//No destination file specified
|
||||
configFileList.add(new ConfigFile().srcFile("hdfs://default/sourceFile4")
|
||||
.type(ConfigFile.TypeEnum.STATIC));
|
||||
|
||||
ProviderUtils.handleStaticFilesForLocalization(launcher, sfs,
|
||||
compLaunchCtx);
|
||||
Mockito.verify(launcher).addLocalResource(Mockito.eq("resources/destFile1"),
|
||||
any(LocalResource.class));
|
||||
Mockito.verify(launcher).addLocalResource(
|
||||
Mockito.eq("resources/destFile_2"), any(LocalResource.class));
|
||||
Mockito.verify(launcher).addLocalResource(
|
||||
Mockito.eq("resources/sourceFile4"), any(LocalResource.class));
|
||||
}
|
||||
}
|
|
@ -17,7 +17,9 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.service.providers;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.service.api.records.Artifact;
|
||||
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
|
||||
import org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
|
||||
|
@ -57,6 +59,7 @@ public class TestAbstractClientProvider {
|
|||
public void testConfigFiles() throws IOException {
|
||||
ClientProvider clientProvider = new ClientProvider();
|
||||
FileSystem mockFs = mock(FileSystem.class);
|
||||
FileStatus mockFileStatus = mock(FileStatus.class);
|
||||
when(mockFs.exists(anyObject())).thenReturn(true);
|
||||
|
||||
ConfigFile configFile = new ConfigFile();
|
||||
|
@ -114,5 +117,46 @@ public class TestAbstractClientProvider {
|
|||
Assert.fail(EXCEPTION_PREFIX + "duplicate dest file");
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
|
||||
configFiles.clear();
|
||||
configFile = new ConfigFile();
|
||||
configFile.setType(ConfigFile.TypeEnum.STATIC);
|
||||
configFile.setSrcFile(null);
|
||||
configFile.setDestFile("path/destfile3");
|
||||
configFiles.add(configFile);
|
||||
try {
|
||||
clientProvider.validateConfigFiles(configFiles, mockFs);
|
||||
Assert.fail(EXCEPTION_PREFIX + "dest file with multiple path elements");
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
|
||||
configFile.setDestFile("/path/destfile3");
|
||||
try {
|
||||
clientProvider.validateConfigFiles(configFiles, mockFs);
|
||||
Assert.fail(EXCEPTION_PREFIX + "src file should be specified");
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
|
||||
//should succeed
|
||||
configFile.setSrcFile("srcFile");
|
||||
configFile.setDestFile("destfile3");
|
||||
clientProvider.validateConfigFiles(configFiles, mockFs);
|
||||
|
||||
when(mockFileStatus.isDirectory()).thenReturn(true);
|
||||
when(mockFs.getFileStatus(new Path("srcFile")))
|
||||
.thenReturn(mockFileStatus).thenReturn(mockFileStatus);
|
||||
|
||||
configFiles.clear();
|
||||
configFile = new ConfigFile();
|
||||
configFile.setType(ConfigFile.TypeEnum.STATIC);
|
||||
configFile.setSrcFile("srcFile");
|
||||
configFile.setDestFile("destfile3");
|
||||
configFiles.add(configFile);
|
||||
|
||||
try {
|
||||
clientProvider.validateConfigFiles(configFiles, mockFs);
|
||||
Assert.fail(EXCEPTION_PREFIX + "src file is a directory");
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -250,8 +250,8 @@ A config file that needs to be created and made available as a volume in a servi
|
|||
|
||||
|Name|Description|Required|Schema|Default|
|
||||
|----|----|----|----|----|
|
||||
|type|Config file in the standard format like xml, properties, json, yaml, template.|false|enum (XML, PROPERTIES, JSON, YAML, TEMPLATE, HADOOP_XML)||
|
||||
|dest_file|The path that this configuration file should be created as. If it is an absolute path, it will be mounted into the DOCKER container. Absolute paths are only allowed for DOCKER containers. If it is a relative path, only the file name should be provided, and the file will be created in the container local working directory under a folder named conf.|false|string||
|
||||
|type|Config file in the standard format like xml, properties, json, yaml, template or static/archive resource files. When static/archive types are specified, file must be uploaded to remote file system before launching the job, and YARN service framework will localize files prior to launching containers. Archive files are unwrapped during localization |false|enum (XML, PROPERTIES, JSON, YAML, TEMPLATE, ENV, HADOOP_XML, STATIC, ARCHIVE)||
|
||||
|dest_file|The path that this configuration file should be created as. If it is an absolute path, it will be mounted into the DOCKER container. Absolute paths are only allowed for DOCKER containers. If it is a relative path, only the file name should be provided, and the file will be created in the container local working directory under a folder named conf for all types other than static/archive. For static/archive resource types, the files are available under resources directory.|false|string||
|
||||
|src_file|This provides the source location of the configuration file, the content of which is dumped to dest_file post property substitutions, in the format as specified in type. Typically the src_file would point to a source controlled network accessible file maintained by tools like puppet, chef, or hdfs etc. Currently, only hdfs is supported.|false|string||
|
||||
|properties|A blob of key value pairs that will be dumped in the dest_file in the format as specified in type. If src_file is specified, src_file content are dumped in the dest_file and these properties will overwrite, if any, existing properties in src_file or be added as new properties in src_file.|false|object||
|
||||
|
||||
|
|
Loading…
Reference in New Issue