HDFS Snapshot/Restore plugin

Migrated from ES-Hadoop. Contains several improvements regarding:

* Security
Takes advantage of the pluggable security in ES 2.2 and uses it to grant
the necessary permissions to the Hadoop libs. It relies on a dedicated
DomainCombiner to grant permissions, only when needed, solely to the
libraries installed in the plugin folder.
Adds security checks for SpecialPermission/scripting and provides out of
the box permissions for the latest Hadoop 1.x (1.2.1) and 2.x (2.7.1).

* Testing
Uses a customized Local FS to perform actual integration testing of the
Hadoop stack (and thus make sure the proper permissions and ACC blocks
are in place), without requiring extra permissions for testing.
If needed, a MiniDFS cluster is provided (though it requires extra
permissions to bind ports).
Provides a REST integration test (RestIT).

* Build system
Picks up the build system used in ES (still Gradle).
Costin Leau 2015-11-25 01:04:40 +02:00
parent c7a001dcb4
commit 7bca97bba6
26 changed files with 1778 additions and 0 deletions

View File

@ -97,6 +97,7 @@ subprojects {
// the "value" -quiet is added, separated by a space. This is ok since the javadoc
// command already adds -quiet, so we are just duplicating it
// see https://discuss.gradle.org/t/add-custom-javadoc-option-that-does-not-take-an-argument/5959
javadoc.options.encoding='UTF8'
javadoc.options.addStringOption('Xdoclint:all,-missing', '-quiet')
}
}

View File

@ -90,6 +90,7 @@ public class PluginManager {
"mapper-murmur3",
"mapper-size",
"repository-azure",
"repository-hdfs",
"repository-s3",
"store-smb"));

View File

@ -50,6 +50,7 @@ OFFICIAL PLUGINS
- mapper-murmur3
- mapper-size
- repository-azure
- repository-hdfs
- repository-s3
- store-smb

View File

@ -0,0 +1,115 @@
[[repository-hdfs]]
=== Hadoop HDFS Repository Plugin
The HDFS repository plugin adds support for using the HDFS File System as a repository for
{ref}/modules-snapshots.html[Snapshot/Restore].
[[repository-hdfs-install]]
[float]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/plugin install repository-hdfs
sudo bin/plugin install repository-hdfs-hadoop2
sudo bin/plugin install repository-hdfs-lite
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
[[repository-hdfs-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/plugin remove repository-hdfs
sudo bin/plugin remove repository-hdfs-hadoop2
sudo bin/plugin remove repository-hdfs-lite
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[repository-hdfs-usage]]
==== Getting started with HDFS
The HDFS snapshot/restore plugin comes in three _flavors_:
* Default / Hadoop 1.x::
The default version contains the plugin jar alongside Apache Hadoop 1.x (stable) dependencies.
* YARN / Hadoop 2.x::
The `hadoop2` version contains the plugin jar plus the Apache Hadoop 2.x (also known as YARN) dependencies.
* Lite::
The `lite` version contains just the plugin jar, without any Hadoop dependencies. The user should provide these (read below).
[[repository-hdfs-flavor]]
===== What version to use?
It depends on whether Hadoop is installed locally on the Elasticsearch nodes and, if it is not, whether your Hadoop distribution is compatible with the Apache Hadoop clients.
* Are you using Apache Hadoop (or a _compatible_ distro) and do not have it installed on the Elasticsearch nodes?::
+
If the answer is yes, use the default `repository-hdfs` for Apache Hadoop 1, or `repository-hdfs-hadoop2` for Apache Hadoop 2.
+
* Do you have Hadoop installed locally on the Elasticsearch nodes, or are you using a certain distro?::
+
Use the `lite` version and place your Hadoop _client_ jars and their dependencies in the plugin folder under `hadoop-libs`.
For large deployments, it is recommended to package the libraries in the plugin zip and deploy it manually across nodes
(thus avoiding having to set up the libraries on each node); a sketch of the manual setup follows below.
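For instance, assuming the plugin was installed into `plugins/repository-hdfs` (the exact location depends on your setup), the manual copy could look like this (an illustrative sketch, not an exact recipe):
[source,sh]
----------------------------------------------------------------
# hypothetical paths - copy the Hadoop client jars and their
# dependencies into the plugin folder under hadoop-libs,
# then restart the node
cp /path/to/hadoop-client-libs/*.jar plugins/repository-hdfs/hadoop-libs/
----------------------------------------------------------------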
[[repository-hdfs-security]]
==== Handling JVM Security and Permissions
Out of the box, Elasticsearch runs in a JVM with the security manager turned _on_ to make sure that unsafe or sensitive actions
are allowed only from trusted code. Hadoop, however, is not really designed to run under one; it does not rely on privileged blocks
to execute sensitive code, of which it uses plenty.
The `repository-hdfs` plugin provides the necessary permissions for both Apache Hadoop 1.x and 2.x (latest versions) to successfully
run in a secured JVM, as one can tell from the number of permissions required when installing the plugin.
However, using a certain Hadoop File-System (other than HDFS), a certain distro, or a certain operating system (in particular Windows)
might require additional permissions which are not provided by the plugin.
In this case there are several workarounds:
* add the permission to `plugin-security.policy` (available in the plugin folder) - see the example below
* disable the security manager through the `es.security.manager.enabled=false` configuration setting - NOT RECOMMENDED
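For example (illustrative only - the exact permission depends on the file-system, distro and platform), an extra entry added inside the `grant` block of `plugin-security.policy` might look like:
[source]
----
// hypothetical permission for a file-system implementation that opens its own network connections
permission java.net.SocketPermission "*", "connect,resolve";
----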
If you find yourself in such a situation, please let us know what Hadoop distro version and OS you are using and what permission is missing
by raising an issue. Thank you!
[[repository-hdfs-config]]
==== Configuration Properties
Once installed, define the configuration for the `hdfs` repository through `elasticsearch.yml` or the
{ref}/modules-snapshots.html[REST API]:
[source]
----
repositories
    hdfs:
        uri: "hdfs://<host>:<port>/"    # optional - Hadoop file-system URI
        path: "some/path"               # required - path within the file-system where data is stored/loaded
        load_defaults: "true"           # optional - whether to load the default Hadoop configuration (default) or not
        conf_location: "extra-cfg.xml"  # optional - Hadoop configuration XML to be loaded (use commas for multi values)
        conf.<key> : "<value>"          # optional - 'inlined' key=value added to the Hadoop configuration
        concurrent_streams: 5           # optional - the number of concurrent streams (defaults to 5)
        compress: "false"               # optional - whether to compress the metadata or not (default)
        chunk_size: "10mb"              # optional - chunk size (disabled by default)
----
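The same repository can also be registered at runtime through the snapshot REST API; a minimal sketch (the repository name, host and setting values below are just examples):
[source,sh]
----------------------------------------------------------------
curl -XPUT 'http://localhost:9200/_snapshot/my_hdfs_repository' -d '{
  "type": "hdfs",
  "settings": {
    "uri": "hdfs://namenode:8020/",
    "path": "elasticsearch/repositories/my_hdfs_repository"
  }
}'
----------------------------------------------------------------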
NOTE: Be careful when including a path within the `uri` setting; some implementations ignore it completely while
others consider it. In general, we recommend keeping the `uri` to a minimum and using the `path` element instead.
===== Plugging other file-systems
Any HDFS-compatible file-system (like Amazon `s3://` or Google `gs://`) can be used as long as the proper Hadoop
configuration is passed to the Elasticsearch plugin. In practice, this means making sure the correct Hadoop configuration
files (`core-site.xml` and `hdfs-site.xml`) and their jars are available on the plugin classpath, just as you would with any
other Hadoop client or job.
Otherwise, the plugin will only read the _default_, vanilla configuration of Hadoop and will not be able to recognize
the plugged-in file-system.
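As a sketch of what this can look like (the scheme and implementation class below are made up, mirroring the `TestingFs` wiring used by the plugin's own tests), a custom file-system can be plugged in through the inlined `conf.<key>` settings:
[source]
----
repositories
    hdfs:
        # hypothetical scheme and implementation class
        conf.fs.myfs.impl: "com.example.MyFileSystem"
        uri: "myfs://some-host/"
        path: "some/path"
----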

View File

@ -0,0 +1,203 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
//apply plugin: 'nebula.provided-base'
esplugin {
description 'The HDFS repository plugin adds support for Hadoop Distributed File-System (HDFS) repositories.'
classname 'org.elasticsearch.plugin.hadoop.hdfs.HdfsPlugin'
}
configurations {
hadoop1
hadoop2
}
versions << [
'hadoop1': '1.2.1',
'hadoop2': '2.7.1'
]
dependencies {
provided "org.elasticsearch:elasticsearch:${versions.elasticsearch}"
provided "org.apache.hadoop:hadoop-core:${versions.hadoop1}"
// use Hadoop1 to compile and test things (a subset of Hadoop2)
testCompile "org.apache.hadoop:hadoop-core:${versions.hadoop1}"
testCompile "org.apache.hadoop:hadoop-test:${versions.hadoop1}"
// Hadoop dependencies
testCompile "commons-configuration:commons-configuration:1.6"
testCompile "commons-lang:commons-lang:${versions.commonslang}"
testCompile "commons-collections:commons-collections:3.2.2"
testCompile "commons-net:commons-net:1.4.1"
testCompile "org.mortbay.jetty:jetty:6.1.26"
testCompile "org.mortbay.jetty:jetty-util:6.1.26"
testCompile "org.mortbay.jetty:servlet-api:2.5-20081211"
testCompile "com.sun.jersey:jersey-core:1.8"
hadoop1("org.apache.hadoop:hadoop-core:${versions.hadoop1}") {
exclude module: "commons-cli"
exclude group: "com.sun.jersey"
exclude group: "org.mortbay.jetty"
exclude group: "tomcat"
exclude module: "commons-el"
exclude module: "hsqldb"
exclude group: "org.eclipse.jdt"
exclude module: "commons-beanutils"
exclude module: "commons-beanutils-core"
exclude module: "junit"
// provided by ES itself
exclude group: "log4j"
}
hadoop2("org.apache.hadoop:hadoop-client:${versions.hadoop2}") {
exclude module: "commons-cli"
exclude group: "com.sun.jersey"
exclude group: "com.sun.jersey.contribs"
exclude group: "com.sun.jersey.jersey-test-framework"
exclude module: "guice"
exclude group: "org.mortbay.jetty"
exclude group: "tomcat"
exclude module: "commons-el"
exclude module: "hsqldb"
exclude group: "org.eclipse.jdt"
exclude module: "commons-beanutils"
exclude module: "commons-beanutils-core"
exclude module: "javax.servlet"
exclude module: "junit"
// provided by ES itself
exclude group: "log4j"
}
hadoop2("org.apache.hadoop:hadoop-hdfs:${versions.hadoop2}") {
exclude module: "guava"
exclude module: "junit"
// provided by ES itself
exclude group: "log4j"
}
}
configurations.all {
resolutionStrategy {
force "commons-codec:commons-codec:${versions.commonscodec}"
force "commons-logging:commons-logging:${versions.commonslogging}"
force "commons-lang:commons-lang:2.6"
force "commons-httpclient:commons-httpclient:3.0.1"
force "org.codehaus.jackson:jackson-core-asl:1.8.8"
force "org.codehaus.jackson:jackson-mapper-asl:1.8.8"
force "com.google.code.findbugs:jsr305:3.0.0"
force "com.google.guava:guava:16.0.1"
force "org.slf4j:slf4j-api:1.7.10"
force "org.slf4j:slf4j-log4j12:1.7.10"
}
}
dependencyLicenses {
mapping from: /hadoop-core.*/, to: 'hadoop-1'
mapping from: /hadoop-.*/, to: 'hadoop-2'
}
compileJava.options.compilerArgs << '-Xlint:-deprecation,-rawtypes'
// main jar includes just the plugin classes
jar {
include "org/elasticsearch/plugin/hadoop/hdfs/*"
}
// hadoop jar (which actually depends on Hadoop)
task hadoopLinkedJar(type: Jar, dependsOn:jar) {
appendix "internal"
from sourceSets.main.output.classesDir
// exclude plugin
exclude "org/elasticsearch/plugin/hadoop/hdfs/*"
}
bundlePlugin.dependsOn hadoopLinkedJar
// configure 'bundle' as being w/o Hadoop deps
bundlePlugin {
into ("internal-libs") {
from hadoopLinkedJar.archivePath
}
into ("hadoop-libs") {
from configurations.hadoop2.allArtifacts.files
from configurations.hadoop2
}
}
task distZipHadoop1(type: Zip, dependsOn: [hadoopLinkedJar, jar]) { zipTask ->
from (zipTree(bundlePlugin.archivePath)) {
include "*"
include "internal-libs/**"
}
description = "Builds archive (with Hadoop1 dependencies) suitable for download page."
classifier = "hadoop1"
into ("hadoop-libs") {
from configurations.hadoop1.allArtifacts.files
from configurations.hadoop1
}
}
task distZipHadoop2(type: Zip, dependsOn: [hadoopLinkedJar, jar]) { zipTask ->
from (zipTree(bundlePlugin.archivePath)) {
include "*"
include "internal-libs/**"
}
description = "Builds archive (with Hadoop2/YARN dependencies) suitable for download page."
classifier = "hadoop2"
into ("hadoop-libs") {
from configurations.hadoop2.allArtifacts.files
from configurations.hadoop2
}
}
task distZipNoHadoop(type: Zip, dependsOn: [hadoopLinkedJar, jar]) { zipTask ->
from (zipTree(bundlePlugin.archivePath)) {
exclude "hadoop-libs/**"
}
from sourceSets.main.output.resourcesDir
description = "Builds archive (without any Hadoop dependencies) suitable for download page."
classifier = "lite"
}
artifacts {
archives bundlePlugin
'default' bundlePlugin
archives distZipHadoop1
archives distZipHadoop2
archives distZipNoHadoop
}
integTest {
cluster {
plugin(pluginProperties.extension.name, zipTree(distZipHadoop2.archivePath))
}
}

View File

@ -0,0 +1,173 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.hadoop.hdfs;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.Repository;
//
// Note this plugin is somewhat special as Hadoop itself loads a number of libraries and thus requires a number of permissions to run even in client mode.
// This poses two problems:
// - Hadoop itself comes with tons of jars, many providing the same classes across packages. In particular Hadoop 2 provides package annotations in the same
// package across jars which trips JarHell. Thus, to allow Hadoop jars to load, the plugin uses a dedicated CL which picks them up from the hadoop-libs folder.
// - The issue though with using a different CL is that it picks up the jars from a different location / codeBase and thus it does not fall under the plugin
// permissions. In other words, the plugin permissions don't apply to the hadoop libraries.
// There are different approaches here:
// - implement a custom classloader that loads the jars but 'lies' about the codesource. It is doable but since URLClassLoader is locked down, one
// would have to implement the whole jar opening and loading from it. Not impossible but still fairly low-level.
// Furthermore, even if the code has the proper credentials, it needs to use the proper Privileged blocks to use its full permissions, which does not
// happen in the Hadoop code base.
// - use a different Policy. Works but the Policy is JVM wide and thus the code needs to be quite efficient - quite a bit of impact to cover just some plugin
// libraries
// - use a DomainCombiner. This doesn't change the semantics (it's clear where the code is loaded from, etc..) however it gives us a scoped, fine-grained
// callback on handling the permission intersection for secured calls. Note that DC works only in the current PAC call - the moment another PA is used,
// the domain combiner is going to be ignored (unless the caller specifically uses it). Due to its scoped impact and official Java support, this approach
// was used.
// ClassLoading info
// - package plugin.hadoop.hdfs is part of the plugin
// - all the other packages are assumed to be in the nested Hadoop CL.
// Code
public class HdfsPlugin extends Plugin {
@Override
public String name() {
return "repository-hdfs";
}
@Override
public String description() {
return "HDFS Repository Plugin";
}
@SuppressWarnings("unchecked")
public void onModule(RepositoriesModule repositoriesModule) {
String baseLib = Utils.detectLibFolder();
List<URL> cp = getHadoopClassLoaderPath(baseLib);
ClassLoader hadoopCL = URLClassLoader.newInstance(cp.toArray(new URL[cp.size()]), getClass().getClassLoader());
Class<? extends Repository> repository = null;
try {
repository = (Class<? extends Repository>) hadoopCL.loadClass("org.elasticsearch.repositories.hdfs.HdfsRepository");
} catch (ClassNotFoundException cnfe) {
throw new IllegalStateException("Cannot load plugin class; is the plugin class setup correctly?", cnfe);
}
repositoriesModule.registerRepository("hdfs", repository, BlobStoreIndexShardRepository.class);
Loggers.getLogger(HdfsPlugin.class).info("Loaded Hadoop [{}] libraries from {}", getHadoopVersion(hadoopCL), baseLib);
}
protected List<URL> getHadoopClassLoaderPath(String baseLib) {
List<URL> cp = new ArrayList<>();
// add plugin internal jar
discoverJars(createURI(baseLib, "internal-libs"), cp, false);
// add Hadoop jars
discoverJars(createURI(baseLib, "hadoop-libs"), cp, true);
return cp;
}
private String getHadoopVersion(ClassLoader hadoopCL) {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// unprivileged code such as scripts do not have SpecialPermission
sm.checkPermission(new SpecialPermission());
}
return AccessController.doPrivileged(new PrivilegedAction<String>() {
@Override
public String run() {
// Hadoop 2 relies on TCCL to determine the version
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(hadoopCL);
return doGetHadoopVersion(hadoopCL);
} finally {
Thread.currentThread().setContextClassLoader(tccl);
}
}
}, Utils.hadoopACC());
}
private String doGetHadoopVersion(ClassLoader hadoopCL) {
String version = "Unknown";
Class<?> clz = null;
try {
clz = hadoopCL.loadClass("org.apache.hadoop.util.VersionInfo");
} catch (ClassNotFoundException cnfe) {
// unknown
}
if (clz != null) {
try {
Method method = clz.getMethod("getVersion");
version = method.invoke(null).toString();
} catch (Exception ex) {
// class has changed, ignore
}
}
return version;
}
private URI createURI(String base, String suffix) {
String location = base + suffix;
try {
return new URI(location);
} catch (URISyntaxException ex) {
throw new IllegalStateException(String.format(Locale.ROOT, "Cannot detect plugin folder; [%s] seems invalid", location), ex);
}
}
@SuppressForbidden(reason = "discover nested jar")
private void discoverJars(URI libPath, List<URL> cp, boolean optional) {
try {
Path[] jars = FileSystemUtils.files(PathUtils.get(libPath), "*.jar");
for (Path path : jars) {
cp.add(path.toUri().toURL());
}
} catch (IOException ex) {
if (!optional) {
throw new IllegalStateException("Cannot compute plugin classpath", ex);
}
}
}
}

View File

@ -0,0 +1,84 @@
package org.elasticsearch.plugin.hadoop.hdfs;
import java.net.URL;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.DomainCombiner;
import java.security.PrivilegedAction;
import java.security.ProtectionDomain;
import org.elasticsearch.SpecialPermission;
public abstract class Utils {
protected static AccessControlContext hadoopACC() {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// unprivileged code such as scripts do not have SpecialPermission
sm.checkPermission(new SpecialPermission());
}
return AccessController.doPrivileged(new PrivilegedAction<AccessControlContext>() {
@Override
public AccessControlContext run() {
return new AccessControlContext(AccessController.getContext(), new HadoopDomainCombiner());
}
});
}
private static class HadoopDomainCombiner implements DomainCombiner {
private static String BASE_LIB = detectLibFolder();
@Override
public ProtectionDomain[] combine(ProtectionDomain[] currentDomains, ProtectionDomain[] assignedDomains) {
for (ProtectionDomain pd : assignedDomains) {
if (pd.getCodeSource().getLocation().toString().startsWith(BASE_LIB)) {
return assignedDomains;
}
}
return currentDomains;
}
}
static String detectLibFolder() {
ClassLoader cl = Utils.class.getClassLoader();
// we could get the URL from the URLClassloader directly
// but that can create issues when running the tests from the IDE
// we could detect that by loading resources but that as well relies on
// the JAR URL
String classToLookFor = HdfsPlugin.class.getName().replace(".", "/").concat(".class");
URL classURL = cl.getResource(classToLookFor);
if (classURL == null) {
throw new IllegalStateException("Cannot detect itself; something is wrong with this ClassLoader " + cl);
}
String base = classURL.toString();
// extract root
// typically a JAR URL
int index = base.indexOf("!/");
if (index > 0) {
base = base.substring(0, index);
// remove its prefix (jar:)
base = base.substring(4);
// remove the trailing jar
index = base.lastIndexOf("/");
base = base.substring(0, index + 1);
}
// not a jar - something else, do a best effort here
else {
// remove the class searched
base = base.substring(0, base.length() - classToLookFor.length());
}
// append /
if (!base.endsWith("/")) {
base = base.concat("/");
}
return base;
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.hdfs;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
interface FileSystemFactory {
FileSystem getFileSystem() throws IOException;
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.hdfs;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
interface FsCallback<V> {
V doInHdfs(FileSystem fs) throws IOException;
}

View File

@ -0,0 +1,173 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.hdfs;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
public class HdfsBlobContainer extends AbstractBlobContainer {
protected final HdfsBlobStore blobStore;
protected final Path path;
public HdfsBlobContainer(BlobPath blobPath, HdfsBlobStore blobStore, Path path) {
super(blobPath);
this.blobStore = blobStore;
this.path = path;
}
@Override
public boolean blobExists(String blobName) {
try {
return SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback<Boolean>() {
@Override
public Boolean doInHdfs(FileSystem fs) throws IOException {
return fs.exists(new Path(path, blobName));
}
});
} catch (Exception e) {
return false;
}
}
@Override
public void deleteBlob(String blobName) throws IOException {
SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback<Boolean>() {
@Override
public Boolean doInHdfs(FileSystem fs) throws IOException {
return fs.delete(new Path(path, blobName), true);
}
});
}
@Override
public void move(String sourceBlobName, String targetBlobName) throws IOException {
boolean rename = SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback<Boolean>() {
@Override
public Boolean doInHdfs(FileSystem fs) throws IOException {
return fs.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName));
}
});
if (!rename) {
throw new IOException(String.format(Locale.ROOT, "can not move blob from [%s] to [%s]", sourceBlobName, targetBlobName));
}
}
@Override
public InputStream readBlob(String blobName) throws IOException {
// FSDataInputStream does buffering internally
return SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback<InputStream>() {
@Override
public InputStream doInHdfs(FileSystem fs) throws IOException {
return fs.open(new Path(path, blobName), blobStore.bufferSizeInBytes());
}
});
}
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback<Void>() {
@Override
public Void doInHdfs(FileSystem fs) throws IOException {
try (OutputStream stream = createOutput(blobName)) {
Streams.copy(inputStream, stream);
}
return null;
}
});
}
@Override
public void writeBlob(String blobName, BytesReference bytes) throws IOException {
SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback<Void>() {
@Override
public Void doInHdfs(FileSystem fs) throws IOException {
try (OutputStream stream = createOutput(blobName)) {
bytes.writeTo(stream);
}
return null;
}
});
}
private OutputStream createOutput(String blobName) throws IOException {
Path file = new Path(path, blobName);
// FSDataOutputStream does buffering internally
return blobStore.fileSystemFactory().getFileSystem().create(file, true, blobStore.bufferSizeInBytes());
}
@Override
public Map<String, BlobMetaData> listBlobsByPrefix(final @Nullable String blobNamePrefix) throws IOException {
FileStatus[] files = SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback<FileStatus[]>() {
@Override
public FileStatus[] doInHdfs(FileSystem fs) throws IOException {
return fs.listStatus(path, new PathFilter() {
@Override
public boolean accept(Path path) {
return path.getName().startsWith(blobNamePrefix);
}
});
}
});
if (files == null || files.length == 0) {
return Collections.emptyMap();
}
Map<String, BlobMetaData> map = new LinkedHashMap<String, BlobMetaData>();
for (FileStatus file : files) {
map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
}
return Collections.unmodifiableMap(map);
}
@Override
public Map<String, BlobMetaData> listBlobs() throws IOException {
FileStatus[] files = SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback<FileStatus[]>() {
@Override
public FileStatus[] doInHdfs(FileSystem fs) throws IOException {
return fs.listStatus(path);
}
});
if (files == null || files.length == 0) {
return Collections.emptyMap();
}
Map<String, BlobMetaData> map = new LinkedHashMap<String, BlobMetaData>();
for (FileStatus file : files) {
map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
}
return Collections.unmodifiableMap(map);
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.hdfs;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.concurrent.Executor;
public class HdfsBlobStore extends AbstractComponent implements BlobStore {
private final FileSystemFactory ffs;
private final Path rootHdfsPath;
private final ThreadPool threadPool;
private final int bufferSizeInBytes;
public HdfsBlobStore(Settings settings, FileSystemFactory ffs, Path path, ThreadPool threadPool) throws IOException {
super(settings);
this.ffs = ffs;
this.rootHdfsPath = path;
this.threadPool = threadPool;
this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
mkdirs(path);
}
private void mkdirs(Path path) throws IOException {
SecurityUtils.execute(ffs, new FsCallback<Void>() {
@Override
public Void doInHdfs(FileSystem fs) throws IOException {
if (!fs.exists(path)) {
fs.mkdirs(path);
}
return null;
}
});
}
@Override
public String toString() {
return rootHdfsPath.toUri().toString();
}
public FileSystemFactory fileSystemFactory() {
return ffs;
}
public Path path() {
return rootHdfsPath;
}
public Executor executor() {
return threadPool.executor(ThreadPool.Names.SNAPSHOT);
}
public int bufferSizeInBytes() {
return bufferSizeInBytes;
}
@Override
public BlobContainer blobContainer(BlobPath path) {
return new HdfsBlobContainer(path, this, buildHdfsPath(path));
}
@Override
public void delete(BlobPath path) throws IOException {
SecurityUtils.execute(ffs, new FsCallback<Void>() {
@Override
public Void doInHdfs(FileSystem fs) throws IOException {
fs.delete(translateToHdfsPath(path), true);
return null;
}
});
}
private Path buildHdfsPath(BlobPath blobPath) {
final Path path = translateToHdfsPath(blobPath);
try {
mkdirs(path);
} catch (IOException ex) {
throw new ElasticsearchException("failed to create blob container", ex);
}
return path;
}
private Path translateToHdfsPath(BlobPath blobPath) {
Path path = path();
for (String p : blobPath) {
path = new Path(path, p);
}
return path;
}
@Override
public void close() {
//
}
}

View File

@ -0,0 +1,259 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.hdfs;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.file.Files;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;
public class HdfsRepository extends BlobStoreRepository implements FileSystemFactory {
public final static String TYPE = "hdfs";
private final HdfsBlobStore blobStore;
private final BlobPath basePath;
private final ByteSizeValue chunkSize;
private final boolean compress;
private final RepositorySettings repositorySettings;
private FileSystem fs;
@Inject
public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, ThreadPool threadPool) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);
this.repositorySettings = repositorySettings;
String path = repositorySettings.settings().get("path", settings.get("path"));
if (path == null) {
throw new IllegalArgumentException("no 'path' defined for hdfs snapshot/restore");
}
// get configuration
fs = getFileSystem();
Path hdfsPath = SecurityUtils.execute(fs, new FsCallback<Path>() {
@Override
public Path doInHdfs(FileSystem fs) throws IOException {
return fs.makeQualified(new Path(path));
}
});
this.basePath = BlobPath.cleanPath();
logger.debug("Using file-system [{}] for URI [{}], path [{}]", fs, fs.getUri(), hdfsPath);
blobStore = new HdfsBlobStore(settings, this, hdfsPath, threadPool);
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", settings.getAsBytesSize("chunk_size", null));
this.compress = repositorySettings.settings().getAsBoolean("compress", settings.getAsBoolean("compress", false));
}
// as the FileSystem is long-lived and might go away, make sure to check it before it's being used.
@Override
public FileSystem getFileSystem() throws IOException {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// unprivileged code such as scripts do not have SpecialPermission
sm.checkPermission(new SpecialPermission());
}
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws IOException {
return doGetFileSystem();
}
}, SecurityUtils.AccBridge.acc());
} catch (PrivilegedActionException pae) {
Throwable th = pae.getCause();
if (th instanceof Error) {
throw (Error) th;
}
if (th instanceof RuntimeException) {
throw (RuntimeException) th;
}
if (th instanceof IOException) {
throw (IOException) th;
}
throw new ElasticsearchException(pae);
}
}
private FileSystem doGetFileSystem() throws IOException {
// check if the fs is still alive
// make a cheap call that triggers little to no security checks
if (fs != null) {
try {
fs.isFile(fs.getWorkingDirectory());
} catch (IOException ex) {
if (ex.getMessage().contains("Filesystem closed")) {
fs = null;
}
else {
throw ex;
}
}
}
if (fs == null) {
Thread th = Thread.currentThread();
ClassLoader oldCL = th.getContextClassLoader();
try {
th.setContextClassLoader(getClass().getClassLoader());
return initFileSystem(repositorySettings);
} catch (IOException ex) {
throw ex;
} finally {
th.setContextClassLoader(oldCL);
}
}
return fs;
}
private FileSystem initFileSystem(RepositorySettings repositorySettings) throws IOException {
Configuration cfg = new Configuration(repositorySettings.settings().getAsBoolean("load_defaults", settings.getAsBoolean("load_defaults", true)));
cfg.setClassLoader(this.getClass().getClassLoader());
cfg.reloadConfiguration();
String confLocation = repositorySettings.settings().get("conf_location", settings.get("conf_location"));
if (Strings.hasText(confLocation)) {
for (String entry : Strings.commaDelimitedListToStringArray(confLocation)) {
addConfigLocation(cfg, entry.trim());
}
}
Map<String, String> map = repositorySettings.settings().getByPrefix("conf.").getAsMap();
for (Entry<String, String> entry : map.entrySet()) {
cfg.set(entry.getKey(), entry.getValue());
}
try {
UserGroupInformation.setConfiguration(cfg);
} catch (Throwable th) {
throw new ElasticsearchGenerationException(String.format(Locale.ROOT, "Cannot initialize Hadoop"), th);
}
String uri = repositorySettings.settings().get("uri", settings.get("uri"));
URI actualUri = (uri != null ? URI.create(uri) : FileSystem.getDefaultUri(cfg));
String user = repositorySettings.settings().get("user", settings.get("user"));
try {
// disable FS cache
String disableFsCache = String.format(Locale.ROOT, "fs.%s.impl.disable.cache", actualUri.getScheme());
cfg.setBoolean(disableFsCache, true);
return (user != null ? FileSystem.get(actualUri, cfg, user) : FileSystem.get(actualUri, cfg));
} catch (Exception ex) {
throw new ElasticsearchGenerationException(String.format(Locale.ROOT, "Cannot create Hdfs file-system for uri [%s]", actualUri), ex);
}
}
@SuppressForbidden(reason = "pick up Hadoop config (which can be on HDFS)")
private void addConfigLocation(Configuration cfg, String confLocation) {
URL cfgURL = null;
// it's an URL
if (!confLocation.contains(":")) {
cfgURL = cfg.getClassLoader().getResource(confLocation);
// fall back to file
if (cfgURL == null) {
java.nio.file.Path path = PathUtils.get(confLocation);
if (!Files.isReadable(path)) {
throw new IllegalArgumentException(
String.format(Locale.ROOT,
"Cannot find classpath resource or file 'conf_location' [%s] defined for hdfs snapshot/restore",
confLocation));
}
String pathLocation = path.toUri().toString();
logger.debug("Adding path [{}] as file [{}]", confLocation, pathLocation);
confLocation = pathLocation;
}
else {
logger.debug("Resolving path [{}] to classpath [{}]", confLocation, cfgURL);
}
}
else {
logger.debug("Adding path [{}] as URL", confLocation);
}
if (cfgURL == null) {
try {
cfgURL = new URL(confLocation);
} catch (MalformedURLException ex) {
throw new IllegalArgumentException(String.format(Locale.ROOT,
"Invalid 'conf_location' URL [%s] defined for hdfs snapshot/restore", confLocation), ex);
}
}
cfg.addResource(cfgURL);
}
@Override
protected BlobStore blobStore() {
return blobStore;
}
@Override
protected BlobPath basePath() {
return basePath;
}
@Override
protected boolean isCompress() {
return compress;
}
@Override
protected ByteSizeValue chunkSize() {
return chunkSize;
}
@Override
protected void doClose() throws ElasticsearchException {
super.doClose();
IOUtils.closeStream(fs);
fs = null;
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.hdfs;
import java.io.IOException;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.fs.FileSystem;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.plugin.hadoop.hdfs.Utils;
class SecurityUtils {
abstract static class AccBridge extends Utils {
static AccessControlContext acc() {
return Utils.hadoopACC();
}
}
static <V> V execute(FileSystemFactory ffs, FsCallback<V> callback) throws IOException {
return execute(ffs.getFileSystem(), callback);
}
static <V> V execute(FileSystem fs, FsCallback<V> callback) throws IOException {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// unprivileged code such as scripts do not have SpecialPermission
sm.checkPermission(new SpecialPermission());
}
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<V>() {
@Override
public V run() throws IOException {
return callback.doInHdfs(fs);
}
}, AccBridge.acc());
} catch (PrivilegedActionException pae) {
Throwable th = pae.getCause();
if (th instanceof Error) {
throw (Error) th;
}
if (th instanceof RuntimeException) {
throw (RuntimeException) th;
}
if (th instanceof IOException) {
throw (IOException) th;
}
throw new ElasticsearchException(pae);
}
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.hdfs;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.elasticsearch.common.SuppressForbidden;
import java.io.File;
import java.io.IOException;
/**
* Extends LFS to improve some operations to keep the security permissions at
* bay. In particular, mkdirs is smarter and doesn't have to walk the whole file
* hierarchy but rather limits itself to the parent/working dir and creates
* a directory only when necessary.
*/
public class TestingFs extends LocalFileSystem {
private static class ImprovedRawLocalFileSystem extends RawLocalFileSystem {
@Override
@SuppressForbidden(reason = "the Hadoop API depends on java.io.File")
public boolean mkdirs(Path f) throws IOException {
File wd = pathToFile(getWorkingDirectory());
File local = pathToFile(f);
if (wd.equals(local) || local.exists()) {
return true;
}
return mkdirs(f.getParent()) && local.mkdir();
}
}
public TestingFs() {
super(new ImprovedRawLocalFileSystem());
// use the build path instead of the starting dir as that one has read permissions
//setWorkingDirectory(new Path(getClass().getProtectionDomain().getCodeSource().getLocation().toString()));
setWorkingDirectory(new Path(System.getProperty("java.io.tmpdir")));
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
grant {
// used by the plugin to get the TCCL to properly initialize all of Hadoop components
permission java.lang.RuntimePermission "getClassLoader";
// used for DomainCombiner
permission java.security.SecurityPermission "createAccessControlContext";
// set TCCL used for bootstrapping Hadoop Configuration and JAAS
permission java.lang.RuntimePermission "setContextClassLoader";
//
// Hadoop 1
//
// UserGroupInformation (UGI)
permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
// UGI triggers JAAS
permission javax.security.auth.AuthPermission "getSubject";
// JAAS libraries are not loaded with the proper context in Hadoop, hence why the permission is needed here
permission java.lang.RuntimePermission "loadLibrary.jaas_nt";
// which triggers the use of the Kerberos library
permission java.lang.RuntimePermission "accessClassInPackage.sun.security.krb5";
// plus LoginContext
permission javax.security.auth.AuthPermission "modifyPrincipals";
permission javax.security.auth.AuthPermission "modifyPublicCredentials";
permission javax.security.auth.AuthPermission "modifyPrivateCredentials";
//
// Hadoop 2
//
// UGI (Ugi Metrics)
permission java.lang.RuntimePermission "accessDeclaredMembers";
// Shell initialization - reading system props
permission java.util.PropertyPermission "*", "read,write";
permission javax.security.auth.PrivateCredentialPermission "org.apache.hadoop.security.Credentials \"*\"", "read";
// HftpFileSystem (all present FS are loaded and initialized at startup ...)
permission java.lang.RuntimePermission "setFactory";
};

View File

@ -0,0 +1 @@
Folder containing the required Hadoop client libraries and dependencies.

View File

@ -0,0 +1,30 @@
package org.elasticsearch.plugin.hadoop.hdfs;
import java.io.IOException;
import java.util.Collection;
import org.elasticsearch.plugin.hadoop.hdfs.HdfsPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.RestTestCandidate;
import org.elasticsearch.test.rest.parser.RestTestParseException;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
public class HdfsRepositoryRestIT extends ESRestTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return pluginList(HdfsPlugin.class);
}
public HdfsRepositoryRestIT(@Name("yaml") RestTestCandidate testCandidate) {
super(testCandidate);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws IOException, RestTestParseException {
return ESRestTestCase.createParameters(0, 1);
}
}

View File

@ -0,0 +1,15 @@
package org.elasticsearch.plugin.hadoop.hdfs;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import org.elasticsearch.plugin.hadoop.hdfs.HdfsPlugin;
public class HdfsTestPlugin extends HdfsPlugin {
@Override
protected List<URL> getHadoopClassLoaderPath(String baseLib) {
return Collections.emptyList();
}
}

View File

@ -0,0 +1,218 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.hadoop.hdfs;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import java.util.Collection;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.repositories.hdfs.TestingFs;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.ESIntegTestCase.ThirdParty;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.junit.After;
import org.junit.Before;
/**
* You must specify {@code -Dtests.thirdparty=true}
*/
@ThirdParty
@ClusterScope(scope = Scope.SUITE, numDataNodes = 1, transportClientRatio = 0.0)
public class HdfsTests extends ESIntegTestCase {
@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(MockFSDirectoryService.RANDOM_PREVENT_DOUBLE_WRITE, false)
.put(MockFSDirectoryService.RANDOM_NO_DELETE_OPEN_FILE, false)
.build();
}
@Override
protected Settings nodeSettings(int ordinal) {
Settings.Builder settings = Settings.builder()
.put(super.nodeSettings(ordinal))
.put("path.home", createTempDir())
.put("path.repo", "")
.put(MockFSDirectoryService.RANDOM_PREVENT_DOUBLE_WRITE, false)
.put(MockFSDirectoryService.RANDOM_NO_DELETE_OPEN_FILE, false);
return settings.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return pluginList(HdfsTestPlugin.class);
}
private String path;
@Before
public final void wipeBefore() throws Exception {
wipeRepositories();
path = "build/data/repo-" + randomInt();
}
@After
public final void wipeAfter() throws Exception {
wipeRepositories();
}
public void testSimpleWorkflow() {
Client client = client();
logger.info("--> creating hdfs repository with path [{}]", path);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("hdfs")
.setSettings(Settings.settingsBuilder()
//.put("uri", "hdfs://127.0.0.1:51227")
.put("conf.fs.es-hdfs.impl", TestingFs.class.getName())
.put("uri", "es-hdfs://./build/")
.put("path", path)
.put("conf", "additional-cfg.xml, conf-2.xml")
.put("chunk_size", randomIntBetween(100, 1000) + "k")
.put("compress", randomBoolean())
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
ensureGreen();
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i);
index("test-idx-3", "doc", Integer.toString(i), "foo", "baz" + i);
}
refresh();
assertThat(count(client, "test-idx-1"), equalTo(100L));
assertThat(count(client, "test-idx-2"), equalTo(100L));
assertThat(count(client, "test-idx-3"), equalTo(100L));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-3").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
logger.info("--> delete some data");
for (int i = 0; i < 50; i++) {
client.prepareDelete("test-idx-1", "doc", Integer.toString(i)).get();
}
for (int i = 50; i < 100; i++) {
client.prepareDelete("test-idx-2", "doc", Integer.toString(i)).get();
}
for (int i = 0; i < 100; i += 2) {
client.prepareDelete("test-idx-3", "doc", Integer.toString(i)).get();
}
refresh();
assertThat(count(client, "test-idx-1"), equalTo(50L));
assertThat(count(client, "test-idx-2"), equalTo(50L));
assertThat(count(client, "test-idx-3"), equalTo(50L));
logger.info("--> close indices");
client.admin().indices().prepareClose("test-idx-1", "test-idx-2").get();
logger.info("--> restore all indices from the snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(count(client, "test-idx-1"), equalTo(100L));
assertThat(count(client, "test-idx-2"), equalTo(100L));
assertThat(count(client, "test-idx-3"), equalTo(50L));
// Test restore after index deletion
logger.info("--> delete indices");
wipeIndices("test-idx-1", "test-idx-2");
logger.info("--> restore one index after deletion");
restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-2").execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(count(client, "test-idx-1"), equalTo(100L));
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));
}
private void wipeIndices(String... indices) {
cluster().wipeIndices(indices);
}
// RepositoryVerificationException.class
public void testWrongPath() {
Client client = client();
logger.info("--> creating hdfs repository with path [{}]", path);
try {
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("hdfs")
.setSettings(Settings.settingsBuilder()
// .put("uri", "hdfs://127.0.0.1:51227/")
.put("conf.fs.es-hdfs.impl", TestingFs.class.getName())
.put("uri", "es-hdfs:///")
.put("path", path + "a@b$c#11:22")
.put("chunk_size", randomIntBetween(100, 1000) + "k")
.put("compress", randomBoolean()))
.get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
ensureGreen();
fail("Path name is invalid");
} catch (RepositoryException re) {
// expected
}
}
/**
* Deletes repositories, supports wildcard notation.
*/
public static void wipeRepositories(String... repositories) {
// if nothing is provided, delete all
if (repositories.length == 0) {
repositories = new String[]{"*"};
}
for (String repository : repositories) {
try {
client().admin().cluster().prepareDeleteRepository(repository).execute().actionGet();
} catch (RepositoryMissingException ex) {
// ignore
}
}
}
private long count(Client client, String index) {
return client.prepareSearch(index).setSize(0).get().getHits().totalHits();
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.elasticsearch.common.SuppressForbidden;
import java.io.File;
public class MiniHDFSCluster {
@SuppressForbidden(reason = "Hadoop is messy")
public static void main(String[] args) throws Exception {
FileUtil.fullyDelete(new File(System.getProperty("test.build.data", "build/test/data"), "dfs/"));
// MiniHadoopClusterManager.main(new String[] { "-nomr" });
Configuration cfg = new Configuration();
cfg.set(DataNode.DATA_DIR_PERMISSION_KEY, "666");
cfg.set("dfs.replication", "0");
MiniDFSCluster dfsCluster = new MiniDFSCluster(cfg, 1, true, null);
FileSystem fs = dfsCluster.getFileSystem();
System.out.println(fs.getClass());
System.out.println(fs.getUri());
System.out.println(dfsCluster.getHftpFileSystem().getClass());
// dfsCluster.shutdown();
}
}

View File

@ -0,0 +1,11 @@
package org.elasticsearch.plugin.hadoop.hdfs;
import org.elasticsearch.test.ESTestCase;
public class UtilsTests extends ESTestCase {
public void testDetectLibFolder() {
String location = HdfsPlugin.class.getProtectionDomain().getCodeSource().getLocation().toString();
assertEquals(location, Utils.detectLibFolder());
}
}

View File

@ -0,0 +1,12 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>foo</name>
<value>foo</value>
</property>
<property>
<name>paradise</name>
<value>lost</value>
</property>
</configuration>

View File

@ -0,0 +1,12 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>foo</name>
<value>foo</value>
</property>
<property>
<name>paradise</name>
<value>lost</value>
</property>
</configuration>

View File

@ -0,0 +1,16 @@
# Integration tests for HDFS Repository plugin
#
# Check plugin is installed
#
"HDFS Repository loaded":
- do:
cluster.state: {}
# Get master node id
- set: { master_node: master }
- do:
nodes.info: {}
- match: { nodes.$master.plugins.0.name: repository-hdfs }
- match: { nodes.$master.plugins.0.jvm: true }

View File

@ -0,0 +1,25 @@
# Integration tests for HDFS Repository plugin
#
# Check plugin is installed
#
"HDFS Repository Config":
- do:
snapshot.create_repository:
repository: test_repo_hdfs_1
verify: false
body:
type: hdfs
settings:
# local HDFS implementation
conf.fs.es-hdfs.impl: "org.elasticsearch.repositories.hdfs.TestingFs"
uri: "es-hdfs://./build/"
path: "build/data/repo-hdfs"
# Get repository
- do:
snapshot.get_repository:
repository: test_repo_hdfs_1
- is_true: test_repo_hdfs_1
- is_true: test_repo_hdfs_1.settings.uri
- match: {test_repo_hdfs_1.settings.path : "build/data/repo-hdfs"}

View File

@ -29,6 +29,7 @@ List projects = [
'plugins:mapper-murmur3',
'plugins:mapper-size',
'plugins:repository-azure',
'plugins:repository-hdfs',
'plugins:repository-s3',
'plugins:jvm-example',
'plugins:site-example',