diff --git a/build.gradle b/build.gradle index 878f7a9a915..831db456a19 100644 --- a/build.gradle +++ b/build.gradle @@ -97,6 +97,7 @@ subprojects { // the "value" -quiet is added, separated by a space. This is ok since the javadoc // command already adds -quiet, so we are just duplicating it // see https://discuss.gradle.org/t/add-custom-javadoc-option-that-does-not-take-an-argument/5959 + javadoc.options.encoding='UTF8' javadoc.options.addStringOption('Xdoclint:all,-missing', '-quiet') } }
diff --git a/core/src/main/java/org/elasticsearch/plugins/PluginManager.java b/core/src/main/java/org/elasticsearch/plugins/PluginManager.java index 0a01667f3c5..1ebe7813d3c 100644 --- a/core/src/main/java/org/elasticsearch/plugins/PluginManager.java +++ b/core/src/main/java/org/elasticsearch/plugins/PluginManager.java @@ -90,6 +90,7 @@ public class PluginManager { "mapper-murmur3", "mapper-size", "repository-azure", + "repository-hdfs", "repository-s3", "store-smb"));
diff --git a/core/src/main/resources/org/elasticsearch/plugins/plugin-install.help b/core/src/main/resources/org/elasticsearch/plugins/plugin-install.help index 2a4e6a6382c..8c73e3837a4 100644 --- a/core/src/main/resources/org/elasticsearch/plugins/plugin-install.help +++ b/core/src/main/resources/org/elasticsearch/plugins/plugin-install.help @@ -50,6 +50,7 @@ OFFICIAL PLUGINS - mapper-murmur3 - mapper-size - repository-azure + - repository-hdfs - repository-s3 - store-smb
diff --git a/docs/plugins/repository-hdfs.asciidoc b/docs/plugins/repository-hdfs.asciidoc new file mode 100644 index 00000000000..ea13e5ad3a6 --- /dev/null +++ b/docs/plugins/repository-hdfs.asciidoc @@ -0,0 +1,115 @@ +[[repository-hdfs]] +=== Hadoop HDFS Repository Plugin + +The HDFS repository plugin adds support for using the HDFS file system as a repository for +{ref}/modules-snapshots.html[Snapshot/Restore]. + +[[repository-hdfs-install]] +[float] +==== Installation + +This plugin can be installed using the plugin manager, by picking the command that matches the desired flavor (see below): + +[source,sh] +---------------------------------------------------------------- +sudo bin/plugin install repository-hdfs +sudo bin/plugin install repository-hdfs-hadoop2 +sudo bin/plugin install repository-hdfs-lite +---------------------------------------------------------------- + +The plugin must be installed on every node in the cluster, and each node must +be restarted after installation. + +[[repository-hdfs-remove]] +[float] +==== Removal + +The plugin can be removed with the command that matches the installed flavor: + +[source,sh] +---------------------------------------------------------------- +sudo bin/plugin remove repository-hdfs +sudo bin/plugin remove repository-hdfs-hadoop2 +sudo bin/plugin remove repository-hdfs-lite +---------------------------------------------------------------- + +The node must be stopped before removing the plugin. + +[[repository-hdfs-usage]] +==== Getting started with HDFS + +The HDFS snapshot/restore plugin comes in three _flavors_: + +* Default / Hadoop 1.x:: +The default version contains the plugin jar alongside the Apache Hadoop 1.x (stable) dependencies. +* YARN / Hadoop 2.x:: +The `hadoop2` version contains the plugin jar plus the Apache Hadoop 2.x (also known as YARN) dependencies. +* Lite:: +The `lite` version contains just the plugin jar, without any Hadoop dependencies. The user should provide these (read below). + +[[repository-hdfs-flavor]] +===== Which version to use? + +That depends on whether Hadoop is installed locally on the Elasticsearch nodes and, if it is not, whether your distro is compatible with the Apache Hadoop clients.
+ +* Are you using Apache Hadoop (or a _compatible_ distro) and do not have it installed on the Elasticsearch nodes?:: ++ +If the answer is yes, use the default `repository-hdfs` for Apache Hadoop 1 or `repository-hdfs-hadoop2` for Apache Hadoop 2. ++ +* If you have Hadoop installed locally on the Elasticsearch nodes, or are using a certain distro:: ++ +Use the `lite` version and place your Hadoop _client_ jars and their dependencies in the plugin folder under `hadoop-libs`. +For large deployments, it is recommended to package the libraries in the plugin zip and deploy it manually across nodes +(thus avoiding having to set up the libraries on each node). + +[[repository-hdfs-security]] +==== Handling JVM Security and Permissions + +Out of the box, Elasticsearch runs in a JVM with the security manager turned _on_ to make sure that unsafe or sensitive actions +are allowed only from trusted code. Hadoop, however, is not really designed to run under one; it does not rely on privileged blocks +to execute its sensitive code, of which there is plenty. + +The `repository-hdfs` plugin provides the necessary permissions for both Apache Hadoop 1.x and 2.x (latest versions) to successfully +run in a secured JVM, as one can tell from the number of permissions requested when installing the plugin. +However, using a certain Hadoop file-system (other than DFS), a certain distro, or a certain operating system (in particular Windows) might require +additional permissions which are not provided by the plugin. + +In this case there are several workarounds: + +* add the permission to `plugin-security.policy` (available in the plugin folder) +* disable the security manager through the `es.security.manager.enabled=false` configuration setting - NOT RECOMMENDED + +If you find yourself in such a situation, please let us know which Hadoop distro version and OS you are using and which permission is missing +by raising an issue. Thank you! + +[[repository-hdfs-config]] +==== Configuration Properties + +Once installed, define the configuration for the `hdfs` repository through `elasticsearch.yml` or the +{ref}/modules-snapshots.html[REST API]: + +[source] +---- +repositories + hdfs: + uri: "hdfs://<host>:<port>/" # optional - Hadoop file-system URI + path: "some/path" # required - path within the file-system where data is stored/loaded + load_defaults: "true" # optional - whether to load the default Hadoop configuration (default) or not + conf_location: "extra-cfg.xml" # optional - Hadoop configuration XML to be loaded (use commas for multiple values) + conf.<key> : "<value>" # optional - 'inlined' key=value added to the Hadoop configuration + concurrent_streams: 5 # optional - the number of concurrent streams (defaults to 5) + compress: "false" # optional - whether to compress the metadata or not (default) + chunk_size: "10mb" # optional - chunk size (disabled by default) +---- + +NOTE: Be careful when including a path within the `uri` setting; some implementations ignore it completely while +others take it into account. In general, we recommend keeping the `uri` to a minimum and using the `path` element instead. + +===== Plugging other file-systems + +Any HDFS-compatible file-system (like Amazon `s3://` or Google `gs://`) can be used, as long as the proper Hadoop +configuration is passed to the Elasticsearch plugin. In practice, this means making sure the correct Hadoop configuration +files (`core-site.xml` and `hdfs-site.xml`) and their jars are available on the plugin classpath, just as you would with any
+other Hadoop client or job.
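+
+For illustration only, the snippet below is a minimal sketch of registering such a repository from the Java client,
+mirroring the settings shown above; the `es-hdfs://` scheme and the `com.example.EsHdfsFileSystem` class are hypothetical
+placeholders for whatever HDFS-compatible file-system implementation you plug in:
+
+[source,java]
+----
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.Settings;
+
+public class RegisterHdfsRepository {
+    public static void register(Client client) {
+        // register an "hdfs" repository backed by a plugged-in, HDFS-compatible file-system
+        client.admin().cluster().preparePutRepository("my_backup")
+              .setType("hdfs")
+              .setSettings(Settings.settingsBuilder()
+                      .put("uri", "es-hdfs://example-host:8020/")                  // hypothetical URI and scheme
+                      .put("path", "elasticsearch/snapshots")                      // where snapshot data is stored
+                      .put("conf.fs.es-hdfs.impl", "com.example.EsHdfsFileSystem") // inlined Hadoop setting for the custom FS
+                      .put("compress", true))                                      // optional
+              .get();
+    }
+}
+----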
+ +Otherwise, the plugin will only read the _default_, vanilla configuration of Hadoop and will not be able to recognized +the plugged-in file-system. diff --git a/plugins/repository-hdfs/build.gradle b/plugins/repository-hdfs/build.gradle new file mode 100644 index 00000000000..8f18f67f70d --- /dev/null +++ b/plugins/repository-hdfs/build.gradle @@ -0,0 +1,203 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//apply plugin: 'nebula.provided-base' + +esplugin { + description 'The HDFS repository plugin adds support for Hadoop Distributed File-System (HDFS) repositories.' + classname 'org.elasticsearch.plugin.hadoop.hdfs.HdfsPlugin' +} + +configurations { + hadoop1 + hadoop2 +} + +versions << [ + 'hadoop1': '1.2.1', + 'hadoop2': '2.7.1' +] + +dependencies { + provided "org.elasticsearch:elasticsearch:${versions.elasticsearch}" + provided "org.apache.hadoop:hadoop-core:${versions.hadoop1}" + + // use Hadoop1 to compile and test things (a subset of Hadoop2) + testCompile "org.apache.hadoop:hadoop-core:${versions.hadoop1}" + testCompile "org.apache.hadoop:hadoop-test:${versions.hadoop1}" + // Hadoop dependencies + testCompile "commons-configuration:commons-configuration:1.6" + testCompile "commons-lang:commons-lang:${versions.commonslang}" + testCompile "commons-collections:commons-collections:3.2.2" + testCompile "commons-net:commons-net:1.4.1" + testCompile "org.mortbay.jetty:jetty:6.1.26" + testCompile "org.mortbay.jetty:jetty-util:6.1.26" + testCompile "org.mortbay.jetty:servlet-api:2.5-20081211" + testCompile "com.sun.jersey:jersey-core:1.8" + + + hadoop1("org.apache.hadoop:hadoop-core:${versions.hadoop1}") { + exclude module: "commons-cli" + exclude group: "com.sun.jersey" + exclude group: "org.mortbay.jetty" + exclude group: "tomcat" + exclude module: "commons-el" + exclude module: "hsqldb" + exclude group: "org.eclipse.jdt" + exclude module: "commons-beanutils" + exclude module: "commons-beanutils-core" + exclude module: "junit" + // provided by ES itself + exclude group: "log4j" + } + + hadoop2("org.apache.hadoop:hadoop-client:${versions.hadoop2}") { + exclude module: "commons-cli" + exclude group: "com.sun.jersey" + exclude group: "com.sun.jersey.contribs" + exclude group: "com.sun.jersey.jersey-test-framework" + exclude module: "guice" + exclude group: "org.mortbay.jetty" + exclude group: "tomcat" + exclude module: "commons-el" + exclude module: "hsqldb" + exclude group: "org.eclipse.jdt" + exclude module: "commons-beanutils" + exclude module: "commons-beanutils-core" + exclude module: "javax.servlet" + exclude module: "junit" + // provided by ES itself + exclude group: "log4j" + } + + hadoop2("org.apache.hadoop:hadoop-hdfs:${versions.hadoop2}") { + exclude module: "guava" + exclude module: "junit" + // provided by ES itself + exclude 
group: "log4j" + } +} + +configurations.all { + resolutionStrategy { + force "commons-codec:commons-codec:${versions.commonscodec}" + force "commons-logging:commons-logging:${versions.commonslogging}" + force "commons-lang:commons-lang:2.6" + force "commons-httpclient:commons-httpclient:3.0.1" + force "org.codehaus.jackson:jackson-core-asl:1.8.8" + force "org.codehaus.jackson:jackson-mapper-asl:1.8.8" + force "com.google.code.findbugs:jsr305:3.0.0" + force "com.google.guava:guava:16.0.1" + force "org.slf4j:slf4j-api:1.7.10" + force "org.slf4j:slf4j-log4j12:1.7.10" + } +} + + +dependencyLicenses { + mapping from: /hadoop-core.*/, to: 'hadoop-1' + mapping from: /hadoop-.*/, to: 'hadoop-2' +} + +compileJava.options.compilerArgs << '-Xlint:-deprecation,-rawtypes' + +// main jar includes just the plugin classes +jar { + include "org/elasticsearch/plugin/hadoop/hdfs/*" +} + +// hadoop jar (which actually depend on Hadoop) +task hadoopLinkedJar(type: Jar, dependsOn:jar) { + appendix "internal" + from sourceSets.main.output.classesDir + // exclude plugin + exclude "org/elasticsearch/plugin/hadoop/hdfs/*" +} + + +bundlePlugin.dependsOn hadoopLinkedJar + +// configure 'bundle' as being w/o Hadoop deps +bundlePlugin { + into ("internal-libs") { + from hadoopLinkedJar.archivePath + } + + into ("hadoop-libs") { + from configurations.hadoop2.allArtifacts.files + from configurations.hadoop2 + } +} + + +task distZipHadoop1(type: Zip, dependsOn: [hadoopLinkedJar, jar]) { zipTask -> + from (zipTree(bundlePlugin.archivePath)) { + include "*" + include "internal-libs/**" + } + + description = "Builds archive (with Hadoop1 dependencies) suitable for download page." + classifier = "hadoop1" + + into ("hadoop-libs") { + from configurations.hadoop1.allArtifacts.files + from configurations.hadoop1 + } +} + +task distZipHadoop2(type: Zip, dependsOn: [hadoopLinkedJar, jar]) { zipTask -> + from (zipTree(bundlePlugin.archivePath)) { + include "*" + include "internal-libs/**" + } + + description = "Builds archive (with Hadoop2/YARN dependencies) suitable for download page." + classifier = "hadoop2" + + into ("hadoop-libs") { + from configurations.hadoop2.allArtifacts.files + from configurations.hadoop2 + } +} + +task distZipNoHadoop(type: Zip, dependsOn: [hadoopLinkedJar, jar]) { zipTask -> + from (zipTree(bundlePlugin.archivePath)) { + exclude "hadoop-libs/**" + } + + from sourceSets.main.output.resourcesDir + + description = "Builds archive (without any Hadoop dependencies) suitable for download page." + classifier = "lite" +} + + +artifacts { + archives bundlePlugin + 'default' bundlePlugin + archives distZipHadoop1 + archives distZipHadoop2 + archives distZipNoHadoop +} + +integTest { + cluster { + plugin(pluginProperties.extension.name, zipTree(distZipHadoop2.archivePath)) + } +} \ No newline at end of file diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsPlugin.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsPlugin.java new file mode 100644 index 00000000000..9b65f7bec2f --- /dev/null +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsPlugin.java @@ -0,0 +1,173 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.plugin.hadoop.hdfs; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.file.Path; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; + +import org.elasticsearch.SpecialPermission; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.RepositoriesModule; +import org.elasticsearch.repositories.Repository; + +// +// Note this plugin is somewhat special as Hadoop itself loads a number of libraries and thus requires a number of permissions to run even in client mode. +// This poses two problems: +// - Hadoop itself comes with tons of jars, many providing the same classes across packages. In particular Hadoop 2 provides package annotations in the same +// package across jars which trips JarHell. Thus, to allow Hadoop jars to load, the plugin uses a dedicated CL which picks them up from the hadoop-libs folder. +// - The issue though with using a different CL is that it picks up the jars from a different location / codeBase and thus it does not fall under the plugin +// permissions. In other words, the plugin permissions don't apply to the hadoop libraries. +// There are different approaches here: +// - implement a custom classloader that loads the jars but 'lies' about the codesource. It is doable but since URLClassLoader is locked down, one would +// would have to implement the whole jar opening and loading from it. Not impossible but still fairly low-level. +// Further more, even if the code has the proper credentials, it needs to use the proper Privileged blocks to use its full permissions which does not +// happen in the Hadoop code base. +// - use a different Policy. Works but the Policy is JVM wide and thus the code needs to be quite efficient - quite a bit impact to cover just some plugin +// libraries +// - use a DomainCombiner. This doesn't change the semantics (it's clear where the code is loaded from, etc..) however it gives us a scoped, fine-grained +// callback on handling the permission intersection for secured calls. Note that DC works only in the current PAC call - the moment another PA is used, +// the domain combiner is going to be ignored (unless the caller specifically uses it). Due to its scoped impact and official Java support, this approach +// was used. + +// ClassLoading info +// - package plugin.hadoop.hdfs is part of the plugin +// - all the other packages are assumed to be in the nested Hadoop CL. 
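+// Layout sketch (assumed, derived from the build script in this change; exact jar names are illustrative only):
+//
+//   plugins/repository-hdfs/
+//     repository-hdfs-<version>.jar             <- only org.elasticsearch.plugin.hadoop.hdfs.* (loaded by the plugin CL)
+//     internal-libs/
+//       repository-hdfs-internal-<version>.jar  <- org.elasticsearch.repositories.hdfs.* (loaded by the nested Hadoop CL)
+//     hadoop-libs/
+//       hadoop-*.jar, ...                       <- Hadoop client jars (default/hadoop2 flavors) or user-provided (lite)
+//
+// getHadoopClassLoaderPath() below feeds both internal-libs and hadoop-libs into the dedicated URLClassLoader.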
+ +// Code +public class HdfsPlugin extends Plugin { + + @Override + public String name() { + return "repository-hdfs"; + } + + @Override + public String description() { + return "HDFS Repository Plugin"; + } + + @SuppressWarnings("unchecked") + public void onModule(RepositoriesModule repositoriesModule) { + String baseLib = Utils.detectLibFolder(); + List cp = getHadoopClassLoaderPath(baseLib); + + ClassLoader hadoopCL = URLClassLoader.newInstance(cp.toArray(new URL[cp.size()]), getClass().getClassLoader()); + + Class repository = null; + try { + repository = (Class) hadoopCL.loadClass("org.elasticsearch.repositories.hdfs.HdfsRepository"); + } catch (ClassNotFoundException cnfe) { + throw new IllegalStateException("Cannot load plugin class; is the plugin class setup correctly?", cnfe); + } + + repositoriesModule.registerRepository("hdfs", repository, BlobStoreIndexShardRepository.class); + Loggers.getLogger(HdfsPlugin.class).info("Loaded Hadoop [{}] libraries from {}", getHadoopVersion(hadoopCL), baseLib); + } + + protected List getHadoopClassLoaderPath(String baseLib) { + List cp = new ArrayList<>(); + // add plugin internal jar + discoverJars(createURI(baseLib, "internal-libs"), cp, false); + // add Hadoop jars + discoverJars(createURI(baseLib, "hadoop-libs"), cp, true); + return cp; + } + + private String getHadoopVersion(ClassLoader hadoopCL) { + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + // unprivileged code such as scripts do not have SpecialPermission + sm.checkPermission(new SpecialPermission()); + } + + return AccessController.doPrivileged(new PrivilegedAction() { + @Override + public String run() { + // Hadoop 2 relies on TCCL to determine the version + ClassLoader tccl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(hadoopCL); + return doGetHadoopVersion(hadoopCL); + } finally { + Thread.currentThread().setContextClassLoader(tccl); + } + } + }, Utils.hadoopACC()); + } + + private String doGetHadoopVersion(ClassLoader hadoopCL) { + String version = "Unknown"; + + Class clz = null; + try { + clz = hadoopCL.loadClass("org.apache.hadoop.util.VersionInfo"); + } catch (ClassNotFoundException cnfe) { + // unknown + } + if (clz != null) { + try { + Method method = clz.getMethod("getVersion"); + version = method.invoke(null).toString(); + } catch (Exception ex) { + // class has changed, ignore + } + } + + return version; + } + + private URI createURI(String base, String suffix) { + String location = base + suffix; + try { + return new URI(location); + } catch (URISyntaxException ex) { + throw new IllegalStateException(String.format(Locale.ROOT, "Cannot detect plugin folder; [%s] seems invalid", location), ex); + } + } + + @SuppressForbidden(reason = "discover nested jar") + private void discoverJars(URI libPath, List cp, boolean optional) { + try { + Path[] jars = FileSystemUtils.files(PathUtils.get(libPath), "*.jar"); + + for (Path path : jars) { + cp.add(path.toUri().toURL()); + } + } catch (IOException ex) { + if (!optional) { + throw new IllegalStateException("Cannot compute plugin classpath", ex); + } + } + } +} \ No newline at end of file diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/plugin/hadoop/hdfs/Utils.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/plugin/hadoop/hdfs/Utils.java new file mode 100644 index 00000000000..101025d029e --- /dev/null +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/plugin/hadoop/hdfs/Utils.java @@ -0,0 +1,84 @@ 
+package org.elasticsearch.plugin.hadoop.hdfs; + +import java.net.URL; +import java.security.AccessControlContext; +import java.security.AccessController; +import java.security.DomainCombiner; +import java.security.PrivilegedAction; +import java.security.ProtectionDomain; + +import org.elasticsearch.SpecialPermission; + +public abstract class Utils { + + protected static AccessControlContext hadoopACC() { + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + // unprivileged code such as scripts do not have SpecialPermission + sm.checkPermission(new SpecialPermission()); + } + + return AccessController.doPrivileged(new PrivilegedAction() { + @Override + public AccessControlContext run() { + return new AccessControlContext(AccessController.getContext(), new HadoopDomainCombiner()); + } + }); + } + + private static class HadoopDomainCombiner implements DomainCombiner { + + private static String BASE_LIB = detectLibFolder(); + + @Override + public ProtectionDomain[] combine(ProtectionDomain[] currentDomains, ProtectionDomain[] assignedDomains) { + for (ProtectionDomain pd : assignedDomains) { + if (pd.getCodeSource().getLocation().toString().startsWith(BASE_LIB)) { + return assignedDomains; + } + } + + return currentDomains; + } + } + + static String detectLibFolder() { + ClassLoader cl = Utils.class.getClassLoader(); + + // we could get the URL from the URLClassloader directly + // but that can create issues when running the tests from the IDE + // we could detect that by loading resources but that as well relies on + // the JAR URL + String classToLookFor = HdfsPlugin.class.getName().replace(".", "/").concat(".class"); + URL classURL = cl.getResource(classToLookFor); + if (classURL == null) { + throw new IllegalStateException("Cannot detect itself; something is wrong with this ClassLoader " + cl); + } + + String base = classURL.toString(); + + // extract root + // typically a JAR URL + int index = base.indexOf("!/"); + if (index > 0) { + base = base.substring(0, index); + // remove its prefix (jar:) + base = base.substring(4); + // remove the trailing jar + index = base.lastIndexOf("/"); + base = base.substring(0, index + 1); + } + // not a jar - something else, do a best effort here + else { + // remove the class searched + base = base.substring(0, base.length() - classToLookFor.length()); + } + + // append / + if (!base.endsWith("/")) { + base = base.concat("/"); + } + + return base; + } +} \ No newline at end of file diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/FileSystemFactory.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/FileSystemFactory.java new file mode 100644 index 00000000000..5e7c4d3fa57 --- /dev/null +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/FileSystemFactory.java @@ -0,0 +1,28 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.repositories.hdfs; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; + +interface FileSystemFactory { + + FileSystem getFileSystem() throws IOException; +} diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/FsCallback.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/FsCallback.java new file mode 100644 index 00000000000..3eda2272149 --- /dev/null +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/FsCallback.java @@ -0,0 +1,29 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories.hdfs; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; + +interface FsCallback { + + V doInHdfs(FileSystem fs) throws IOException; +} diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java new file mode 100644 index 00000000000..f71ca7020a8 --- /dev/null +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -0,0 +1,173 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.repositories.hdfs; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; +import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.Streams; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Locale; +import java.util.Map; + +public class HdfsBlobContainer extends AbstractBlobContainer { + + protected final HdfsBlobStore blobStore; + protected final Path path; + + public HdfsBlobContainer(BlobPath blobPath, HdfsBlobStore blobStore, Path path) { + super(blobPath); + this.blobStore = blobStore; + this.path = path; + } + + @Override + public boolean blobExists(String blobName) { + try { + return SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback() { + @Override + public Boolean doInHdfs(FileSystem fs) throws IOException { + return fs.exists(new Path(path, blobName)); + } + }); + } catch (Exception e) { + return false; + } + } + + @Override + public void deleteBlob(String blobName) throws IOException { + SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback() { + @Override + public Boolean doInHdfs(FileSystem fs) throws IOException { + return fs.delete(new Path(path, blobName), true); + } + }); + } + + @Override + public void move(String sourceBlobName, String targetBlobName) throws IOException { + boolean rename = SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback() { + @Override + public Boolean doInHdfs(FileSystem fs) throws IOException { + return fs.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName)); + } + }); + + if (!rename) { + throw new IOException(String.format(Locale.ROOT, "can not move blob from [%s] to [%s]", sourceBlobName, targetBlobName)); + } + } + + @Override + public InputStream readBlob(String blobName) throws IOException { + // FSDataInputStream does buffering internally + return SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback() { + @Override + public InputStream doInHdfs(FileSystem fs) throws IOException { + return fs.open(new Path(path, blobName), blobStore.bufferSizeInBytes()); + } + }); + } + + @Override + public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { + SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback() { + @Override + public Void doInHdfs(FileSystem fs) throws IOException { + try (OutputStream stream = createOutput(blobName)) { + Streams.copy(inputStream, stream); + } + return null; + } + }); + } + + @Override + public void writeBlob(String blobName, BytesReference bytes) throws IOException { + SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback() { + @Override + public Void doInHdfs(FileSystem fs) throws IOException { + try (OutputStream stream = createOutput(blobName)) { + bytes.writeTo(stream); + } + return null; + } + }); + } + + private OutputStream createOutput(String blobName) throws IOException { + Path file = new Path(path, blobName); + // FSDataOutputStream does buffering internally + return 
blobStore.fileSystemFactory().getFileSystem().create(file, true, blobStore.bufferSizeInBytes()); + } + + @Override + public Map listBlobsByPrefix(final @Nullable String blobNamePrefix) throws IOException { + FileStatus[] files = SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback() { + @Override + public FileStatus[] doInHdfs(FileSystem fs) throws IOException { + return fs.listStatus(path, new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith(blobNamePrefix); + } + }); + } + }); + if (files == null || files.length == 0) { + return Collections.emptyMap(); + } + Map map = new LinkedHashMap(); + for (FileStatus file : files) { + map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen())); + } + return Collections.unmodifiableMap(map); + } + + @Override + public Map listBlobs() throws IOException { + FileStatus[] files = SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback() { + @Override + public FileStatus[] doInHdfs(FileSystem fs) throws IOException { + return fs.listStatus(path); + } + }); + if (files == null || files.length == 0) { + return Collections.emptyMap(); + } + Map map = new LinkedHashMap(); + for (FileStatus file : files) { + map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen())); + } + return Collections.unmodifiableMap(map); + } +} \ No newline at end of file diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java new file mode 100644 index 00000000000..b75485fa7fe --- /dev/null +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java @@ -0,0 +1,125 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.repositories.hdfs; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.concurrent.Executor; + +public class HdfsBlobStore extends AbstractComponent implements BlobStore { + + private final FileSystemFactory ffs; + private final Path rootHdfsPath; + private final ThreadPool threadPool; + private final int bufferSizeInBytes; + + public HdfsBlobStore(Settings settings, FileSystemFactory ffs, Path path, ThreadPool threadPool) throws IOException { + super(settings); + this.ffs = ffs; + this.rootHdfsPath = path; + this.threadPool = threadPool; + + this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes(); + + mkdirs(path); + } + + private void mkdirs(Path path) throws IOException { + SecurityUtils.execute(ffs, new FsCallback() { + @Override + public Void doInHdfs(FileSystem fs) throws IOException { + if (!fs.exists(path)) { + fs.mkdirs(path); + } + return null; + } + }); + } + + @Override + public String toString() { + return rootHdfsPath.toUri().toString(); + } + + public FileSystemFactory fileSystemFactory() { + return ffs; + } + + public Path path() { + return rootHdfsPath; + } + + public Executor executor() { + return threadPool.executor(ThreadPool.Names.SNAPSHOT); + } + + public int bufferSizeInBytes() { + return bufferSizeInBytes; + } + + @Override + public BlobContainer blobContainer(BlobPath path) { + return new HdfsBlobContainer(path, this, buildHdfsPath(path)); + } + + @Override + public void delete(BlobPath path) throws IOException { + SecurityUtils.execute(ffs, new FsCallback() { + @Override + public Void doInHdfs(FileSystem fs) throws IOException { + fs.delete(translateToHdfsPath(path), true); + return null; + } + }); + } + + private Path buildHdfsPath(BlobPath blobPath) { + final Path path = translateToHdfsPath(blobPath); + try { + mkdirs(path); + } catch (IOException ex) { + throw new ElasticsearchException("failed to create blob container", ex); + } + return path; + } + + private Path translateToHdfsPath(BlobPath blobPath) { + Path path = path(); + for (String p : blobPath) { + path = new Path(path, p); + } + return path; + } + + @Override + public void close() { + // + } +} \ No newline at end of file diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java new file mode 100644 index 00000000000..11081445fd4 --- /dev/null +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java @@ -0,0 +1,259 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.repositories.hdfs; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; +import java.nio.file.Files; +import java.security.AccessController; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchGenerationException; +import org.elasticsearch.SpecialPermission; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.index.snapshots.IndexShardRepository; +import org.elasticsearch.repositories.RepositoryName; +import org.elasticsearch.repositories.RepositorySettings; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; + +public class HdfsRepository extends BlobStoreRepository implements FileSystemFactory { + + public final static String TYPE = "hdfs"; + + private final HdfsBlobStore blobStore; + private final BlobPath basePath; + private final ByteSizeValue chunkSize; + private final boolean compress; + private final RepositorySettings repositorySettings; + private FileSystem fs; + + @Inject + public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, ThreadPool threadPool) throws IOException { + super(name.getName(), repositorySettings, indexShardRepository); + + this.repositorySettings = repositorySettings; + + String path = repositorySettings.settings().get("path", settings.get("path")); + if (path == null) { + throw new IllegalArgumentException("no 'path' defined for hdfs snapshot/restore"); + } + + // get configuration + fs = getFileSystem(); + Path hdfsPath = SecurityUtils.execute(fs, new FsCallback() { + @Override + public Path doInHdfs(FileSystem fs) throws IOException { + return fs.makeQualified(new Path(path)); + } + }); + this.basePath = BlobPath.cleanPath(); + + logger.debug("Using file-system [{}] for URI [{}], path [{}]", fs, fs.getUri(), hdfsPath); + blobStore = new HdfsBlobStore(settings, this, hdfsPath, threadPool); + this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", settings.getAsBytesSize("chunk_size", null)); + this.compress = repositorySettings.settings().getAsBoolean("compress", 
settings.getAsBoolean("compress", false)); + } + + // as the FileSystem is long-lived and might go away, make sure to check it before it's being used. + @Override + public FileSystem getFileSystem() throws IOException { + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + // unprivileged code such as scripts do not have SpecialPermission + sm.checkPermission(new SpecialPermission()); + } + + try { + return AccessController.doPrivileged(new PrivilegedExceptionAction() { + @Override + public FileSystem run() throws IOException { + return doGetFileSystem(); + } + }, SecurityUtils.AccBridge.acc()); + } catch (PrivilegedActionException pae) { + Throwable th = pae.getCause(); + if (th instanceof Error) { + throw (Error) th; + } + if (th instanceof RuntimeException) { + throw (RuntimeException) th; + } + if (th instanceof IOException) { + throw (IOException) th; + } + throw new ElasticsearchException(pae); + } + } + + private FileSystem doGetFileSystem() throws IOException { + // check if the fs is still alive + // make a cheap call that triggers little to no security checks + if (fs != null) { + try { + fs.isFile(fs.getWorkingDirectory()); + } catch (IOException ex) { + if (ex.getMessage().contains("Filesystem closed")) { + fs = null; + } + else { + throw ex; + } + } + } + if (fs == null) { + Thread th = Thread.currentThread(); + ClassLoader oldCL = th.getContextClassLoader(); + try { + th.setContextClassLoader(getClass().getClassLoader()); + return initFileSystem(repositorySettings); + } catch (IOException ex) { + throw ex; + } finally { + th.setContextClassLoader(oldCL); + } + } + return fs; + } + + private FileSystem initFileSystem(RepositorySettings repositorySettings) throws IOException { + + Configuration cfg = new Configuration(repositorySettings.settings().getAsBoolean("load_defaults", settings.getAsBoolean("load_defaults", true))); + cfg.setClassLoader(this.getClass().getClassLoader()); + cfg.reloadConfiguration(); + + String confLocation = repositorySettings.settings().get("conf_location", settings.get("conf_location")); + if (Strings.hasText(confLocation)) { + for (String entry : Strings.commaDelimitedListToStringArray(confLocation)) { + addConfigLocation(cfg, entry.trim()); + } + } + + Map map = repositorySettings.settings().getByPrefix("conf.").getAsMap(); + for (Entry entry : map.entrySet()) { + cfg.set(entry.getKey(), entry.getValue()); + } + + try { + UserGroupInformation.setConfiguration(cfg); + } catch (Throwable th) { + throw new ElasticsearchGenerationException(String.format(Locale.ROOT, "Cannot initialize Hadoop"), th); + } + + String uri = repositorySettings.settings().get("uri", settings.get("uri")); + URI actualUri = (uri != null ? URI.create(uri) : FileSystem.getDefaultUri(cfg)); + String user = repositorySettings.settings().get("user", settings.get("user")); + + try { + // disable FS cache + String disableFsCache = String.format(Locale.ROOT, "fs.%s.impl.disable.cache", actualUri.getScheme()); + cfg.setBoolean(disableFsCache, true); + + return (user != null ? 
FileSystem.get(actualUri, cfg, user) : FileSystem.get(actualUri, cfg)); + } catch (Exception ex) { + throw new ElasticsearchGenerationException(String.format(Locale.ROOT, "Cannot create Hdfs file-system for uri [%s]", actualUri), ex); + } + } + + @SuppressForbidden(reason = "pick up Hadoop config (which can be on HDFS)") + private void addConfigLocation(Configuration cfg, String confLocation) { + URL cfgURL = null; + // it's an URL + if (!confLocation.contains(":")) { + cfgURL = cfg.getClassLoader().getResource(confLocation); + + // fall back to file + if (cfgURL == null) { + java.nio.file.Path path = PathUtils.get(confLocation); + if (!Files.isReadable(path)) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, + "Cannot find classpath resource or file 'conf_location' [%s] defined for hdfs snapshot/restore", + confLocation)); + } + String pathLocation = path.toUri().toString(); + logger.debug("Adding path [{}] as file [{}]", confLocation, pathLocation); + confLocation = pathLocation; + } + else { + logger.debug("Resolving path [{}] to classpath [{}]", confLocation, cfgURL); + } + } + else { + logger.debug("Adding path [{}] as URL", confLocation); + } + + if (cfgURL == null) { + try { + cfgURL = new URL(confLocation); + } catch (MalformedURLException ex) { + throw new IllegalArgumentException(String.format(Locale.ROOT, + "Invalid 'conf_location' URL [%s] defined for hdfs snapshot/restore", confLocation), ex); + } + } + + cfg.addResource(cfgURL); + } + + @Override + protected BlobStore blobStore() { + return blobStore; + } + + @Override + protected BlobPath basePath() { + return basePath; + } + + @Override + protected boolean isCompress() { + return compress; + } + + @Override + protected ByteSizeValue chunkSize() { + return chunkSize; + } + + @Override + protected void doClose() throws ElasticsearchException { + super.doClose(); + + IOUtils.closeStream(fs); + fs = null; + } +} \ No newline at end of file diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/SecurityUtils.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/SecurityUtils.java new file mode 100644 index 00000000000..6a0d4ffa818 --- /dev/null +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/SecurityUtils.java @@ -0,0 +1,73 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.repositories.hdfs; + +import java.io.IOException; +import java.security.AccessControlContext; +import java.security.AccessController; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; + +import org.apache.hadoop.fs.FileSystem; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.SpecialPermission; +import org.elasticsearch.plugin.hadoop.hdfs.Utils; + +class SecurityUtils { + + abstract static class AccBridge extends Utils { + static AccessControlContext acc() { + return Utils.hadoopACC(); + } + } + + static V execute(FileSystemFactory ffs, FsCallback callback) throws IOException { + return execute(ffs.getFileSystem(), callback); + } + + static V execute(FileSystem fs, FsCallback callback) throws IOException { + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + // unprivileged code such as scripts do not have SpecialPermission + sm.checkPermission(new SpecialPermission()); + } + + try { + return AccessController.doPrivileged(new PrivilegedExceptionAction() { + @Override + public V run() throws IOException { + return callback.doInHdfs(fs); + } + }, AccBridge.acc()); + } catch (PrivilegedActionException pae) { + Throwable th = pae.getCause(); + if (th instanceof Error) { + throw (Error) th; + } + if (th instanceof RuntimeException) { + throw (RuntimeException) th; + } + if (th instanceof IOException) { + throw (IOException) th; + } + throw new ElasticsearchException(pae); + } + } +} diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/TestingFs.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/TestingFs.java new file mode 100644 index 00000000000..46cb0a263fe --- /dev/null +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/TestingFs.java @@ -0,0 +1,57 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories.hdfs; + +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.elasticsearch.common.SuppressForbidden; + +import java.io.File; +import java.io.IOException; + +/** + * Extends LFS to improve some operations to keep the security permissions at + * bay. In particular mkdir is smarter and doesn't have to walk all the file + * hierarchy but rather only limits itself to the parent/working dir and creates + * a file only when necessary. 
+ */ +public class TestingFs extends LocalFileSystem { + + private static class ImprovedRawLocalFileSystem extends RawLocalFileSystem { + @Override + @SuppressForbidden(reason = "the Hadoop API depends on java.io.File") + public boolean mkdirs(Path f) throws IOException { + File wd = pathToFile(getWorkingDirectory()); + File local = pathToFile(f); + if (wd.equals(local) || local.exists()) { + return true; + } + return mkdirs(f.getParent()) && local.mkdir(); + } + } + + public TestingFs() { + super(new ImprovedRawLocalFileSystem()); + // use the build path instead of the starting dir as that one has read permissions + //setWorkingDirectory(new Path(getClass().getProtectionDomain().getCodeSource().getLocation().toString())); + setWorkingDirectory(new Path(System.getProperty("java.io.tmpdir"))); + } +} diff --git a/plugins/repository-hdfs/src/main/plugin-metadata/plugin-security.policy b/plugins/repository-hdfs/src/main/plugin-metadata/plugin-security.policy new file mode 100644 index 00000000000..d26acd121e4 --- /dev/null +++ b/plugins/repository-hdfs/src/main/plugin-metadata/plugin-security.policy @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +grant { + // used by the plugin to get the TCCL to properly initialize all of Hadoop components + permission java.lang.RuntimePermission "getClassLoader"; + + // used for DomainCombiner + permission java.security.SecurityPermission "createAccessControlContext"; + + // set TCCL used for bootstrapping Hadoop Configuration and JAAS + permission java.lang.RuntimePermission "setContextClassLoader"; + + // + // Hadoop 1 + // + + // UserGroupInformation (UGI) + permission java.lang.reflect.ReflectPermission "suppressAccessChecks"; + + // UGI triggers JAAS + permission javax.security.auth.AuthPermission "getSubject"; + + // JAAS libraries are not loaded with the proper context in Hadoop, hence why the permission is needed here + permission java.lang.RuntimePermission "loadLibrary.jaas_nt"; + + // which triggers the use of the Kerberos library + permission java.lang.RuntimePermission "accessClassInPackage.sun.security.krb5"; + + // plus LoginContext + permission javax.security.auth.AuthPermission "modifyPrincipals"; + + permission javax.security.auth.AuthPermission "modifyPublicCredentials"; + + permission javax.security.auth.AuthPermission "modifyPrivateCredentials"; + + // + // Hadoop 2 + // + + // UGI (Ugi Metrics) + permission java.lang.RuntimePermission "accessDeclaredMembers"; + + // Shell initialization - reading system props + permission java.util.PropertyPermission "*", "read,write"; + + permission javax.security.auth.PrivateCredentialPermission "org.apache.hadoop.security.Credentials \"*\"", "read"; + + // HftpFileSystem (all present FS are loaded and initialized at startup ...) 
+ permission java.lang.RuntimePermission "setFactory"; +}; \ No newline at end of file diff --git a/plugins/repository-hdfs/src/main/resources/hadoop-libs/README.asciidoc b/plugins/repository-hdfs/src/main/resources/hadoop-libs/README.asciidoc new file mode 100644 index 00000000000..e9f85f3cdf7 --- /dev/null +++ b/plugins/repository-hdfs/src/main/resources/hadoop-libs/README.asciidoc @@ -0,0 +1 @@ +Folder containing the required Hadoop client libraries and dependencies. \ No newline at end of file diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsRepositoryRestIT.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsRepositoryRestIT.java new file mode 100644 index 00000000000..fd87e18cbce --- /dev/null +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsRepositoryRestIT.java @@ -0,0 +1,30 @@ +package org.elasticsearch.plugin.hadoop.hdfs; + +import java.io.IOException; +import java.util.Collection; + +import org.elasticsearch.plugin.hadoop.hdfs.HdfsPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.RestTestCandidate; +import org.elasticsearch.test.rest.parser.RestTestParseException; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +public class HdfsRepositoryRestIT extends ESRestTestCase { + + @Override + protected Collection<Class<? extends Plugin>> nodePlugins() { + return pluginList(HdfsPlugin.class); + } + + public HdfsRepositoryRestIT(@Name("yaml") RestTestCandidate testCandidate) { + super(testCandidate); + } + + @ParametersFactory + public static Iterable<Object[]> parameters() throws IOException, RestTestParseException { + return ESRestTestCase.createParameters(0, 1); + } +} diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsTestPlugin.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsTestPlugin.java new file mode 100644 index 00000000000..4b4e2aa05ef --- /dev/null +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsTestPlugin.java @@ -0,0 +1,15 @@ +package org.elasticsearch.plugin.hadoop.hdfs; + +import java.net.URL; +import java.util.Collections; +import java.util.List; + +import org.elasticsearch.plugin.hadoop.hdfs.HdfsPlugin; + +public class HdfsTestPlugin extends HdfsPlugin { + + @Override + protected List<URL> getHadoopClassLoaderPath(String baseLib) { + return Collections.emptyList(); + } +} diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsTests.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsTests.java new file mode 100644 index 00000000000..d1b23e92538 --- /dev/null +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsTests.java @@ -0,0 +1,218 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.plugin.hadoop.hdfs; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +import java.util.Collection; + +import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.repositories.RepositoryMissingException; +import org.elasticsearch.repositories.hdfs.TestingFs; +import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.test.ESIntegTestCase.ThirdParty; +import org.elasticsearch.test.store.MockFSDirectoryService; +import org.junit.After; +import org.junit.Before; + +/** + * You must specify {@code -Dtests.thirdparty=true} + */ +@ThirdParty +@ClusterScope(scope = Scope.SUITE, numDataNodes = 1, transportClientRatio = 0.0) +public class HdfsTests extends ESIntegTestCase { + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(MockFSDirectoryService.RANDOM_PREVENT_DOUBLE_WRITE, false) + .put(MockFSDirectoryService.RANDOM_NO_DELETE_OPEN_FILE, false) + .build(); + } + + @Override + protected Settings nodeSettings(int ordinal) { + Settings.Builder settings = Settings.builder() + .put(super.nodeSettings(ordinal)) + .put("path.home", createTempDir()) + .put("path.repo", "") + .put(MockFSDirectoryService.RANDOM_PREVENT_DOUBLE_WRITE, false) + .put(MockFSDirectoryService.RANDOM_NO_DELETE_OPEN_FILE, false); + return settings.build(); + } + + @Override + protected Collection<Class<? extends Plugin>> nodePlugins() { + return pluginList(HdfsTestPlugin.class); + } + + private String path; + + @Before + public final void wipeBefore() throws Exception { + wipeRepositories(); + path = "build/data/repo-" + randomInt(); + } + + @After + public final void wipeAfter() throws Exception { + wipeRepositories(); + } + + public void testSimpleWorkflow() { + Client client = client(); + logger.info("--> creating hdfs repository with path [{}]", path); + + PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") + .setType("hdfs") + .setSettings(Settings.settingsBuilder() + //.put("uri", "hdfs://127.0.0.1:51227") + .put("conf.fs.es-hdfs.impl", TestingFs.class.getName()) + .put("uri", "es-hdfs://./build/") + .put("path", path) + .put("conf", "additional-cfg.xml, conf-2.xml") + .put("chunk_size", randomIntBetween(100, 1000) + "k") + .put("compress", randomBoolean()) + ).get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + + createIndex("test-idx-1", "test-idx-2", "test-idx-3");
+ ensureGreen(); + + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { + index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i); + index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i); + index("test-idx-3", "doc", Integer.toString(i), "foo", "baz" + i); + } + refresh(); + assertThat(count(client, "test-idx-1"), equalTo(100L)); + assertThat(count(client, "test-idx-2"), equalTo(100L)); + assertThat(count(client, "test-idx-3"), equalTo(100L)); + + logger.info("--> snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-3").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + + logger.info("--> delete some data"); + for (int i = 0; i < 50; i++) { + client.prepareDelete("test-idx-1", "doc", Integer.toString(i)).get(); + } + for (int i = 50; i < 100; i++) { + client.prepareDelete("test-idx-2", "doc", Integer.toString(i)).get(); + } + for (int i = 0; i < 100; i += 2) { + client.prepareDelete("test-idx-3", "doc", Integer.toString(i)).get(); + } + refresh(); + assertThat(count(client, "test-idx-1"), equalTo(50L)); + assertThat(count(client, "test-idx-2"), equalTo(50L)); + assertThat(count(client, "test-idx-3"), equalTo(50L)); + + logger.info("--> close indices"); + client.admin().indices().prepareClose("test-idx-1", "test-idx-2").get(); + + logger.info("--> restore all indices from the snapshot"); + RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + + ensureGreen(); + assertThat(count(client, "test-idx-1"), equalTo(100L)); + assertThat(count(client, "test-idx-2"), equalTo(100L)); + assertThat(count(client, "test-idx-3"), equalTo(50L)); + + // Test restore after index deletion + logger.info("--> delete indices"); + wipeIndices("test-idx-1", "test-idx-2"); + logger.info("--> restore one index after deletion"); + restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-2").execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + ensureGreen(); + assertThat(count(client, "test-idx-1"), equalTo(100L)); + ClusterState clusterState = client.admin().cluster().prepareState().get().getState(); + assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true)); + assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false)); + } + + private void wipeIndices(String... 
indices) { + cluster().wipeIndices(indices); + } + + // RepositoryVerificationException.class + public void testWrongPath() { + Client client = client(); + logger.info("--> creating hdfs repository with path [{}]", path); + + try { + PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") + .setType("hdfs") + .setSettings(Settings.settingsBuilder() + // .put("uri", "hdfs://127.0.0.1:51227/") + .put("conf.fs.es-hdfs.impl", TestingFs.class.getName()) + .put("uri", "es-hdfs:///") + .put("path", path + "a@b$c#11:22") + .put("chunk_size", randomIntBetween(100, 1000) + "k") + .put("compress", randomBoolean())) + .get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + + createIndex("test-idx-1", "test-idx-2", "test-idx-3"); + ensureGreen(); + fail("Path name is invalid"); + } catch (RepositoryException re) { + // expected + } + } + + /** + * Deletes repositories, supports wildcard notation. + */ + public static void wipeRepositories(String... repositories) { + // if nothing is provided, delete all + if (repositories.length == 0) { + repositories = new String[]{"*"}; + } + for (String repository : repositories) { + try { + client().admin().cluster().prepareDeleteRepository(repository).execute().actionGet(); + } catch (RepositoryMissingException ex) { + // ignore + } + } + } + + private long count(Client client, String index) { + return client.prepareSearch(index).setSize(0).get().getHits().totalHits(); + } +} \ No newline at end of file diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/MiniHDFSCluster.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/MiniHDFSCluster.java new file mode 100644 index 00000000000..0d700615a1a --- /dev/null +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/MiniHDFSCluster.java @@ -0,0 +1,48 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.plugin.hadoop.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.elasticsearch.common.SuppressForbidden; + +import java.io.File; + +public class MiniHDFSCluster { + + @SuppressForbidden(reason = "Hadoop is messy") + public static void main(String[] args) throws Exception { + FileUtil.fullyDelete(new File(System.getProperty("test.build.data", "build/test/data"), "dfs/")); + // MiniHadoopClusterManager.main(new String[] { "-nomr" }); + Configuration cfg = new Configuration(); + cfg.set(DataNode.DATA_DIR_PERMISSION_KEY, "666"); + cfg.set("dfs.replication", "0"); + MiniDFSCluster dfsCluster = new MiniDFSCluster(cfg, 1, true, null); + FileSystem fs = dfsCluster.getFileSystem(); + System.out.println(fs.getClass()); + System.out.println(fs.getUri()); + System.out.println(dfsCluster.getHftpFileSystem().getClass()); + + // dfsCluster.shutdown(); + } +} diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/UtilsTests.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/UtilsTests.java new file mode 100644 index 00000000000..2f492eee343 --- /dev/null +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/UtilsTests.java @@ -0,0 +1,11 @@ +package org.elasticsearch.plugin.hadoop.hdfs; + +import org.elasticsearch.test.ESTestCase; + +public class UtilsTests extends ESTestCase { + + public void testDetectLibFolder() { + String location = HdfsPlugin.class.getProtectionDomain().getCodeSource().getLocation().toString(); + assertEquals(location, Utils.detectLibFolder()); + } +} diff --git a/plugins/repository-hdfs/src/test/resources/additional-cfg.xml b/plugins/repository-hdfs/src/test/resources/additional-cfg.xml new file mode 100644 index 00000000000..b1b6611e924 --- /dev/null +++ b/plugins/repository-hdfs/src/test/resources/additional-cfg.xml @@ -0,0 +1,12 @@ +<?xml version="1.0"?> + +<configuration> +<property> +<name>foo</name> +<value>foo</value> +</property> +<property> +<name>paradise</name> +<value>lost</value> +</property> +</configuration> diff --git a/plugins/repository-hdfs/src/test/resources/conf-2.xml b/plugins/repository-hdfs/src/test/resources/conf-2.xml new file mode 100644 index 00000000000..b1b6611e924 --- /dev/null +++ b/plugins/repository-hdfs/src/test/resources/conf-2.xml @@ -0,0 +1,12 @@ +<?xml version="1.0"?> + +<configuration> +<property> +<name>foo</name> +<value>foo</value> +</property> +<property> +<name>paradise</name> +<value>lost</value> +</property> +</configuration> diff --git a/plugins/repository-hdfs/src/test/resources/rest-api-spec/test/hdfs_repository/10_basic.yaml b/plugins/repository-hdfs/src/test/resources/rest-api-spec/test/hdfs_repository/10_basic.yaml new file mode 100644 index 00000000000..b7bc644a832 --- /dev/null +++ b/plugins/repository-hdfs/src/test/resources/rest-api-spec/test/hdfs_repository/10_basic.yaml @@ -0,0 +1,16 @@ +# Integration tests for HDFS Repository plugin +# +# Check plugin is installed +# +"HDFS Repository loaded": + - do: + cluster.state: {} + + # Get master node id + - set: { master_node: master } + + - do: + nodes.info: {} + + - match: { nodes.$master.plugins.0.name: repository-hdfs } + - match: { nodes.$master.plugins.0.jvm: true } diff --git a/plugins/repository-hdfs/src/test/resources/rest-api-spec/test/hdfs_repository/20_repository.disabled b/plugins/repository-hdfs/src/test/resources/rest-api-spec/test/hdfs_repository/20_repository.disabled new file mode 100644 index 00000000000..f1f5f7a65e0 --- /dev/null +++
b/plugins/repository-hdfs/src/test/resources/rest-api-spec/test/hdfs_repository/20_repository.disabled @@ -0,0 +1,25 @@ +# Integration tests for HDFS Repository plugin +# +# Check plugin is installed +# +"HDFS Repository Config": + - do: + snapshot.create_repository: + repository: test_repo_hdfs_1 + verify: false + body: + type: hdfs + settings: + # local HDFS implementation + conf.fs.es-hdfs.impl: "org.elasticsearch.repositories.hdfs.TestingFs" + uri: "es-hdfs://./build/" + path: "build/data/repo-hdfs" + + # Get repository + - do: + snapshot.get_repository: + repository: test_repo_hdfs_1 + + - is_true: test_repo_hdfs_1 + - is_true: test_repo_hdfs_1.settings.uri + - match: {test_repo_hdfs_1.settings.path : "build/data/repo-hdfs"} diff --git a/settings.gradle b/settings.gradle index e928e53b690..e9fb0a043aa 100644 --- a/settings.gradle +++ b/settings.gradle @@ -29,6 +29,7 @@ List projects = [ 'plugins:mapper-murmur3', 'plugins:mapper-size', 'plugins:repository-azure', + 'plugins:repository-hdfs', 'plugins:repository-s3', 'plugins:jvm-example', 'plugins:site-example',
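The comments in the plugin-security.policy added above note that Hadoop does not wrap its sensitive calls in privileged blocks, so the granted permissions only take effect if the plugin itself establishes the privileged context (and the thread-context classloader) before handing control to Hadoop. The sketch below illustrates that pattern only; the `HdfsAccess` helper is hypothetical and not part of this change.

[source,java]
----
import java.io.IOException;
import java.net.URI;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Hypothetical helper: runs Hadoop initialization inside a privileged block and sets the
// thread-context classloader first, which is what the "getClassLoader" and
// "setContextClassLoader" grants in plugin-security.policy are intended for.
class HdfsAccess {

    static FileSystem open(final URI uri, final Configuration cfg) throws IOException {
        final ClassLoader original = Thread.currentThread().getContextClassLoader();
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<FileSystem>() {
                @Override
                public FileSystem run() throws IOException {
                    // Hadoop resolves FileSystem implementations and JAAS configuration
                    // through the thread-context classloader
                    Thread.currentThread().setContextClassLoader(HdfsAccess.class.getClassLoader());
                    return FileSystem.get(uri, cfg);
                }
            });
        } catch (PrivilegedActionException pae) {
            throw (IOException) pae.getException();
        } finally {
            Thread.currentThread().setContextClassLoader(original);
        }
    }
}
----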
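Note that the tests above never start a real HDFS cluster: HdfsTests and the disabled REST test register the repository with `conf.fs.es-hdfs.impl` pointing at `TestingFs` (a `LocalFileSystem` subclass), so the `es-hdfs://` URI is served from the local build directory. A small sketch of the Hadoop mechanism this relies on, assuming the test classes are on the classpath; the class name and printed output are illustrative, the scheme and paths mirror the test settings.

[source,java]
----
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class SchemeMappingSketch {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        // Hadoop maps a URI scheme to an implementation through the "fs.<scheme>.impl" key;
        // the repository's inlined "conf.fs.es-hdfs.impl" setting feeds this key.
        cfg.set("fs.es-hdfs.impl", "org.elasticsearch.repositories.hdfs.TestingFs");

        // With the mapping in place, the es-hdfs:// URI resolves to TestingFs, i.e. to the
        // local file system, which is what lets snapshot/restore run without a Hadoop cluster.
        FileSystem fs = FileSystem.get(URI.create("es-hdfs://./build/"), cfg);
        System.out.println(fs.getUri() + " is served by " + fs.getClass().getName());
    }
}
----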