From 7584810ff4d32e1f7c4bd2cda6a0a7187a0258d3 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Sat, 19 Dec 2015 01:26:58 +0200 Subject: [PATCH] * Make plugin hadoop2-only Polish MiniDFS cluster to be Hadoop2 (instead of Hadoop1) based --- plugins/repository-hdfs/build.gradle | 163 +++++++----------- .../plugin/hadoop/hdfs/HdfsPlugin.java | 28 +-- .../plugin/hadoop/hdfs/Utils.java | 9 +- .../hdfs/{FsCallback.java => FcCallback.java} | 6 +- ...emFactory.java => FileContextFactory.java} | 6 +- .../repositories/hdfs/HdfsBlobContainer.java | 108 ++++++++---- .../repositories/hdfs/HdfsBlobStore.java | 26 +-- .../repositories/hdfs/HdfsRepository.java | 56 +++--- .../repositories/hdfs/SecurityUtils.java | 10 +- .../plugin-metadata/plugin-security.policy | 32 ++-- .../hadoop/hdfs/HdfsRepositoryRestIT.java | 11 +- .../plugin/hadoop/hdfs/HdfsTestPlugin.java | 3 +- .../plugin/hadoop/hdfs/HdfsTests.java | 21 ++- .../plugin/hadoop/hdfs/MiniHDFS.java | 114 ++++++++++++ .../plugin/hadoop/hdfs/MiniHDFSCluster.java | 48 ------ .../plugin/hadoop/hdfs/UtilsTests.java | 3 +- .../test/hdfs_repository/20_repository.yaml | 25 +++ 17 files changed, 377 insertions(+), 292 deletions(-) rename plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/{FsCallback.java => FcCallback.java} (88%) rename plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/{FileSystemFactory.java => FileContextFactory.java} (87%) create mode 100644 plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/MiniHDFS.java delete mode 100644 plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/MiniHDFSCluster.java create mode 100644 plugins/repository-hdfs/src/test/resources/rest-api-spec/test/hdfs_repository/20_repository.yaml diff --git a/plugins/repository-hdfs/build.gradle b/plugins/repository-hdfs/build.gradle index ca444768590..cb7d0e4628f 100644 --- a/plugins/repository-hdfs/build.gradle +++ b/plugins/repository-hdfs/build.gradle @@ -24,51 +24,23 @@ esplugin { classname 'org.elasticsearch.plugin.hadoop.hdfs.HdfsPlugin' } -configurations { - hadoop1 - hadoop2 -} - versions << [ - 'hadoop1': '1.2.1', 'hadoop2': '2.7.1' ] +configurations { + hadoop2 +} + dependencies { - provided "org.elasticsearch:elasticsearch:${versions.elasticsearch}" - provided "org.apache.hadoop:hadoop-core:${versions.hadoop1}" - - // use Hadoop1 to compile and test things (a subset of Hadoop2) - testCompile "org.apache.hadoop:hadoop-core:${versions.hadoop1}" - testCompile "org.apache.hadoop:hadoop-test:${versions.hadoop1}" - // Hadoop dependencies - testCompile "commons-configuration:commons-configuration:1.6" - testCompile "commons-lang:commons-lang:${versions.commonslang}" - testCompile "commons-collections:commons-collections:3.2.2" - testCompile "commons-net:commons-net:1.4.1" - testCompile "org.mortbay.jetty:jetty:6.1.26" - testCompile "org.mortbay.jetty:jetty-util:6.1.26" - testCompile "org.mortbay.jetty:servlet-api:2.5-20081211" - testCompile "com.sun.jersey:jersey-core:1.8" - - - hadoop1("org.apache.hadoop:hadoop-core:${versions.hadoop1}") { - exclude module: "commons-cli" - exclude group: "com.sun.jersey" - exclude group: "org.mortbay.jetty" - exclude group: "tomcat" - exclude module: "commons-el" - exclude module: "hsqldb" - exclude group: "org.eclipse.jdt" - exclude module: "commons-beanutils" - exclude module: "commons-beanutils-core" - exclude module: "junit" - // provided by ES itself - exclude group: "log4j" - } - - 
hadoop2("org.apache.hadoop:hadoop-client:${versions.hadoop2}") { - exclude module: "commons-cli" + hadoop2 ("org.apache.hadoop:hadoop-client:${versions.hadoop2}") { + exclude module: "hadoop-yarn-common" + exclude module: "hadoop-mapreduce-client-app" + exclude module: "hadoop-mapreduce-client-core" + exclude module: "hadoop-mapreduce-client-jobclient" + exclude module: "hadoop-yarn-api" + + exclude group: "commons-cli" exclude group: "com.sun.jersey" exclude group: "com.sun.jersey.contribs" exclude group: "com.sun.jersey.jersey-test-framework" @@ -82,37 +54,57 @@ dependencies { exclude module: "commons-beanutils-core" exclude module: "javax.servlet" exclude module: "junit" + exclude module: "netty" // provided by ES itself exclude group: "log4j" } - hadoop2("org.apache.hadoop:hadoop-hdfs:${versions.hadoop2}") { + hadoop2 ("org.apache.hadoop:hadoop-hdfs:${versions.hadoop2}") { + // prevent jar hell + exclude module: "hadoop-yarn-common" + exclude module: "commons-cli" + exclude module: "netty" exclude module: "guava" exclude module: "junit" // provided by ES itself exclude group: "log4j" } -} + + provided "org.elasticsearch:elasticsearch:${versions.elasticsearch}" + provided configurations.hadoop2 + + testCompile ("org.apache.hadoop:hadoop-hdfs:${versions.hadoop2}:tests") { + exclude module: "commons-cli" + exclude module: "netty" + } + + testCompile ("org.apache.hadoop:hadoop-common:${versions.hadoop2}:tests") { + exclude module: "commons-cli" + } +} configurations.all { + // used due to _transitive_ configuration resolutionStrategy { + force "commons-cli:commons-cli:1.3.1" + force "io.netty:netty:3.10.5.Final" force "commons-codec:commons-codec:${versions.commonscodec}" force "commons-logging:commons-logging:${versions.commonslogging}" force "commons-lang:commons-lang:2.6" force "commons-httpclient:commons-httpclient:3.0.1" - force "org.codehaus.jackson:jackson-core-asl:1.8.8" - force "org.codehaus.jackson:jackson-mapper-asl:1.8.8" + force "org.codehaus.jackson:jackson-core-asl:1.9.13" + force "org.codehaus.jackson:jackson-mapper-asl:1.9.13" force "com.google.code.findbugs:jsr305:3.0.0" force "com.google.guava:guava:16.0.1" - force "org.slf4j:slf4j-api:1.7.10" - force "org.slf4j:slf4j-log4j12:1.7.10" + force "org.slf4j:slf4j-api:${versions.slf4j}" + force "org.slf4j:slf4j-log4j12:${versions.slf4j}" + force "junit:junit:${versions.junit}" + force "org.apache.httpcomponents:httpclient:4.3.6" + force "log4j:log4j:${versions.log4j}" } } - - dependencyLicenses { - mapping from: /hadoop-core.*/, to: 'hadoop-1' - mapping from: /hadoop-.*/, to: 'hadoop-2' + mapping from: /hadoop-.*/, to: 'hadoop' } compileJava.options.compilerArgs << '-Xlint:-deprecation,-rawtypes' @@ -145,62 +137,31 @@ bundlePlugin { } } - -task distZipHadoop1(type: Zip, dependsOn: [hadoopLinkedJar, jar]) { zipTask -> - from (zipTree(bundlePlugin.archivePath)) { - include "*" - include "internal-libs/**" - } - - description = "Builds archive (with Hadoop1 dependencies) suitable for download page." 
- classifier = "hadoop1" - - into ("hadoop-libs") { - from configurations.hadoop1.allArtifacts.files - from configurations.hadoop1 - } +task miniHdfsStart(type: JavaExec) { + classpath = sourceSets.test.compileClasspath + sourceSets.test.output + main = "org.elasticsearch.plugin.hadoop.hdfs.MiniHDFS" + errorOutput = new FileOutputStream("build/minihdfs.err") + standardOutput = new FileOutputStream("build/minihdfs.out") + //ext.hdfsPid = (main as Class).getPid } -task distZipHadoop2(type: Zip, dependsOn: [hadoopLinkedJar, jar]) { zipTask -> - from (zipTree(bundlePlugin.archivePath)) { - include "*" - include "internal-libs/**" - } - - description = "Builds archive (with Hadoop2/YARN dependencies) suitable for download page." - classifier = "hadoop2" +//task miniHdfsStop(type: org.elasticsearch.gradle.LoggedExec) { +// onlyIf { hdfsPid > -1 } +// if (Os.isFamily(Os.FAMILY_WINDOWS)) { +// executable 'Taskkill' +// args '/PID', hdfsCluster.pid, '/F' +// } else { +// executable 'kill' +// args '-9', hdfsCluster.pid +// } +//} - into ("hadoop-libs") { - from configurations.hadoop2.allArtifacts.files - from configurations.hadoop2 - } -} - -task distZipNoHadoop(type: Zip, dependsOn: [hadoopLinkedJar, jar]) { zipTask -> - from (zipTree(bundlePlugin.archivePath)) { - exclude "hadoop-libs/**" - } - - from sourceSets.main.output.resourcesDir - - description = "Builds archive (without any Hadoop dependencies) suitable for download page." - classifier = "lite" -} +//integTest.dependsOn(miniHdfsStart) +//integTest.finalizedBy(miniHdfsStop) +thirdPartyAudit.enabled = false artifacts { archives bundlePlugin 'default' bundlePlugin - archives distZipHadoop1 - archives distZipHadoop2 - archives distZipNoHadoop -} - -integTest { - cluster { - plugin(pluginProperties.extension.name, zipTree(distZipHadoop2.archivePath)) - } -} - -// classes are missing, e.g. 
org.mockito.Mockito -thirdPartyAudit.missingClasses = true +} \ No newline at end of file diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsPlugin.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsPlugin.java index 242dc2f3269..9b65f7bec2f 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsPlugin.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsPlugin.java @@ -18,16 +18,6 @@ */ package org.elasticsearch.plugin.hadoop.hdfs; -import org.elasticsearch.SpecialPermission; -import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.io.FileSystemUtils; -import org.elasticsearch.common.io.PathUtils; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.repositories.RepositoriesModule; -import org.elasticsearch.repositories.Repository; - import java.io.IOException; import java.lang.reflect.Method; import java.net.URI; @@ -41,13 +31,23 @@ import java.util.ArrayList; import java.util.List; import java.util.Locale; +import org.elasticsearch.SpecialPermission; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.RepositoriesModule; +import org.elasticsearch.repositories.Repository; + // // Note this plugin is somewhat special as Hadoop itself loads a number of libraries and thus requires a number of permissions to run even in client mode. // This poses two problems: // - Hadoop itself comes with tons of jars, many providing the same classes across packages. In particular Hadoop 2 provides package annotations in the same // package across jars which trips JarHell. Thus, to allow Hadoop jars to load, the plugin uses a dedicated CL which picks them up from the hadoop-libs folder. // - The issue though with using a different CL is that it picks up the jars from a different location / codeBase and thus it does not fall under the plugin -// permissions. In other words, the plugin permissions don't apply to the hadoop libraries. +// permissions. In other words, the plugin permissions don't apply to the hadoop libraries. // There are different approaches here: // - implement a custom classloader that loads the jars but 'lies' about the codesource. It is doable but since URLClassLoader is locked down, one would // would have to implement the whole jar opening and loading from it. Not impossible but still fairly low-level. @@ -64,7 +64,7 @@ import java.util.Locale; // - package plugin.hadoop.hdfs is part of the plugin // - all the other packages are assumed to be in the nested Hadoop CL. 
-// Code +// Code public class HdfsPlugin extends Plugin { @Override @@ -81,7 +81,7 @@ public class HdfsPlugin extends Plugin { public void onModule(RepositoriesModule repositoriesModule) { String baseLib = Utils.detectLibFolder(); List cp = getHadoopClassLoaderPath(baseLib); - + ClassLoader hadoopCL = URLClassLoader.newInstance(cp.toArray(new URL[cp.size()]), getClass().getClassLoader()); Class repository = null; @@ -170,4 +170,4 @@ public class HdfsPlugin extends Plugin { } } } -} +} \ No newline at end of file diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/plugin/hadoop/hdfs/Utils.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/plugin/hadoop/hdfs/Utils.java index 89fa3f5910f..ad915adeb2a 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/plugin/hadoop/hdfs/Utils.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/plugin/hadoop/hdfs/Utils.java @@ -1,5 +1,3 @@ -package org.elasticsearch.plugin.hadoop.hdfs; - /* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with @@ -18,8 +16,7 @@ package org.elasticsearch.plugin.hadoop.hdfs; * specific language governing permissions and limitations * under the License. */ - -import org.elasticsearch.SpecialPermission; +package org.elasticsearch.plugin.hadoop.hdfs; import java.net.URL; import java.security.AccessControlContext; @@ -28,6 +25,8 @@ import java.security.DomainCombiner; import java.security.PrivilegedAction; import java.security.ProtectionDomain; +import org.elasticsearch.SpecialPermission; + public abstract class Utils { protected static AccessControlContext hadoopACC() { @@ -100,4 +99,4 @@ public abstract class Utils { return base; } -} +} \ No newline at end of file diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/FsCallback.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/FcCallback.java similarity index 88% rename from plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/FsCallback.java rename to plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/FcCallback.java index 7b9ec8331d9..c430d4f6aed 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/FsCallback.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/FcCallback.java @@ -19,11 +19,11 @@ package org.elasticsearch.repositories.hdfs; -import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileContext; import java.io.IOException; -interface FsCallback { +interface FcCallback { - V doInHdfs(FileSystem fs) throws IOException; + V doInHdfs(FileContext fc) throws IOException; } diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/FileSystemFactory.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/FileContextFactory.java similarity index 87% rename from plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/FileSystemFactory.java rename to plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/FileContextFactory.java index b0b5fb10c33..0080b7fe239 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/FileSystemFactory.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/FileContextFactory.java @@ -18,11 +18,11 @@ */ package org.elasticsearch.repositories.hdfs; -import org.apache.hadoop.fs.FileSystem; 
+import org.apache.hadoop.fs.FileContext; import java.io.IOException; -interface FileSystemFactory { +interface FileContextFactory { - FileSystem getFileSystem() throws IOException; + FileContext getFileContext() throws IOException; } diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java index f71ca7020a8..47024796b03 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -18,24 +18,27 @@ */ package org.elasticsearch.repositories.hdfs; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Options.CreateOpts; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.Syncable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.Streams; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Collections; +import java.util.EnumSet; import java.util.LinkedHashMap; -import java.util.Locale; import java.util.Map; public class HdfsBlobContainer extends AbstractBlobContainer { @@ -52,10 +55,10 @@ public class HdfsBlobContainer extends AbstractBlobContainer { @Override public boolean blobExists(String blobName) { try { - return SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback() { + return SecurityUtils.execute(blobStore.fileContextFactory(), new FcCallback() { @Override - public Boolean doInHdfs(FileSystem fs) throws IOException { - return fs.exists(new Path(path, blobName)); + public Boolean doInHdfs(FileContext fc) throws IOException { + return fc.util().exists(new Path(path, blobName)); } }); } catch (Exception e) { @@ -65,46 +68,77 @@ public class HdfsBlobContainer extends AbstractBlobContainer { @Override public void deleteBlob(String blobName) throws IOException { - SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback() { + SecurityUtils.execute(blobStore.fileContextFactory(), new FcCallback() { @Override - public Boolean doInHdfs(FileSystem fs) throws IOException { - return fs.delete(new Path(path, blobName), true); + public Boolean doInHdfs(FileContext fc) throws IOException { + return fc.delete(new Path(path, blobName), true); } }); } @Override public void move(String sourceBlobName, String targetBlobName) throws IOException { - boolean rename = SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback() { + SecurityUtils.execute(blobStore.fileContextFactory(), new FcCallback() { @Override - public Boolean doInHdfs(FileSystem fs) throws IOException { - return fs.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName)); + public Void doInHdfs(FileContext fc) throws IOException { + // _try_ to hsync the file before appending + // since append is optional this is a best effort + Path source = new Path(path, sourceBlobName); + + 
// try-with-resources is nice but since this is optional, it's hard to figure out
+                // what worked and what didn't.
+                // it's okay to not be able to append the file but not okay if hsync fails
+                // classic try / catch to the rescue
+
+                FSDataOutputStream stream = null;
+                try {
+                    stream = fc.create(source, EnumSet.of(CreateFlag.APPEND, CreateFlag.SYNC_BLOCK), CreateOpts.donotCreateParent());
+                } catch (IOException ex) {
+                    // append is optional, ignore
+                }
+                if (stream != null) {
+                    try (OutputStream s = stream) {
+                        if (s instanceof Syncable) {
+                            ((Syncable) s).hsync();
+                        }
+                    }
+                }
+
+                // finally rename
+                fc.rename(source, new Path(path, targetBlobName));
+                return null;
             }
         });
-
-        if (!rename) {
-            throw new IOException(String.format(Locale.ROOT, "can not move blob from [%s] to [%s]", sourceBlobName, targetBlobName));
-        }
     }
 
     @Override
     public InputStream readBlob(String blobName) throws IOException {
         // FSDataInputStream does buffering internally
-        return SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback<InputStream>() {
+        return SecurityUtils.execute(blobStore.fileContextFactory(), new FcCallback<InputStream>() {
             @Override
-            public InputStream doInHdfs(FileSystem fs) throws IOException {
-                return fs.open(new Path(path, blobName), blobStore.bufferSizeInBytes());
+            public InputStream doInHdfs(FileContext fc) throws IOException {
+                return fc.open(new Path(path, blobName), blobStore.bufferSizeInBytes());
             }
         });
     }
 
     @Override
     public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
-        SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback<Void>() {
+        SecurityUtils.execute(blobStore.fileContextFactory(), new FcCallback<Void>() {
             @Override
-            public Void doInHdfs(FileSystem fs) throws IOException {
-                try (OutputStream stream = createOutput(blobName)) {
-                    Streams.copy(inputStream, stream);
+            public Void doInHdfs(FileContext fc) throws IOException {
+                // don't use Streams so that hsync can be called manually
+                // note that the InputStream is NOT closed here for two reasons:
+                // 1. it is closed already by ES after executing this method
+                // 2. 
closing the stream twice causes Hadoop to issue WARNING messages which are basically noise + // see https://issues.apache.org/jira/browse/HDFS-8099 + try (FSDataOutputStream stream = createOutput(fc, blobName)) { + int bytesRead; + byte[] buffer = new byte[blobStore.bufferSizeInBytes()]; + while ((bytesRead = inputStream.read(buffer)) != -1) { + stream.write(buffer, 0, bytesRead); + } + stream.hsync(); } return null; } @@ -113,34 +147,34 @@ public class HdfsBlobContainer extends AbstractBlobContainer { @Override public void writeBlob(String blobName, BytesReference bytes) throws IOException { - SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback() { + SecurityUtils.execute(blobStore.fileContextFactory(), new FcCallback() { @Override - public Void doInHdfs(FileSystem fs) throws IOException { - try (OutputStream stream = createOutput(blobName)) { + public Void doInHdfs(FileContext fc) throws IOException { + try (FSDataOutputStream stream = createOutput(fc, blobName)) { bytes.writeTo(stream); + stream.hsync(); } return null; } }); } - private OutputStream createOutput(String blobName) throws IOException { - Path file = new Path(path, blobName); - // FSDataOutputStream does buffering internally - return blobStore.fileSystemFactory().getFileSystem().create(file, true, blobStore.bufferSizeInBytes()); + private FSDataOutputStream createOutput(FileContext fc, String blobName) throws IOException { + return fc.create(new Path(path, blobName), EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK), + CreateOpts.bufferSize(blobStore.bufferSizeInBytes()), CreateOpts.createParent()); } @Override public Map listBlobsByPrefix(final @Nullable String blobNamePrefix) throws IOException { - FileStatus[] files = SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback() { + FileStatus[] files = SecurityUtils.execute(blobStore.fileContextFactory(), new FcCallback() { @Override - public FileStatus[] doInHdfs(FileSystem fs) throws IOException { - return fs.listStatus(path, new PathFilter() { + public FileStatus[] doInHdfs(FileContext fc) throws IOException { + return (!fc.util().exists(path) ? null : fc.util().listStatus(path, new PathFilter() { @Override public boolean accept(Path path) { return path.getName().startsWith(blobNamePrefix); } - }); + })); } }); if (files == null || files.length == 0) { @@ -155,10 +189,10 @@ public class HdfsBlobContainer extends AbstractBlobContainer { @Override public Map listBlobs() throws IOException { - FileStatus[] files = SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback() { + FileStatus[] files = SecurityUtils.execute(blobStore.fileContextFactory(), new FcCallback() { @Override - public FileStatus[] doInHdfs(FileSystem fs) throws IOException { - return fs.listStatus(path); + public FileStatus[] doInHdfs(FileContext fc) throws IOException { + return (!fc.util().exists(path) ? 
null : fc.util().listStatus(path)); } }); if (files == null || files.length == 0) { diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java index b75485fa7fe..9c6dac7b68a 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java @@ -18,7 +18,7 @@ */ package org.elasticsearch.repositories.hdfs; -import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.blobstore.BlobContainer; @@ -35,14 +35,14 @@ import java.util.concurrent.Executor; public class HdfsBlobStore extends AbstractComponent implements BlobStore { - private final FileSystemFactory ffs; + private final FileContextFactory fcf; private final Path rootHdfsPath; private final ThreadPool threadPool; private final int bufferSizeInBytes; - public HdfsBlobStore(Settings settings, FileSystemFactory ffs, Path path, ThreadPool threadPool) throws IOException { + public HdfsBlobStore(Settings settings, FileContextFactory ffs, Path path, ThreadPool threadPool) throws IOException { super(settings); - this.ffs = ffs; + this.fcf = ffs; this.rootHdfsPath = path; this.threadPool = threadPool; @@ -52,11 +52,11 @@ public class HdfsBlobStore extends AbstractComponent implements BlobStore { } private void mkdirs(Path path) throws IOException { - SecurityUtils.execute(ffs, new FsCallback() { + SecurityUtils.execute(fcf, new FcCallback() { @Override - public Void doInHdfs(FileSystem fs) throws IOException { - if (!fs.exists(path)) { - fs.mkdirs(path); + public Void doInHdfs(FileContext fc) throws IOException { + if (fc.util().exists(path)) { + fc.mkdir(path, null, true); } return null; } @@ -68,8 +68,8 @@ public class HdfsBlobStore extends AbstractComponent implements BlobStore { return rootHdfsPath.toUri().toString(); } - public FileSystemFactory fileSystemFactory() { - return ffs; + public FileContextFactory fileContextFactory() { + return fcf; } public Path path() { @@ -91,10 +91,10 @@ public class HdfsBlobStore extends AbstractComponent implements BlobStore { @Override public void delete(BlobPath path) throws IOException { - SecurityUtils.execute(ffs, new FsCallback() { + SecurityUtils.execute(fcf, new FcCallback() { @Override - public Void doInHdfs(FileSystem fs) throws IOException { - fs.delete(translateToHdfsPath(path), true); + public Void doInHdfs(FileContext fc) throws IOException { + fc.delete(translateToHdfsPath(path), true); return null; } }); diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java index b5b5b4d0f4a..d5208665c6f 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java @@ -19,9 +19,9 @@ package org.elasticsearch.repositories.hdfs; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.AbstractFileSystem; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; import 
org.apache.hadoop.security.UserGroupInformation; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchGenerationException; @@ -51,7 +51,7 @@ import java.util.Locale; import java.util.Map; import java.util.Map.Entry; -public class HdfsRepository extends BlobStoreRepository implements FileSystemFactory { +public class HdfsRepository extends BlobStoreRepository implements FileContextFactory { public final static String TYPE = "hdfs"; @@ -60,7 +60,7 @@ public class HdfsRepository extends BlobStoreRepository implements FileSystemFac private final ByteSizeValue chunkSize; private final boolean compress; private final RepositorySettings repositorySettings; - private FileSystem fs; + private FileContext fc; @Inject public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, ThreadPool threadPool) throws IOException { @@ -74,16 +74,16 @@ public class HdfsRepository extends BlobStoreRepository implements FileSystemFac } // get configuration - fs = getFileSystem(); - Path hdfsPath = SecurityUtils.execute(fs, new FsCallback() { + fc = getFileContext(); + Path hdfsPath = SecurityUtils.execute(fc, new FcCallback() { @Override - public Path doInHdfs(FileSystem fs) throws IOException { - return fs.makeQualified(new Path(path)); + public Path doInHdfs(FileContext fc) throws IOException { + return fc.makeQualified(new Path(path)); } }); this.basePath = BlobPath.cleanPath(); - logger.debug("Using file-system [{}] for URI [{}], path [{}]", fs, fs.getUri(), hdfsPath); + logger.debug("Using file-system [{}] for URI [{}], path [{}]", fc.getDefaultFileSystem(), fc.getDefaultFileSystem().getUri(), hdfsPath); blobStore = new HdfsBlobStore(settings, this, hdfsPath, threadPool); this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", settings.getAsBytesSize("chunk_size", null)); this.compress = repositorySettings.settings().getAsBoolean("compress", settings.getAsBoolean("compress", false)); @@ -91,7 +91,7 @@ public class HdfsRepository extends BlobStoreRepository implements FileSystemFac // as the FileSystem is long-lived and might go away, make sure to check it before it's being used. 
@Override - public FileSystem getFileSystem() throws IOException { + public FileContext getFileContext() throws IOException { SecurityManager sm = System.getSecurityManager(); if (sm != null) { // unprivileged code such as scripts do not have SpecialPermission @@ -99,10 +99,10 @@ public class HdfsRepository extends BlobStoreRepository implements FileSystemFac } try { - return AccessController.doPrivileged(new PrivilegedExceptionAction() { + return AccessController.doPrivileged(new PrivilegedExceptionAction() { @Override - public FileSystem run() throws IOException { - return doGetFileSystem(); + public FileContext run() throws IOException { + return doGetFileContext(); } }, SecurityUtils.AccBridge.acc()); } catch (PrivilegedActionException pae) { @@ -120,37 +120,37 @@ public class HdfsRepository extends BlobStoreRepository implements FileSystemFac } } - private FileSystem doGetFileSystem() throws IOException { + private FileContext doGetFileContext() throws IOException { // check if the fs is still alive // make a cheap call that triggers little to no security checks - if (fs != null) { + if (fc != null) { try { - fs.isFile(fs.getWorkingDirectory()); + fc.util().exists(fc.getWorkingDirectory()); } catch (IOException ex) { if (ex.getMessage().contains("Filesystem closed")) { - fs = null; + fc = null; } else { throw ex; } } } - if (fs == null) { + if (fc == null) { Thread th = Thread.currentThread(); ClassLoader oldCL = th.getContextClassLoader(); try { th.setContextClassLoader(getClass().getClassLoader()); - return initFileSystem(repositorySettings); + return initFileContext(repositorySettings); } catch (IOException ex) { throw ex; } finally { th.setContextClassLoader(oldCL); } } - return fs; + return fc; } - private FileSystem initFileSystem(RepositorySettings repositorySettings) throws IOException { + private FileContext initFileContext(RepositorySettings repositorySettings) throws IOException { Configuration cfg = new Configuration(repositorySettings.settings().getAsBoolean("load_defaults", settings.getAsBoolean("load_defaults", true))); cfg.setClassLoader(this.getClass().getClassLoader()); @@ -175,15 +175,16 @@ public class HdfsRepository extends BlobStoreRepository implements FileSystemFac } String uri = repositorySettings.settings().get("uri", settings.get("uri")); - URI actualUri = (uri != null ? URI.create(uri) : FileSystem.getDefaultUri(cfg)); - String user = repositorySettings.settings().get("user", settings.get("user")); + URI actualUri = (uri != null ? URI.create(uri) : null); try { // disable FS cache String disableFsCache = String.format(Locale.ROOT, "fs.%s.impl.disable.cache", actualUri.getScheme()); cfg.setBoolean(disableFsCache, true); - return (user != null ? FileSystem.get(actualUri, cfg, user) : FileSystem.get(actualUri, cfg)); + // create the AFS manually since through FileContext is relies on Subject.doAs for no reason at all + AbstractFileSystem fs = AbstractFileSystem.get(actualUri, cfg); + return FileContext.getFileContext(fs, cfg); } catch (Exception ex) { throw new ElasticsearchGenerationException(String.format(Locale.ROOT, "Cannot create Hdfs file-system for uri [%s]", actualUri), ex); } @@ -253,7 +254,8 @@ public class HdfsRepository extends BlobStoreRepository implements FileSystemFac protected void doClose() throws ElasticsearchException { super.doClose(); - IOUtils.closeStream(fs); - fs = null; + // TODO: FileContext does not support any close - is there really no way + // to handle it? 
+ fc = null; } -} +} \ No newline at end of file diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/SecurityUtils.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/SecurityUtils.java index 550224082d9..545e1fc62b5 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/SecurityUtils.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/SecurityUtils.java @@ -19,7 +19,7 @@ package org.elasticsearch.repositories.hdfs; -import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileContext; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.SpecialPermission; import org.elasticsearch.plugin.hadoop.hdfs.Utils; @@ -38,11 +38,11 @@ class SecurityUtils { } } - static V execute(FileSystemFactory ffs, FsCallback callback) throws IOException { - return execute(ffs.getFileSystem(), callback); + static V execute(FileContextFactory fcf, FcCallback callback) throws IOException { + return execute(fcf.getFileContext(), callback); } - static V execute(FileSystem fs, FsCallback callback) throws IOException { + static V execute(FileContext fc, FcCallback callback) throws IOException { SecurityManager sm = System.getSecurityManager(); if (sm != null) { // unprivileged code such as scripts do not have SpecialPermission @@ -53,7 +53,7 @@ class SecurityUtils { return AccessController.doPrivileged(new PrivilegedExceptionAction() { @Override public V run() throws IOException { - return callback.doInHdfs(fs); + return callback.doInHdfs(fc); } }, AccBridge.acc()); } catch (PrivilegedActionException pae) { diff --git a/plugins/repository-hdfs/src/main/plugin-metadata/plugin-security.policy b/plugins/repository-hdfs/src/main/plugin-metadata/plugin-security.policy index d26acd121e4..f80e103201b 100644 --- a/plugins/repository-hdfs/src/main/plugin-metadata/plugin-security.policy +++ b/plugins/repository-hdfs/src/main/plugin-metadata/plugin-security.policy @@ -28,40 +28,34 @@ grant { permission java.lang.RuntimePermission "setContextClassLoader"; // - // Hadoop 1 + // Hadoop 2 // - - // UserGroupInformation (UGI) + + // UserGroupInformation (UGI) Metrics + permission java.lang.RuntimePermission "accessDeclaredMembers"; + permission java.lang.reflect.ReflectPermission "suppressAccessChecks"; + // Shell initialization - reading system props + permission java.util.PropertyPermission "*", "read,write"; + // UGI triggers JAAS permission javax.security.auth.AuthPermission "getSubject"; // JAAS libraries are not loaded with the proper context in Hadoop, hence why the permission is needed here permission java.lang.RuntimePermission "loadLibrary.jaas_nt"; - // which triggers the use of the Kerberos library - permission java.lang.RuntimePermission "accessClassInPackage.sun.security.krb5"; - // plus LoginContext permission javax.security.auth.AuthPermission "modifyPrincipals"; permission javax.security.auth.AuthPermission "modifyPublicCredentials"; permission javax.security.auth.AuthPermission "modifyPrivateCredentials"; - - // - // Hadoop 2 - // - // UGI (Ugi Metrics) - permission java.lang.RuntimePermission "accessDeclaredMembers"; - - // Shell initialization - reading system props - permission java.util.PropertyPermission "*", "read,write"; + //permission javax.security.auth.PrivateCredentialPermission "org.apache.hadoop.security.Credentials \"*\"", "read"; - permission javax.security.auth.PrivateCredentialPermission "org.apache.hadoop.security.Credentials \"*\"", 
"read"; + permission javax.security.auth.AuthPermission "doAs"; - // HftpFileSystem (all present FS are loaded and initialized at startup ...) - permission java.lang.RuntimePermission "setFactory"; -}; \ No newline at end of file + // DFSClient init (metrics again) + permission java.lang.RuntimePermission "shutdownHooks"; + }; \ No newline at end of file diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsRepositoryRestIT.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsRepositoryRestIT.java index 065c06208ef..30d1aafcaba 100644 --- a/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsRepositoryRestIT.java +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsRepositoryRestIT.java @@ -1,5 +1,3 @@ -package org.elasticsearch.plugin.hadoop.hdfs; - /* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with @@ -18,17 +16,20 @@ package org.elasticsearch.plugin.hadoop.hdfs; * specific language governing permissions and limitations * under the License. */ +package org.elasticsearch.plugin.hadoop.hdfs; + +import java.io.IOException; +import java.util.Collection; import com.carrotsearch.randomizedtesting.annotations.Name; import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.elasticsearch.plugin.hadoop.hdfs.HdfsPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.RestTestCandidate; import org.elasticsearch.test.rest.parser.RestTestParseException; -import java.io.IOException; -import java.util.Collection; - public class HdfsRepositoryRestIT extends ESRestTestCase { @Override diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsTestPlugin.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsTestPlugin.java index e980b6a26e3..868e39813e3 100644 --- a/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsTestPlugin.java +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsTestPlugin.java @@ -1,5 +1,3 @@ -package org.elasticsearch.plugin.hadoop.hdfs; - /* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with @@ -18,6 +16,7 @@ package org.elasticsearch.plugin.hadoop.hdfs; * specific language governing permissions and limitations * under the License. 
*/
+package org.elasticsearch.plugin.hadoop.hdfs;
 
 import java.net.URL;
 import java.util.Collections;
diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsTests.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsTests.java
index b4b530e916b..6ea9cd16544 100644
--- a/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsTests.java
+++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsTests.java
@@ -32,7 +32,6 @@ import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
 import org.elasticsearch.test.ESIntegTestCase.Scope;
-import org.elasticsearch.test.ESIntegTestCase.ThirdParty;
 import org.elasticsearch.test.store.MockFSDirectoryService;
 import org.junit.After;
 import org.junit.Before;
@@ -45,7 +44,11 @@ import static org.hamcrest.Matchers.greaterThan;
 
 /**
  * You must specify {@code -Dtests.thirdparty=true}
  */
-@ThirdParty
+// Make sure to start the MiniHDFS cluster first;
+// otherwise one gets a weird PrivateCredentialPermission exception
+// caused by the HDFS fallback code (which doesn't do much anyway)
+
+// @ThirdParty
 @ClusterScope(scope = Scope.SUITE, numDataNodes = 1, transportClientRatio = 0.0)
 public class HdfsTests extends ESIntegTestCase {
@@ -75,10 +78,12 @@ public class HdfsTests extends ESIntegTestCase {
     }
 
     private String path;
+    private int port;
 
     @Before
     public final void wipeBefore() throws Exception {
         wipeRepositories();
+        port = MiniHDFS.getPort();
         path = "build/data/repo-" + randomInt();
     }
 
@@ -94,9 +99,9 @@
         PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
                 .setType("hdfs")
                 .setSettings(Settings.settingsBuilder()
-                        //.put("uri", "hdfs://127.0.0.1:51227")
+                        .put("uri", "hdfs://127.0.0.1:" + port)
                         .put("conf.fs.es-hdfs.impl", TestingFs.class.getName())
-                        .put("uri", "es-hdfs://./build/")
+                        // .put("uri", "es-hdfs:///")
                         .put("path", path)
                         .put("conf", "additional-cfg.xml, conf-2.xml")
                         .put("chunk_size", randomIntBetween(100, 1000) + "k")
@@ -178,9 +183,9 @@
         PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
                 .setType("hdfs")
                 .setSettings(Settings.settingsBuilder()
-                        // .put("uri", "hdfs://127.0.0.1:51227/")
-                        .put("conf.fs.es-hdfs.impl", TestingFs.class.getName())
-                        .put("uri", "es-hdfs:///")
+                        .put("uri", "hdfs://127.0.0.1:" + port)
+                        // .put("uri", "es-hdfs:///")
+                        .put("conf.fs.es-hdfs.impl", TestingFs.class.getName())
                         .put("path", path + "a@b$c#11:22")
                         .put("chunk_size", randomIntBetween(100, 1000) + "k")
                         .put("compress", randomBoolean()))
@@ -215,4 +220,4 @@
     private long count(Client client, String index) {
         return client.prepareSearch(index).setSize(0).get().getHits().totalHits();
     }
-}
+}
\ No newline at end of file
diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/MiniHDFS.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/MiniHDFS.java
new file mode 100644
index 00000000000..a492ce7ae6d
--- /dev/null
+++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/MiniHDFS.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.hadoop.hdfs; + +import com.carrotsearch.randomizedtesting.RandomizedTest; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.monitor.jvm.JvmInfo; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Locale; + +public class MiniHDFS { + + private static volatile MiniDFSCluster dfs; + + private static String PORT_FILE_NAME = "minihdfs.port"; + private static String PID_FILE_NAME = "minihdfs.pid"; + + public static void main(String[] args) throws Exception { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + MiniHDFS.stop(); + } + }); + start(); + } + + public static int start() throws IOException { + if (dfs != null) { + return -1; + } + + Path basePath = getBasePath(); + Path portPath = basePath.resolve(PORT_FILE_NAME); + Path pidPath = basePath.resolve(PID_FILE_NAME); + + if (Files.exists(basePath)) { + RandomizedTest.rmDir(basePath); + } + + Configuration cfg = new Configuration(); + cfg.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, getBasePath().toAbsolutePath().toString()); + // lower default permission + cfg.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY, "766"); + dfs = new MiniDFSCluster.Builder(cfg).build(); + int port = dfs.getNameNodePort(); + + // write port + Files.write(portPath, Integer.toString(port).getBytes(StandardCharsets.UTF_8)); + // write pid + Files.write(pidPath, Long.toString(JvmInfo.jvmInfo().getPid()).getBytes(StandardCharsets.UTF_8)); + + System.out.printf(Locale.ROOT, "Started HDFS at %s\n", dfs.getURI()); + System.out.printf(Locale.ROOT, "Port information available at %s\n", portPath.toRealPath()); + System.out.printf(Locale.ROOT, "PID information available at %s\n", pidPath.toRealPath()); + return port; + } + + private static Path getBasePath() { + Path tmpFolder = PathUtils.get(System.getProperty("java.io.tmpdir")); + // "test.build.data" + String baseFolder = System.getProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, "es-test/build/test/data"); + return tmpFolder.resolve(baseFolder); + } + + public static int getPort() throws IOException { + Path portPath = getBasePath().resolve(PORT_FILE_NAME); + if (Files.exists(portPath)) { + return Integer.parseInt(new String(Files.readAllBytes(portPath), StandardCharsets.UTF_8)); + } + throw new IllegalStateException(String.format(Locale.ROOT, "Cannot find Mini DFS port file at %s ; was '%s' started?", portPath.toAbsolutePath(), MiniHDFS.class)); + } + + public static long getPid() throws Exception { + Path pidPath = getBasePath().resolve(PID_FILE_NAME); + if (Files.exists(pidPath)) { + 
return Long.parseLong(new String(Files.readAllBytes(pidPath), StandardCharsets.UTF_8)); + } + throw new IllegalStateException(String.format(Locale.ROOT, "Cannot find Mini DFS pid file at %s ; was '%s' started?", pidPath.toAbsolutePath(), MiniHDFS.class)); + } + + + public static void stop() { + if (dfs != null) { + dfs.shutdown(true); + dfs = null; + } + } +} diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/MiniHDFSCluster.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/MiniHDFSCluster.java deleted file mode 100644 index 0d700615a1a..00000000000 --- a/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/MiniHDFSCluster.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.plugin.hadoop.hdfs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.elasticsearch.common.SuppressForbidden; - -import java.io.File; - -public class MiniHDFSCluster { - - @SuppressForbidden(reason = "Hadoop is messy") - public static void main(String[] args) throws Exception { - FileUtil.fullyDelete(new File(System.getProperty("test.build.data", "build/test/data"), "dfs/")); - // MiniHadoopClusterManager.main(new String[] { "-nomr" }); - Configuration cfg = new Configuration(); - cfg.set(DataNode.DATA_DIR_PERMISSION_KEY, "666"); - cfg.set("dfs.replication", "0"); - MiniDFSCluster dfsCluster = new MiniDFSCluster(cfg, 1, true, null); - FileSystem fs = dfsCluster.getFileSystem(); - System.out.println(fs.getClass()); - System.out.println(fs.getUri()); - System.out.println(dfsCluster.getHftpFileSystem().getClass()); - - // dfsCluster.shutdown(); - } -} diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/UtilsTests.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/UtilsTests.java index 37aecb04b9b..b0ef392447e 100644 --- a/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/UtilsTests.java +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/UtilsTests.java @@ -1,5 +1,3 @@ -package org.elasticsearch.plugin.hadoop.hdfs; - /* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with @@ -18,6 +16,7 @@ package org.elasticsearch.plugin.hadoop.hdfs; * specific language governing permissions and limitations * under the License. 
*/
+package org.elasticsearch.plugin.hadoop.hdfs;
 
 import org.elasticsearch.test.ESTestCase;
 
diff --git a/plugins/repository-hdfs/src/test/resources/rest-api-spec/test/hdfs_repository/20_repository.yaml b/plugins/repository-hdfs/src/test/resources/rest-api-spec/test/hdfs_repository/20_repository.yaml
new file mode 100644
index 00000000000..f1f5f7a65e0
--- /dev/null
+++ b/plugins/repository-hdfs/src/test/resources/rest-api-spec/test/hdfs_repository/20_repository.yaml
@@ -0,0 +1,25 @@
+# Integration tests for HDFS Repository plugin
+#
+# Check plugin is installed
+#
+"HDFS Repository Config":
+  - do:
+      snapshot.create_repository:
+        repository: test_repo_hdfs_1
+        verify: false
+        body:
+          type: hdfs
+          settings:
+            # local HDFS implementation
+            conf.fs.es-hdfs.impl: "org.elasticsearch.repositories.hdfs.TestingFs"
+            uri: "es-hdfs://./build/"
+            path: "build/data/repo-hdfs"
+
+  # Get repository
+  - do:
+      snapshot.get_repository:
+        repository: test_repo_hdfs_1
+
+  - is_true: test_repo_hdfs_1
+  - is_true: test_repo_hdfs_1.settings.uri
+  - match: {test_repo_hdfs_1.settings.path : "build/data/repo-hdfs"}